本文共 3137 字,大约阅读时间需要 10 分钟。
(1)SparkStreaming 整合 kafka 两种方式对比
Direct 方式的优缺点分析 :
- 优点:
- 简化并行(Simplified Parallelism)。不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应。
- 高效(Efficiency)。基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此种方式需要保存两份数据,浪费存储空间也影响效率。而 Direct 方式则不存在这个问题。
- 强一致语义(Exactly-once semantics)。High-level 数据由 Spark Streaming 消费,但是Offsets 则是由 Zookeeper 保存。通过参数配置,可以实现 at-least once 消费,此种情况有重复消费数据的可能。
- 降低资源。Direct 不需要 Receivers,其申请的 Executors 全部参与到计算任务中;而Receiver-based 则需要专门的 Receivers 来读取 Kafka 数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
- 降低内存。Receiver-based 的 Receiver 与其他 Exectuor 是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高 Receiver 的内存,但是参与计算的 Executor 并无需那么多的内存。而 Direct 因为没有 Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
- 缺点:
- 提高成本。Direct 需要用户采用 checkpoint 或者第三方存储来维护 offsets,而不像Receiver-based 那样,通过 ZooKeeper 来维护 Offsets,此提高了用户的开发成本。
- 监控可视化。Receiver-based 方式指定 topic 指定 consumer 的消费情况均能通过ZooKeeper 来监控,而 Direct 则没有这种便利,不能自动保存 offset 到 zookeeper,如果做到监控并可视化,则需要投入人力开发。Receiver 方式的优缺点分析 :
- 优点:
- 专注计算。Kafka 的 high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护 consumer 的 offsets,这减少用户的工作量以及代码量而且相对比较简单。
- 缺点:
- 防数据丢失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable参数,配置 spark.streaming.receiver.writeAheadLog.enable 参数,每次处理之前需要将该batch 内的日志备份到 checkpoint 目录中,这降低了数据处理效率,反过来又加重了Receiver 端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
- 单 Receiver 内存。由于 receiver 也是属于 Executor 的一部分,那么为了提高吞吐量
- 重复消费。在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新 offset的情况,这导致数据重复消费。
- Receiver 和计算的 Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver 则在一直接收数据,这非常容易导致程序崩溃。
(2)对kafka消费的offset的管理
- spark自带的checkpoint:
- 启用spark streaming的checkpoint是存储偏移量的最简单方法
- 流式checkpoint专门保存用户应用程序的状态
- 但是checkpoint的目录是不能共享的,无法跨越应用程序进行恢复
- 一般不使用checkpoint管理offset
- 使用zookeeper管理offset
- 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
- 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
- 使用hbase保存offset
- Rowkey的设计:topic名称 + groupid + streaming的batchtime.milliSeconds
- 使用hdfs管理offset:当然这种情况不推荐使用,因为在hdfs中会生成大量的小文件,导致,hdfs的性能急剧下降
(3)Driver的HA
介绍:他能够在driver失败的时候,通过读取checkpoint目录下的元数据,恢复当前streamingContext对象的状态;它能够察觉到driver进程异常退出之后,自动重启。
具体流程:当第一次运行程序时,发现checkpoint中没有数据,则根据定义的函数来第一次创建StreamingContext对象,当程序异常退出的时候,此时会根据checkpoint中的元数据恢复一个StreamingContext对象,达到异常退出之前的状态,而实现异常退出并自动启动则是sparkStreaming应用程序对driver进行监控,并且在他失败的时候感知,并进行重启。 必要条件: - spark-submit提交作业的时候,必须是集群模式(cluster),并且必须在spark-standalong下。 spark-submit \--class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct \//这里只能使用spark的standalong模式,所以配置为spark集群--master spark://hadoop02:7077,hadoop04:7077 \--driver-memory 512m \--total-executor-cores 3 \--executor-memory 512m \#这句代码一定要加,他可以使异常退出的driver程序,重新启动--supervise \ --name SparkStreamDemo_Direct \--jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,\/home/hadoop/lib/metrics-core-2.2.0.jar,\/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\/home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,\/home/hadoop/lib/zkclient-0.3.jar \/home/hadoop/original-spark-1.0-SNAPSHOT.jar \spark://hadoop02:7077,hadoop04:7077
- 需要添加--supervise \,才能实现失败自启动
- 需要配置checkpoint目录,并且是存储在hdfs上,jar也要放置在hdfs上
转载于:https://blog.51cto.com/14048416/2339933