
How does Spark-Streaming process data into MySQL


This article focuses on how Spark-Streaming processes data into MySQL. The method introduced here is simple, fast, and practical; interested readers are encouraged to follow along and try it themselves.

1. Description

The data table is as follows:

CREATE DATABASE test;
USE test;
DROP TABLE IF EXISTS car_gps;
CREATE TABLE IF NOT EXISTS car_gps(
    deployNum VARCHAR(30) COMMENT 'scheduling number',
    plateNum  VARCHAR(10) COMMENT 'license plate number',
    timeStr   VARCHAR(20) COMMENT 'timestamp',
    lng       VARCHAR(20) COMMENT 'longitude',
    lat       VARCHAR(20) COMMENT 'latitude',
    dbtime    TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'data entry time',
    PRIMARY KEY (deployNum, plateNum, timeStr)
);
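Before wiring up the streaming job, it can be worth confirming that the table is reachable over JDBC. Below is a minimal sketch, assuming the hadoop1:3306 endpoint and root/123456 credentials used later in this article (CheckTable is just an illustrative name, not part of the original code):

import java.sql.DriverManager

// Connectivity sanity check, assuming the endpoint and credentials
// used later in this article (hadoop1:3306, database test, root/123456).
object CheckTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(
      "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8",
      "root", "123456")
    try {
      // Query the table created above; any result proves the table exists
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM car_gps")
      if (rs.next()) println(s"car_gps rows: ${rs.getInt(1)}")
    } finally conn.close()
  }
}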

2. Write a program

First, add the MySQL JDBC driver to the project:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

2.1 Write to MySQL via a JDBC ForeachWriter

package com.hoult.Streaming.work

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.ForeachWriter

class JdbcHelper extends ForeachWriter[BusInfo] {
  var conn: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    if (conn == null) {
      conn = JdbcHelper.openConnection
    }
    true
  }

  override def process(value: BusInfo): Unit = {
    // Split the packed "lng_lat" field and write one row to the MySQL table
    val arr: Array[String] = value.lglat.split("_")
    val sql = "insert into car_gps (deployNum, plateNum, timeStr, lng, lat) values (?, ?, ?, ?, ?)"
    statement = conn.prepareStatement(sql)
    statement.setString(1, value.deployNum)
    statement.setString(2, value.plateNum)
    statement.setString(3, value.timeStr)
    statement.setString(4, arr(0))
    statement.setString(5, arr(1))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}

object JdbcHelper {
  var conn: Connection = _
  val url = "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8"
  val username = "root"
  val password = "123456"

  def openConnection: Connection = {
    if (conn == null || conn.isClosed) {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, username, password)
    }
    conn
  }
}

2.2 Write to MySQL through foreach

The driver program reads from Kafka, parses each message into a BusInfo bean (the bean itself is not shown in the article; a hypothetical sketch follows after section 2.4), and hands the stream to the JdbcHelper defined above.

package com.hoult.Streaming.work

import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object KafkaToJdbc {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    // 1. Get a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(KafkaToJdbc.getClass.getName)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // 2. Define the Kafka data source
    val kafkaDf: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux121:9092")
      .option("subscribe", "test_bus_info")
      .load()

    // 3. Process the data: keep the message value as a string
    val kafkaValDf: DataFrame = kafkaDf.selectExpr("CAST(value AS STRING)")
    val kafkaDs: Dataset[String] = kafkaValDf.as[String]
    // Parse the longitude/latitude data, encapsulated as a case class
    // so the individual fields are easy to access later
    val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_)).filter(_ != null)

    // 4. Write the data to the MySQL table
    busInfoDs.writeStream
      .foreach(new JdbcHelper)
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

2.4 Create the topic and enter data from the producer side

kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic test_bus_info --partitions 2 --replication-factor 1
kafka-console-producer.sh --broker-list linux121:9092 --topic test_bus_info
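Both programs depend on com.hoult.structed.bean.BusInfo, which the article never shows. Below is a hypothetical sketch of what that bean could look like, assuming each Kafka message is a comma-separated record whose last field packs longitude and latitude joined by an underscore (to match value.lglat.split("_") in JdbcHelper); the field names are taken from the code above, the message format is an assumption.

package com.hoult.structed.bean

// Hypothetical reconstruction -- the article does not show this class.
// Assumes a message such as "D0001,ZA12345,20210708120000,116.40_39.90",
// i.e. deployNum,plateNum,timeStr,lng_lat.
case class BusInfo(deployNum: String, plateNum: String, timeStr: String, lglat: String)

object BusInfo {
  // Returns null on malformed input; the stream filters nulls out afterwards,
  // as in the article's filter(_ != null)
  def apply(line: String): BusInfo = {
    val arr = line.split(",")
    if (arr.length == 4) BusInfo(arr(0), arr(1), arr(2), arr(3)) else null
  }
}

With this shape, typing a line such as D0001,ZA12345,20210708120000,116.40_39.90 into the console producer would land as one row in car_gps.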
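As an aside not covered by the article: on Spark 3.x, foreachBatch is a common alternative to a hand-written ForeachWriter, since each micro-batch can reuse Spark's built-in JDBC writer and its connection handling. A minimal sketch that could replace step 4 of KafkaToJdbc above, under the same table and credentials:

// Alternative sketch (not from the article), assuming Spark 3.x:
// each micro-batch is written with Spark's built-in JDBC data source,
// so no manual Connection/PreparedStatement management is needed.
busInfoDs.writeStream
  .foreachBatch { (batch: Dataset[BusInfo], _: Long) =>
    val session = batch.sparkSession
    import session.implicits._
    batch.map { b =>
        // Unpack the "lng_lat" field into separate columns
        val Array(lng, lat) = b.lglat.split("_")
        (b.deployNum, b.plateNum, b.timeStr, lng, lat)
      }
      .toDF("deployNum", "plateNum", "timeStr", "lng", "lat")
      .write
      .mode("append")
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8")
      .option("dbtable", "car_gps")
      .option("user", "root")
      .option("password", "123456")
      .save()
  }
  .start()
  .awaitTermination()

Note that a plain append will fail a batch on primary-key conflicts; the ForeachWriter approach above has the same constraint, since it issues bare INSERTs against the (deployNum, plateNum, timeStr) primary key.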

At this point, I believe you have a deeper understanding of how Spark-Streaming processes data into MySQL; you might as well try it out in practice.
