This article shows how Spark Streaming and Spark SQL can be combined to implement configuration-driven ETL.
Traditional Spark Streaming programs require:
Build StreamingContext
Set up checkpoint
Connect to the data source
Apply all kinds of transforms
Output via foreachRDD
In practice, you usually end up building one large program to walk through all of these steps, for example a main method several hundred lines long. That is convenient enough for small features, but reuse is poor and collaboration suffers, so a higher-level development kit is needed on top.
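For concreteness, here is a minimal sketch of that traditional, hand-written style, assuming Spark 1.x era APIs and the Kafka direct stream used elsewhere in this article; all names, paths, and parameters are illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Minimal sketch of a hand-written streaming job (illustrative only)
object TraditionalStreamingJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("traditional-etl")
    // 1. Build the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(10))
    // 2. Set up the checkpoint (placeholder path)
    ssc.checkpoint("/tmp/checkpoint")
    // 3. Connect to the data source (Kafka direct stream)
    val kafkaParams = Map("metadata.broker.list" -> "xxx", "auto.offset.reset" -> "largest")
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("xxx"))
      .map(_._2)
    // 4. All kinds of transforms
    val cleaned = lines.filter(_.nonEmpty)
    // 5. Output via foreachRDD
    cleaned.foreachRDD(rdd => rdd.take(10).foreach(println))
    ssc.start()
    ssc.awaitTermination()
  }
}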
How to develop a Spark Streaming program
As long as I add the following job configuration to the configuration file, I can submit and run it as a standard Spark Streaming program:
{"test": {"desc": "Test", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [{"name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor", "params": [{"metadata.broker.list": "xxx" "auto.offset.reset": "largest", "topics": "xxx"}]}, {"name": "streaming.core.compositor.spark.JSONTableCompositor", "params": [{"tableName": "test"}]}, {"name": "streaming.core.compositor.spark.SQLCompositor" "params": [{"sql": "select a from test"}]}, {"name": "streaming.core.compositor.RDDPrintOutputCompositor", "params": [{}]}], "configParams": {}
The above configuration is equivalent to completing the following process (a rough hand-written equivalent is sketched after the list):
Consume data from Kafka
Convert Kafka data to tables
Processing through SQL
Printout
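For comparison, that four-step pipeline might look roughly like this when written by hand (a sketch assuming Spark 1.x APIs; the table name and SQL statement come from the configuration above, while runPipeline and everything else are hypothetical names used only for illustration):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

object PipelineSketch {
  // kafkaLines: a DStream[String] of JSON records, e.g. obtained as in the earlier sketch
  def runPipeline(kafkaLines: DStream[String]): Unit = {
    kafkaLines.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      val df = sqlContext.read.json(rdd)                  // convert the Kafka data to a table
      df.registerTempTable("test")                        // tableName from the configuration
      val result = sqlContext.sql("select a from test")   // sql from the configuration
      result.take(10).foreach(println)                    // print the output
    }
  }
}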
Isn't that simple? It also supports hot loading, dynamically adding jobs, and so on.
Characteristics
The features of this implementation are:
Configuration-driven
Supports multiple Job configurations
Supports a variety of data source modules
Supports data processing through SQL
Supports multiple output modules
Future extensions could include:
Dynamically adding, updating, or removing jobs without restarting Spark Streaming
Support for other streaming engines such as Storm
Better multi-job interoperability
Configuration format description
The implementation is based entirely on ServiceframeworkDispatcher, and the core functionality took only about three hours to build.
Here we first sort out a few concepts:
A Spark Streaming instance is defined as an App
Each Action is defined as a Job. An App can contain multiple Jobs
The configuration file structure is designed as follows:
{"job1": {"desc": "Test", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [{"name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor", "params": [{"metadata.broker.list": "xxx" "auto.offset.reset": "largest", "topics": "xxx"}]},], "configParams": {}} Job2: {. }}
A complete App corresponds to one configuration file. Each top-level configuration entry, such as job1 and job2, corresponds to a workflow. They all end up running in one App (one Spark Streaming instance).
Strategy defines how to organize the calling relationships between compositor, algorithm and ref.
Algorithm serves as a data source.
Compositor is a data-processing chain module. In most cases, this is the interface we develop against.
Ref is a reference to other jobs. Combined with an appropriate strategy, it lets us organize multiple jobs into a new job.
Each component (compositor, algorithm, strategy) supports parameter configuration.
The above mainly describes the shape of the configuration file. ServiceframeworkDispatcher already defines a set of interface specifications; we just have to implement them, as sketched below.
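To make "interface specifications" concrete, here is a rough sketch of the Compositor contract implied by the implementations in the next section. This is not the authoritative ServiceframeworkDispatcher definition; the trait names and exact signatures are assumptions inferred from the code below.

import java.util

// Opaque framework type, shown here only as a placeholder
trait Processor[T]

trait Compositor[T] {
  // The strategy engine passes this module's block of the configuration file in here
  def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit

  // Receives the previous module's output (middleResult) and returns what is handed to the
  // next module; params is a mutable map used to share information across modules
  def result(alg: util.List[Processor[T]], ref: util.List[Processor[T]],
             middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T]
}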
Module implementation
How are the corresponding modules implemented? Essentially, the modules that have already been implemented turn the configuration file above into a Spark Streaming program.
Take the specific implementation of SQLCompositor as an example:
import java.util
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import scala.collection.JavaConversions._

class SQLCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(getClass.getName)

  // The strategy engine (ServiceFrameStrategy) calls this method to pass the configuration in
  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  // Get the configured SQL statement
  def sql = _configParams(0).get("sql").toString

  def outputTable = _configParams(0).get("outputTable").toString

  // The main execution method. Roughly: get the SQLContext from the previous module (the
  // corresponding table has already been registered), set the query according to this module's
  // configuration, and finally obtain a new DataFrame.
  // middleResult (where T is in fact a DStream) is what we pass on to the next module, the output
  // module. The params map lets all modules share information; here we hand the corresponding
  // function to the next module.
  override def result(alg: util.List[Processor[T]], ref: util.List[Processor[T]],
                      middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    var dataFrame: DataFrame = null
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    params.put("sql", (rdd: RDD[String]) => {
      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}
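Note the design choice: result does not execute the SQL itself. It registers a function under the "sql" key in params, so that a downstream output module can invoke it per micro-batch once an actual RDD is available. A hypothetical output module illustrating that hand-off might look like this (an illustration only, not the project's RDDPrintOutputCompositor):

import java.util
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.dstream.DStream

// Hypothetical output module, names are illustrative
class PrintOutputCompositorSketch[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String],
                          configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Processor[T]],
                      middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    val dstream = middleResult.get(0).asInstanceOf[DStream[String]]
    // Retrieve the function registered by SQLCompositor and run it on each micro-batch
    val sqlFunc = params.get("sql").asInstanceOf[(RDD[String]) => DataFrame]
    dstream.foreachRDD { rdd =>
      val df = sqlFunc(rdd)
      df.take(10).foreach(println)
    }
    middleResult
  }
}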
The above code implements a SQL module. What if we want a custom map function? It can be done with an implementation like the following:
import java.util
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import scala.collection.JavaConversions._

abstract class MapCompositor[T, U] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(getClass.getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Processor[T]],
                      middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    val dstream = middleResult(0).asInstanceOf[DStream[String]]
    val newDstream = dstream.map(f => parseLog(f))
    List(newDstream.asInstanceOf[T])
  }

  // Subclasses only need to supply the line-parsing logic
  def parseLog(line: String): U
}

class YourCompositor[T, U] extends MapCompositor[T, U] {
  override def parseLog(line: String): U = {
    ??? // ... your logic
  }
}
Similarly, you can implement other operators such as filter and repartition.
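For instance, a filter module could follow the same pattern (a sketch; the class and method names are illustrative):

import java.util
import org.apache.spark.streaming.dstream.DStream

// Hypothetical filter module in the same style as MapCompositor above
abstract class FilterCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String],
                          configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Processor[T]],
                      middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    val dstream = middleResult.get(0).asInstanceOf[DStream[String]]
    val newDstream = dstream.filter(line => accept(line))
    util.Arrays.asList(newDstream.asInstanceOf[T])
  }

  // Subclasses decide which records to keep
  def accept(line: String): Boolean
}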
This approach provides a higher level of API abstraction: users only need to focus on their specific logic, not on how Spark is used. It also provides a configuration system that makes it easy to assemble a data-processing flow, reuse existing modules, and process data with SQL.
That is how Spark Streaming + Spark SQL can implement configuration-driven ETL.