Shulou (Shulou.com), 06/02 report. Updated 2025-01-17.
This article explains how to achieve ultra-low latency in Structured Streaming by using the continuous processing mode.
To run a supported query in continuous processing mode, you only need to specify a continuous trigger and pass the desired checkpoint interval as its parameter. For example:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousProcessing {
  def main(args: Array[String]): Unit = {
    // "yarn-client" is deprecated since Spark 2.0 in favour of master "yarn"
    // with client deploy mode; it is kept here as in the original example.
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("yarn-client")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances", "2")
      .set("spark.default.parallelism", "4")
      .set("spark.sql.shuffle.partitions", "4")
      // Jar paths and versions are the ones given in the original example.
      .setJars(List(
        "/Users/meitu/Desktop/sparkjar/bigdata.jar",
        "/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar",
        "/opt/jars/kafka-clients-0.10.2.2.jar",
        "/opt/jars/kafka_2.11-0.10.2.2.jar",
        "/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .config(sparkConf)
      .getOrCreate()

    // Read from one Kafka topic, cast key and value to strings,
    // and write the result to another Kafka topic.
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "StructuredSource")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("topic", "StructuredSink")
      .option("checkpointLocation", "/sql/checkpoint")
      .trigger(Trigger.Continuous("1 second")) // only change in query
      .start()
      .awaitTermination()
  }
}
A checkpoint interval of 1 second means that the continuous processing engine records the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, so any query can be restarted with any trigger. For example, a supported query that was started in micro-batch mode can be restarted in continuous mode, and vice versa.
Note that whenever you switch to continuous mode, you get at-least-once fault-tolerance guarantees.
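As a minimal sketch of switching modes, the Kafka-to-Kafka query above can be built once and started with either trigger against the same checkpoint location. The names TriggerSwitch, triggerFor, and startQuery, as well as the "continuous" command-line argument, are hypothetical and introduced only for illustration; the servers, topics, and checkpoint path are the ones from the example above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object TriggerSwitch {
  // Hypothetical helper: maps a mode name to a trigger.
  // Anything other than "continuous" falls back to micro-batch.
  def triggerFor(mode: String): Trigger =
    if (mode == "continuous") Trigger.Continuous("1 second")
    else Trigger.ProcessingTime("1 second")

  // Same query as the example above; only the trigger is a parameter.
  def startQuery(spark: SparkSession, trigger: Trigger): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "StructuredSource")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("topic", "StructuredSink")
      // Reusing the same checkpoint lets a restart resume in either mode.
      .option("checkpointLocation", "/sql/checkpoint")
      .trigger(trigger)
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TriggerSwitch").getOrCreate()
    val mode = args.headOption.getOrElse("microbatch")
    startQuery(spark, triggerFor(mode)).awaitTermination()
  }
}
```

Stopping the application and launching it again with the other mode resumes from the recorded progress. After a switch to continuous mode, the at-least-once guarantee applies.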
Supported queries
As of Spark 2.3, the continuous processing mode supports only the following types of queries.
Operations: only map-like Dataset/DataFrame operations are supported in continuous mode, that is, projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.).
All SQL functions are supported except aggregate functions (because aggregations are not yet supported), current_timestamp(), and current_date() (deterministic computations that use time are challenging).
Sources
Kafka source: all options are supported.
Rate source: suitable for testing. The only options supported in continuous mode are numPartitions and rowsPerSecond.
Sinks
Kafka sink: all options are supported.
Console sink: suitable for debugging. All options are supported. Note that the console prints once per checkpoint interval specified in the continuous trigger.
For more details about sources and sinks, see the Input Sources and Output Sinks sections of the official Structured Streaming documentation. Although the console sink is well suited for testing, end-to-end low-latency processing is best observed with Kafka as both the source and the sink.
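The supported combinations listed above can be tried without a Kafka cluster. The following is a minimal sketch, assuming a local Spark 2.3+ installation: it reads from the rate source, applies one selection and one projection, and writes to the console sink in continuous mode. The object name RateToConsole, the helper transform, the column alias scaled, and the local[4] master are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object RateToConsole {
  // Only map-like operations: one selection followed by one projection.
  def transform(df: DataFrame): DataFrame =
    df.where("value % 2 = 0")                      // selection
      .selectExpr("value", "value * 10 AS scaled") // projection

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("RateToConsole")
      .master("local[4]") // assumption: local run with more cores than source partitions
      .getOrCreate()

    // The rate source emits rows with the columns `timestamp` and `value`.
    val source = spark.readStream
      .format("rate")
      .option("numPartitions", "2")
      .option("rowsPerSecond", "10")
      .load()

    transform(source)
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}
```

Replacing the projection with an aggregation such as groupBy(...).count() would fall outside the supported operations listed above.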
Caveats
The continuous processing engine launches a number of long-running tasks that continuously read data from the sources, process it, and write to the sinks. The number of tasks a query requires depends on how many partitions it can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure that the cluster has enough cores to run all the tasks in parallel. For example, if you are reading from a Kafka topic with 10 partitions, the cluster must have at least 10 cores for the query to make progress.
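The core requirement can be checked with simple arithmetic before submitting a query. The sketch below is a hypothetical helper, not part of Spark: it assumes each task needs one core (the default spark.task.cpus = 1) and that the total core count is the number of executors multiplied by the cores per executor.

```scala
object CoreCheck {
  // True when every source partition can get its own long-running task.
  // Assumes one core per task (spark.task.cpus = 1).
  def hasEnoughCores(sourcePartitions: Int,
                     executorInstances: Int,
                     coresPerExecutor: Int): Boolean =
    executorInstances.toLong * coresPerExecutor >= sourcePartitions

  def main(args: Array[String]): Unit = {
    // A Kafka topic with 10 partitions on 2 executors with 4 cores each:
    println(hasEnoughCores(10, 2, 4)) // prints false (8 cores < 10 partitions)
    // The same topic on 2 executors with 5 cores each:
    println(hasEnoughCores(10, 2, 5)) // prints true (10 cores >= 10 partitions)
  }
}
```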
Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
There are currently no automatic retries of failed tasks. Any failure causes the query to stop, and it must be restarted manually from the checkpoint.
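Because the engine does not retry failed tasks, the restart has to come from outside the query. One possible approach, shown here only as a sketch, is a small driver-side loop that starts the query again from the same checkpoint when it terminates with a StreamingQueryException. The object RestartOnFailure, the helper runWithRestarts, and the limit of 3 restarts are hypothetical; whether an unattended restart is appropriate depends on the cause of the failure.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}

object RestartOnFailure {
  // Runs `run` until it returns normally or `maxRestarts` is exhausted.
  // Returns the number of restarts performed. Other exception types,
  // and the failure after the last allowed restart, propagate to the caller.
  def runWithRestarts(maxRestarts: Int)(run: () => Unit): Int = {
    var restarts = 0
    var done = false
    while (!done) {
      try {
        run()
        done = true
      } catch {
        case e: StreamingQueryException if restarts < maxRestarts =>
          restarts += 1
          println(s"Query failed (${e.getMessage}); restart $restarts of $maxRestarts")
      }
    }
    restarts
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RestartOnFailure").getOrCreate()
    runWithRestarts(maxRestarts = 3) { () =>
      // The query is rebuilt on every attempt; the fixed checkpoint
      // location lets each restart resume from the recorded progress.
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
        .option("subscribe", "StructuredSource")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
        .option("topic", "StructuredSink")
        .option("checkpointLocation", "/sql/checkpoint")
        .trigger(Trigger.Continuous("1 second"))
        .start()
        .awaitTermination() // throws StreamingQueryException if the query failed
    }
  }
}
```

Since continuous mode provides at-least-once guarantees, records processed shortly before a failure may be written to the sink again after a restart.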