Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize configured ETL with Spark Streaming + Spark SQL

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article is to share with you about Spark Streaming + Spark SQL how to achieve configuration ETL, the editor feels very practical, so share with you to learn, I hope you can get something after reading this article, say no more, follow the editor to have a look.

Traditional Spark Streaming programs require:

Build StreamingContext

Set up checkpoint

Link data source

All kinds of transform

ForeachRDD output

Generally speaking, you may build a large program to go through the above process, such as hundreds of lines of code in a main method, although it is convenient enough to develop small functions, but the reuse is not enough, and it is not good for collaboration, so you need a higher-level development package to support it.

How to develop a Spark Streaming program

As long as I add the following job configuration to the configuration file, I can submit and run as a standard Spark Streaming program:

{"test": {"desc": "Test", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [{"name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor", "params": [{"metadata.broker.list": "xxx" "auto.offset.reset": "largest", "topics": "xxx"}]}, {"name": "streaming.core.compositor.spark.JSONTableCompositor", "params": [{"tableName": "test"}]}, {"name": "streaming.core.compositor.spark.SQLCompositor" "params": [{"sql": "select a from test"}]}, {"name": "streaming.core.compositor.RDDPrintOutputCompositor", "params": [{}]}], "configParams": {}

The above configuration is equivalent to completing the following process:

Consume data from Kafka

Convert Kafka data to tables

Processing through SQL

Printout

Is it very simple, and it can also support hot loading, dynamically adding job, etc.

Characteristics

The features of this implementation are:

Configuration

Support for multiple Job configurations

Support a variety of data source modules

Support data processing through SQL

Support multiple output modules

Future scalable support includes:

Dynamically add or remove job updates without restarting Spark Streaming

Support for other streaming engines such as Storm

Better multi-job interoperability

Configuration format description

The implementation is based entirely on ServiceframeworkDispatcher, and the core functions only take about three hours.

Here we first sort out a few concepts:

Spark Streaming is defined as an App

Each Action is defined as a Job. An App can contain multiple Job

The configuration file structure is designed as follows:

{"job1": {"desc": "Test", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [{"name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor", "params": [{"metadata.broker.list": "xxx" "auto.offset.reset": "largest", "topics": "xxx"}]},], "configParams": {}} Job2: {. }}

A complete App corresponds to a configuration file. Each top-level configuration option, such as job1,job2, corresponds to a workflow. They all end up running on an App (Spark Streaming instance).

Strategy is used to define how to organize the calling relationship between compositor,algorithm and ref.

Algorithm as a data source

Compositor data processing link module. In most cases, we are developing for this interface.

Ref is a reference to other job. By cooperating with the appropriate strategy, we organize multiple job into a new job

Each component (compositor,algorithm, strategy) supports parameter configuration

The above mainly parses the shape of the configuration file, and ServiceframeworkDispatcher has given a set of interface specifications, as long as it is implemented.

Module realization

How is the corresponding module implemented? The essence is to convert the above configuration files into Spark Streaming programs through the modules that have been implemented.

Take the specific implementation of SQLCompositor as an example:

Class SQLCompositor [T] extends Compositor [T] {private var _ configParams: util.List [util.Map [Any, Any]] = _ val logger = Logger.getLogger / / Policy engine ServiceFrameStrategy calls this method to pass configuration into override def initialize (typeFilters: util.List [String], configParams: util. List [util. Map [any). Any]]): Unit = {this._configParams = configParams} / / get the configured sql statement def sql = {_ configParams (0). Get ("sql"). ToString} def outputTable = {_ configParams (0). Get ("outputTable"). ToString} / / the main method of execution Roughly get the SQLContext from the previous module (the corresponding table has been registered), / / then set the query statement according to the configuration of the module, and finally get a new dataFrame.// middleResult where T is actually DStream, which we will pass to the next module. The Output module / / params parameter is convenient for all modules to share information. Here we pass the corresponding function to the next module override def result (alg: util.List [process [T]], ref: util.List [process [T]], middleResult: util.List [T], params: util.Map [Any, Any]): util.List [T] = {var dataFrame: DataFrame = null val func = params.get ("table"). AsInstanceOf [(RDD [string]) = > SQLContext] params.put ("sql") (rdd: RDD [string]) = > {val sqlContext = func (rdd) dataFrame = sqlContext.sql (sql) dataFrame}) middleResult}}

The above code completes a SQL module. What if we want to complete a custom .map function? But it can be similar to the following implementation:

Abstract class MapCompositor [Any U] extends Compositor [T] {private var _ configParams: util.List [util.Map [Any, Any]] = _ val logger = Logger.getLogger (typeFilters: util.List [String], configParams: util.List [util.Map [Any, Any]]): Unit = {this._configParams = configParams} override def result (alg: util.List [process [T], ref: util.List [String] MiddleResult: util.List [T], params: util.Map [Any, Any]): util.List [T] = {val dstream = middleResult (0). AsInstanceOf [DStream [string]] val newDstream = dstream.map (f = > parseLog (f)) List (newDstream.asInstanceof [T])} def parseLog (line:String): U} class YourCompositor [Tjue U] extends MapCompositor [override def parseLog (line:String): U = {.... your logical}}

Similarly, you can implement other functions such as filter,repartition.

This approach provides a higher level of API abstraction where users only need to focus on the specific implementation without paying attention to the use of Spark. At the same time, it also provides a set of configuration system to facilitate the construction of data processing flow, and reuse the original module to support the use of SQL for data processing.

This is how Spark Streaming + Spark SQL implements the configuration of ETL. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report