Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

SparkStreaming integrates kafka

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Project architecture:

Log data-> flume- > kafka- > spark streaming- > mysql/redis/hbase

Prerequisites:

Install zookeeper install flume install kafakhadoop to achieve high availability (1) implement flume to collect data to kafka start kafak:nohup kafka-server-start.sh\ / application/kafka_2.11-1.1.0/config/server.properties\ 1 > / home/hadoop/logs/kafka_std.log\ 2 > / home/hadoop/logs/kafka_err.log & create a non-kafaktopic:kafka-topics.sh\-- create\-- zookeeper hadoop01:2181,hadoop02:2181 Hadoop03:2181/kafka\-- replication-factor 3\-- partitions 3\-- topic zy-flume-kafka to check whether it has been created successfully: kafka-topics.sh\-- zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka\-- describe\-- topic zy-flume-kafka

Configure the collection scheme of flume

Level 1: exec-avro.conf

Agent1.sources = r1agent1.channels = c1agent1.sinks = k1#define sourcesagent1.sources.r1.type = execagent1.sources.r1.command = tail-F / application/flume-1.8.0-bin/data/sample.log#define channelsagent1.channels.c1.type = memoryagent1.channels.c1.capacity = 1000agent1.channels.c1.transactionCapacity = 100#define sinkagent1.sinks.k1.type = avroagent1.sinks.k1.hostname = hadoop02agent1.sinks.k1.port = 3212#bind sources and sink to channelagent1.sources.r1.channels = c1agent1.sinks.k1.channel = C1

Level 2: avro-kafka.conf

Agent2.sources = r2agent2.channels = c2agent2.sinks = k2#define sourcesagent2.sources.r2.type = avroagent2.sources.r2.bind = hadoop02agent2.sources.r2.port = 3212#define channelsagent2.channels.c2.type = memoryagent2.channels.c2.capacity = 1000agent2.channels.c2.transactionCapacity = 100#define sinkagent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSinkagent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092 Hadoop03:9092agent2.sinks.k2.topic = zy-flume-kafkaagent2.sinks.k2.batchSize = 4agent2.sinks.k2.requiredAcks = 1#bind sources and sink to channelagent2.sources.r2.channels = c2agent2.sinks.k2.channel = c2

Start flume

Hadoop02:

Flume-ng agent\-conf/ application/flume-1.8.0-bin/conf/\-name agent2\-conf-file / application/flume-1.8.0-bin/flume_sh/avro-kafka.conf\-Dflume.root.logger=DEBUG,console

Hadoop01:

Flume-ng agent\-conf/ application/flume-1.8.0-bin/conf/\-name agent1\-conf-file / application/flume-1.8.0-bin/flume_sh/exec-avro.conf\-Dflume.root.logger=DEBUG,console

Note: be sure to start the second level before starting the first level.

test

Start a kafakconsumer

Kafka-console-consumer.sh\-bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092\-from-beginning\-topic zy-flume-kafka

Add data to the monitoring file: tail-10 sample.temp > > sample.log

Watch kafkaconsumer: consume data!

(2) realize sparkStreaming to read and process the data in kafka.

There are two ways for   SparkStreaming to integrate kafka:

  -receiver + checkpoint mode

  -direct + zookeeper mode

1) receiver + checkpoint mode

Code:

/ * read data in kafka based on Receiver * / object _ 01SparkKafkaReceiverOps {def main (args: Array [String]): Unit = {/ / determine whether the number of parameters passed by the program is correct / / 2 hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka first zy-flume-kafka if (args = = null | | args.length

< 4) { println( """ |Parameter Errors! Usage: |batchInterval : 批次间隔时间 |zkQuorum : zookeeper url地址 |groupId : 消费组的id |topic : 读取的topic """.stripMargin) System.exit(-1) } //获取程序传入的参数 val Array(batchInterval, zkQuorum, groupId, topic) = args //1.构建程序入口 val conf: SparkConf = new SparkConf() .setMaster("local[2]") .setAppName("_01SparkKafkaReceiverOps") val ssc =new StreamingContext(conf,Seconds(2)) /**2.使用Receiver方式读取数据 * @param ssc * @param zkQuorum * @param groupId * @param topics * @param storageLevel default: StorageLevel.MEMORY_AND_DISK_SER_2 * @return DStream of (Kafka message key, Kafka message value) */ val topics = topic.split("\\s+").map((_,3)).toMap //2.读取数据 val message: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topics) //3.打印数据 message.print() //4.提交任务 ssc.start() ssc.awaitTermination() }} 注意(receiver +checkpoint):  - kafka中的topic和sparkstreaming中生成的RDD分区没有关系,在KafkaUtils.createStream中增加分区数只会增加单个receiver的线程数,不会增加spark的并行度  - 可以创建多个kafka的输入DStream,使用不同的group和topic,使用多个receiver并行接收数据  - 如果启用了HDFS等有容错的存储系统,并且启用了写入日,则接收到的数据已经被复制到日志中。 2)direct +zookeeper方式 代码实现 package com.zy.streamingimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}import org.apache.curator.retry.ExponentialBackoffRetryimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import org.apache.spark.streaming.{Seconds, StreamingContext}/** * 使用zk来管理的消费的偏移量,确保当SparkStreaming挂掉之后在重启的时候, * 能够从正确的offset偏移量的位置开始消费,而不是从头开始消费 */object SparkStreamingDriverHAOps { //设置zookeeper中存放偏移量的位置 val zkTopicOffsetPath="/offset" //获取zookeeper的编程入口 val client:CuratorFramework={ val client=CuratorFrameworkFactory.builder() .connectString("hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka") .namespace("2019_1_7") .retryPolicy(new ExponentialBackoffRetry(1000,3)) .build() client.start() client } def main(args: Array[String]): Unit = { //屏蔽日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //2 direct zy-flume-kafka if(args==null||args.length"hadoop01:9092,hadoop02:9092,hadoop03:9092", //集群入口 "auto.offset.reset"->

"smallest" / / consumption mode) / / 2. Create the messageval message:DStream of kafka [(String,String)] = createMessage (topic,groupId,ssc,kafkaParams) / / 3. Business processing, here is mainly about how to integrate sparkStreaming with kafka, so there is no business processing message.foreachRDD (rdd= > {if (! rdd.isEmpty ()) {println ("" | # > _)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report