Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Production SparkStreaming data Zero loss Best practices (including code)

2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

MySQL creates a table to store offset mysql > use testmysql > create table hlw_offset (topic varchar (32), groupid varchar (50), partitions int, fromoffset bigint, untiloffset bigint, primary key (topic,groupid,partitions)) Maven dependency package 2.11.82.3.12.5.0 Murray-org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2 .11 ${spark.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-8cm 2.11 ${spark.version} mysql mysql-connector-java 5.1.27 org.scalikejdbc scalikejdbc_2.11 2.5.0 org.scalikejdbc scalikejdbc-config_2.11 2.5.0 com.typesafe config 1.3.0 Org.apache.commons commons-lang3 3.5 implementation idea 1) StreamingContext2) get data from kafka (offset-- from external storage > data from kafka according to offset) 3) logically process according to business 4) store the processing results in external storage-save offset5) startup program Wait for the program to finish the code implementation

The SparkStreaming body code is as follows

Import kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport org.apache.spark.SparkConfimport org.apache.spark.streaming.kafka. {HasOffsetRanges, KafkaUtils} import org.apache.spark.streaming. {Seconds, StreamingContext} import scalikejdbc._import scalikejdbc.config._object JDBCOffsetApp {def main (args: Array [String]): Unit = {/ / create SparkStreaming entry val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("JDBCOffsetApp") val ssc = new StreamingContext (conf) Seconds (5)) / / kafka consumption theme val topics = ValueUtils.getStringValue ("kafka.topics"). Split (","). ToSet / / kafka parameter / / the custom ValueUtils utility class is applied here To get the parameters in application.conf Convenient for later modification val kafkaParams = Map [String,String] ("metadata.broker.list"-> ValueUtils.getStringValue ("metadata.broker.list"), "auto.offset.reset"-> ValueUtils.getStringValue ("auto.offset.reset") "group.id"-> ValueUtils.getStringValue ("group.id") / / first use scalikejdbc to read offset information from MySQL database / / +-+ / / | topic | groupid | partitions | fromoffset | untiloffset | / / +-+ / / MySQL table structure as above Read out the columns "topic", "partitions" and "untiloffset" / / form fromOffsets: Map [TopicAndPartition, Long] Later createDirectStream uses DBs.setup () val fromOffset = DB.readOnly (implicit session = > {SQL ("select * from hlw_offset") .map (rs = > {(TopicAndPartition (rs.string ("topic"), rs.int ("partitions"), rs.long ("untiloffset")}). List (). Apply ()}). ToMap / / if there is no offset information in MySQL table, start consumption from 0 If so, start with the existing offset. Val messages = if (fromOffset.isEmpty) {println ("consume from scratch.") KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder] (ssc,kafkaParams,topics)} else {println ("consumption from existing records...") Val messageHandler = (mm:MessageAndMetadata [String,String]) = > (mm.key (), mm.message ()) KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder, (String,String)] (ssc,kafkaParams,fromOffset MessageHandler)} messages.foreachRDD (rdd= > {if (! rdd.isEmpty ()) {/ / output the amount of data of rdd println ("data statistics record is:" + rdd.count ()) / / the method of obtaining rdd offset information given in the official case OffsetRanges is an array of offsetRange / / trait HasOffsetRanges {/ / def offsetRanges: Array [OffsetRange] / /} val offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges offsetRanges.foreach (x = > {/ / output topics for each consumption, partition Start offset and end offset println (s "--${x.topic}, ${x.partition}, ${x.fromOffset}, ${x.untilOffset}--") / / saves the latest offset information to the MySQL table DB.autoCommit (implicit session = > {SQL ("replace into hlw_offset (topic,groupid,partitions,fromoffset,untiloffset) values (?) ) .bind (x.topicMagneValueUtils.getStringValue ("group.id"), x.partitionmeme x.fromOffset.x.untilOffset) .update () .apply ()}) ssc.start () ssc.awaitTermination ()}

The custom ValueUtils utility classes are as follows

Import com.typesafe.config.ConfigFactoryimport org.apache.commons.lang3.StringUtilsobject ValueUtils {val load = ConfigFactory.load () def getStringValue (key:String, defaultValue:String= "") = {val value = load.getString (key) if (StringUtils.isNotEmpty (value)) {value} else {defaultValue}}

The application.conf content is as follows

Metadata.broker.list = "192.168.137.251 group.id 9092" auto.offset.reset = "smallest" group.id = "hlw_offset_group" kafka.topics = "hlw_offset" serializer.class = "kafka.serializer.StringEncoder" request.required.acks = "1" # JDBC settingsdb.default.driver = "com.mysql.jdbc.Driver" db.default.url= "jdbc:mysql://hadoop000:3306/test" db.default.user= "root" db.default.password= "123456"

Custom kafka producer

Import java.util. {Date, Properties} import kafka.producer. {KeyedMessage, Producer, ProducerConfig} object KafkaProducer {def main (args: Array [String]): Unit = {val properties = new Properties () properties.put ("serializer.class", ValueUtils.getStringValue ("serializer.class") properties.put ("metadata.broker.list", ValueUtils.getStringValue ("metadata.broker.list") properties.put ("request.required.acks") ValueUtils.getStringValue ("request.required.acks") val producerConfig = new ProducerConfig (properties) val producer = new Producer [String,String] (producerConfig) val topic = ValueUtils.getStringValue ("kafka.topics") / / generate 100 pieces of data at a time var I = 0 for (I select * from hlw_offset Empty set (0.00 sec)

Generate 500pieces of data through kafka producer

Start the SparkStreaming program

/ / console output result: consumption from scratch. The statistics record is as follows: 500 select select off set 05pm 0500 select off set-View MySQL table, successful record by mysql > select * set +-+ | topic | groupid | partitions | fromoffset | untiloffset | +- +-+ | hlw_offset | hlw_offset_group | 0 | 0 | 500 | +- -+

Close the SparkStreaming program, use kafka producer to produce 300pieces of data, and start the spark program again (if the spark starts consuming from 500, the offset is read successfully and the semantics is read only once)

/ / console result output: consumption starts from existing records. The statistical record of the data is as follows: 300 murmuri hlwang offsetpr 0500800 Meltel-

View updated offset MySQL data

Mysql > select * from hlw_offset +-+ | topic | groupid | partitions | fromoffset | untiloffset | +- +-+ | hlw_offset | hlw_offset_group | 0 | 500 | 800 | +- -+

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report