博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming整合kafka的补充
阅读量:6402 次
发布时间:2019-06-23

本文共 3137 字,大约阅读时间需要 10 分钟。

(1)SparkStreaming 整合 kafka 两种方式对比

Direct 方式的优缺点分析

  • 优点:
    • 简化并行(Simplified Parallelism)。不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应。
    • 高效(Efficiency)。基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此种方式需要保存两份数据,浪费存储空间也影响效率。而 Direct 方式则不存在这个问题。
    • 强一致语义(Exactly-once semantics)。High-level 数据由 Spark Streaming 消费,但是Offsets 则是由 Zookeeper 保存。通过参数配置,可以实现 at-least once 消费,此种情况有重复消费数据的可能。
    • 降低资源。Direct 不需要 Receivers,其申请的 Executors 全部参与到计算任务中;而Receiver-based 则需要专门的 Receivers 来读取 Kafka 数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    • 降低内存。Receiver-based 的 Receiver 与其他 Exectuor 是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高 Receiver 的内存,但是参与计算的 Executor 并无需那么多的内存。而 Direct 因为没有 Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 缺点:
    • 提高成本。Direct 需要用户采用 checkpoint 或者第三方存储来维护 offsets,而不像Receiver-based 那样,通过 ZooKeeper 来维护 Offsets,此提高了用户的开发成本。
    • 监控可视化。Receiver-based 方式指定 topic 指定 consumer 的消费情况均能通过ZooKeeper 来监控,而 Direct 则没有这种便利,不能自动保存 offset 到 zookeeper,如果做到监控并可视化,则需要投入人力开发。
      Receiver 方式的优缺点分析
  • 优点:
    • 专注计算。Kafka 的 high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护 consumer 的 offsets,这减少用户的工作量以及代码量而且相对比较简单。
  • 缺点:
    • 防数据丢失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable参数,配置 spark.streaming.receiver.writeAheadLog.enable 参数,每次处理之前需要将该batch 内的日志备份到 checkpoint 目录中,这降低了数据处理效率,反过来又加重了Receiver 端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    • 单 Receiver 内存。由于 receiver 也是属于 Executor 的一部分,那么为了提高吞吐量
    • 重复消费。在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新 offset的情况,这导致数据重复消费。
    • Receiver 和计算的 Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver 则在一直接收数据,这非常容易导致程序崩溃。

      (2)对kafka消费的offset的管理

  • spark自带的checkpoint:
    • 启用spark streaming的checkpoint是存储偏移量的最简单方法
    • 流式checkpoint专门保存用户应用程序的状态
    • 但是checkpoint的目录是不能共享的,无法跨越应用程序进行恢复
    • 一般不使用checkpoint管理offset
  • 使用zookeeper管理offset
    • 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
    • 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
  • 使用hbase保存offset
    • Rowkey的设计:topic名称 + groupid + streaming的batchtime.milliSeconds
  • 使用hdfs管理offset:当然这种情况不推荐使用,因为在hdfs中会生成大量的小文件,导致,hdfs的性能急剧下降

    (3)Driver的HA

      介绍:他能够在driver失败的时候,通过读取checkpoint目录下的元数据,恢复当前streamingContext对象的状态;它能够察觉到driver进程异常退出之后,自动重启。

      具体流程:当第一次运行程序时,发现checkpoint中没有数据,则根据定义的函数来第一次创建StreamingContext对象,当程序异常退出的时候,此时会根据checkpoint中的元数据恢复一个StreamingContext对象,达到异常退出之前的状态,而实现异常退出并自动启动则是sparkStreaming应用程序对driver进行监控,并且在他失败的时候感知,并进行重启。
      必要条件
      - spark-submit提交作业的时候,必须是集群模式(cluster),并且必须在spark-standalong下。

    spark-submit \--class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct \//这里只能使用spark的standalong模式,所以配置为spark集群--master spark://hadoop02:7077,hadoop04:7077 \--driver-memory 512m \--total-executor-cores 3 \--executor-memory 512m \#这句代码一定要加,他可以使异常退出的driver程序,重新启动--supervise \   --name SparkStreamDemo_Direct \--jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,\/home/hadoop/lib/metrics-core-2.2.0.jar,\/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\/home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,\/home/hadoop/lib/zkclient-0.3.jar \/home/hadoop/original-spark-1.0-SNAPSHOT.jar \spark://hadoop02:7077,hadoop04:7077

      - 需要添加--supervise \,才能实现失败自启动

      - 需要配置checkpoint目录,并且是存储在hdfs上,jar也要放置在hdfs上

转载于:https://blog.51cto.com/14048416/2339933

你可能感兴趣的文章
即将成为传奇的微软
查看>>
Android项目实战--手机卫士18--读取用户的短信内容以及短信备份
查看>>
nanosleep纳秒级延迟
查看>>
[C#] 我的log4net使用手册
查看>>
FineUI官方论坛出现空白页的解决办法!
查看>>
进程和线程之间的通信
查看>>
Android MIFARE NFCA源码解析
查看>>
Maven自定义Archetype(zz)
查看>>
设计模式java----单例模式
查看>>
西西弗的石头----读《哲学家都干了些什么》有感
查看>>
【OCR技术系列之二】文字定位与切割
查看>>
【300】◀▶ IDL - ENVI API
查看>>
Docker初体验
查看>>
UBUNTU LINUX中连接ANDROID 小米真机调试
查看>>
[转] Zend Framework 中 htaccess 的标准配置
查看>>
linux就是这个范儿之融于心而表于行(1)
查看>>
maven安装配置部署建项运行
查看>>
node爬虫
查看>>
接口测试之JMeter初探
查看>>
Docker背后的内核知识——cgroups资源限制(转)
查看>>