V. Principles and Use of Spark -- Spark Streaming


1. Spark Streaming overview

1.1 Commonly used real-time computing engines

A real-time computing engine is also called a streaming computing engine. At present, there are three commonly used engines:

1. Apache Storm: true streaming computing

2. Spark Streaming: strictly speaking, not true streaming (real-time) computing

It treats a continuous data stream as a series of discrete RDDs.

Essence: discrete (micro-batch) computation over non-continuous data.

When asked about it in an interview: first describe its essence, then give your own understanding, its common methods, and a comparison with other similar technologies.

3. Apache Flink: true streaming computing, the opposite of Spark Streaming.

Essence: streaming computation. Although Flink can also be used for offline (batch) computing, it essentially treats discrete data as a simulated stream and processes it with streaming computation.

1.2 What is spark-streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of real-time data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, or TCP sockets, and can be processed with complex algorithms expressed through high-level functions such as map, reduce, join and window. Finally, the processed data can be pushed to file systems, databases, and live dashboards. Moreover, you can apply Spark's machine learning and graph processing algorithms to the data streams.

Features:

1. Easy to use: integrated in Spark

2. Fault tolerance: it is built on RDDs, and RDDs themselves have a fault-tolerance mechanism.

3. Supports multiple programming languages: Java, Scala, Python

1.3 spark-streaming architecture

Spark Streaming receives real-time data, and the processor then obtains the data in batches, similar to sampling at fixed time intervals. Each batch of data is an RDD, and what is finally fed to the processor is a queue of RDDs, a stream that is in fact a discretized stream, or DStream. Internally, a DStream is represented by a sequence of RDDs. Various operators can then be called on the DStream object for processing.

Figure 1.1 DStream principle

1.4 case demonstration-NetworkWordCount

First start the netcat server and listen on port 1234

nc -l 1234

If the nc command is not available, install netcat first: yum -y install netcat

Then start the spark-streaming sample program, get the data from the local port 1234, and perform wordcount operation

Go to the installation directory of spark and execute the command in the bin directory: bin/run-example streaming.NetworkWordCount localhost 1234

Then enter various strings on the netcat side:

[root@bigdata121 hive-1.2.1-bin]# nc -l 1234
king king hello

View the statistics in another window:

-------------------------------------------
Time: 1567005584000 ms
-------------------------------------------
(hello,1)
(king,2)

The counts appear on this side right away.

1.5 write your own NetworkWordCount

First, remember to add the spark-streaming dependency to the maven pom.xml (for convenience, it is easiest to add the dependencies for all Spark components).

pom.xml — key properties, dependencies, and build plugins:

Project: king : sparkTest : 1.0-SNAPSHOT
Properties: encoding UTF-8, spark version 2.1.0, scala version 2.11.8, hadoop version 2.7.3

Dependencies:
org.apache.spark : spark-core_2.11 : 2.1.0
org.apache.spark : spark-sql_2.11 : 2.1.0
org.apache.spark : spark-hive_2.11 : 2.1.0 (provided)
org.apache.spark : spark-streaming_2.11 : 2.1.0
org.apache.spark : spark-mllib_2.11 : 2.1.0 (runtime)
org.apache.hadoop : hadoop-client : ${hadoop.version} (2.7.3)
org.apache.spark : spark-streaming-kafka_2.11 : 1.6.3
mysql : mysql-connector-java : 8.0.12
junit : junit : 4.12
org.apache.hive : hive-jdbc : 1.2.1
log4j : log4j : 1.2.17
org.apache.flume : flume-ng-core : 1.8.0
org.apache.flume : flume-ng-sdk : 1.8.0
org.apache.flume : flume-ng-configuration : 1.8.0

Build plugins:
org.scala-tools : maven-scala-plugin : 2.15.2 (compile and testCompile executions)
maven-compiler-plugin : 3.6.0 (source/target 1.8)
org.apache.maven.plugins : maven-surefire-plugin : 2.19 (tests skipped)

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Streaming wordcount program
  *
  * 1. Create a StreamingContext object and a DStream (discretized stream).
  *    This is essentially discrete computation:
  *    discrete: continuous data is turned into discrete batches and processed immediately in real time
  *    offline: not processed in real time
  *
  * 2. A DStream is expressed as RDDs and is operated on in the same way as RDDs.
  *
  * 3. The DStream cuts a continuous data stream into discrete RDDs.
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    /**
      * This is the standard way to create a StreamingContext object; you cannot create
      * a StreamingContext through a SparkSession object.
      * master is set to local[2], meaning at least two cores (two threads) are used:
      * one for receiving data and one for processing data.
      */
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")

    // the conf object is passed in here; the batch interval is 3 seconds,
    // i.e. an RDD is cut every 3 seconds and then processed
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // create the receiving data source: a socket stream that receives data and
    // automatically cuts it into RDDs internally; specify the host and port to listen to
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    // wordcount processing
    val rdd1 = streamText.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // print the result
    rdd1.print()

    // start the StreamingContext and begin computing
    streamingContext.start()
    // wait for the task to end
    streamingContext.awaitTermination()
  }
}

Start the netcat service on the bigdata121 virtual machine:

nc -l 1234

Run the above program in IDEA and type strings on the netcat side; the output has the same structure as in the earlier example.

2. Basic principles and usage of streaming

2.1 The StreamingContext object

1. A StreamingContext internally creates an instance of SparkContext (the starting point of all Spark functionality), which you can access through ssc.sparkContext.

2. Once a StreamingContext has been started, no new stream computations can be set up or added to it.

3. Once a context has been stopped, it cannot be restarted.

4. Only one StreamingContext can be active in a JVM at a time.

5. The stop() method on StreamingContext also stops the SparkContext. To stop only the StreamingContext (keeping the SparkContext active), set the optional stopSparkContext parameter of stop() to false.

6. As long as the previous StreamingContext is stopped (without stopping its SparkContext) before the next StreamingContext is created, a SparkContext can be reused to create multiple StreamingContexts.
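As a minimal sketch of points 5 and 6 (not code from the original article; the app name and batch intervals are arbitrary), the following stops a StreamingContext while keeping its SparkContext alive, then reuses that SparkContext for a second StreamingContext:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// assumed setup: a plain local SparkContext
val sc = new SparkContext(new SparkConf().setAppName("ssc lifecycle sketch").setMaster("local[2]"))

val ssc1 = new StreamingContext(sc, Seconds(3))
// ... define the first set of stream computations on ssc1, then:
ssc1.start()

// stop only the StreamingContext; keep the SparkContext active (point 5)
ssc1.stop(stopSparkContext = false)

// the same SparkContext can now back a second StreamingContext (point 6)
val ssc2 = new StreamingContext(sc, Seconds(5))
// ... define new stream computations and start ssc2 as usual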

2.2 Discretized streams (DStreams)

The DStream object can be said to be the data entry point of the entire spark-streaming program; all the data to be processed comes from it. As mentioned earlier, this object actually holds individual RDDs, which is the essence of a DStream. After an operator transformation, a DStream is still a DStream, and internally it is still RDDs, so operator transformations work much like those on ordinary RDDs. Generally speaking, transformations between DStreams in a streaming program are essentially transformations of the RDDs inside the DStreams.

2.3 Operators of DStream

List of operators:

Figure 2.1 DStream operators

They are very similar to ordinary RDD operators; there are two special operators, transform and updateStateByKey.

2.3.1 transform

transform(RDD[T] => RDD[U]) is an operator used to convert the RDD inside a DStream into a new RDD. Note that the handler function passed to this operator receives an RDD as its parameter, unlike other operators whose functions receive the data inside the RDD as parameters.

Example:

val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")

// the conf object is passed in here; the batch interval is 3 seconds,
// i.e. an RDD is cut every 3 seconds and then processed
val streamingContext = new StreamingContext(conf, Seconds(3))

// create a socket stream that receives data and automatically cuts it into RDDs
val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

// the parameter received by the function is an RDD; process the RDD inside
// and finally return a new RDD
streamText.transform(rdd => {
  rdd.flatMap(_.split(" "))
})
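transform is also handy for joining each batch RDD with a static RDD, which ordinary DStream operators cannot do directly. The sketch below is not from the original; the blacklist contents and variable names are made-up placeholders, and it reuses streamingContext and streamText from the example above:

// hypothetical static blacklist RDD of (word, flag) pairs
val blacklist = streamingContext.sparkContext.parallelize(List("spam")).map((_, true))

val cleaned = streamText.transform { rdd =>
  rdd.flatMap(_.split(" "))
     .map((_, 1))
     .leftOuterJoin(blacklist)                              // join the batch RDD with the static RDD
     .filter { case (_, (_, flagged)) => flagged.isEmpty }  // keep only words not in the blacklist
     .map { case (word, (count, _)) => (word, count) }
}
cleaned.print()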

2.3.2 updateStateByKey

By default, Spark Streaming does not record the previous state; every time data arrives, counting starts again from 0. For example, in word counting, the number of words counted previously is not added to the next count, which starts at 0 again. If you want an accumulating count, use this operator to implement it.

updateStateByKey((Seq[T], Option[S]) => Option[S])

The function received by this operator takes two parameters:

Seq[T]: after grouping by key, the collection of values for the same key in the current batch, e.g. the [1, 2, 1, 1] in ("age", [1, 2, 1, 1]).

Option[S]: the previous sum of values for the same key, i.e. the previously saved count state for that key. The return value is the previous count plus the current count.

Example: here the previous wordcount is changed to keep a running count of words instead of re-counting from scratch.

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Test updateStateByKey: accumulate the state on every batch
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    // create the StreamingContext object and specify master as local[2], meaning at least
    // two cores are used: one for receiving data and one for processing data
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")

    // the conf object is passed in here; the batch interval is 3 seconds,
    // i.e. an RDD is cut every 3 seconds and then processed
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // set the checkpoint to save the previous state; make sure the directory does not exist yet
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming")

    // create a socket stream that receives data and automatically cuts it into RDDs
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    // split the data and attach a count of 1 to each word
    val wordPair = streamText.flatMap(_.split(" ")).map((_, 1))

    // accumulation function
    val addFunc = (currentValues: Seq[Int], previousValue: Option[Int]) => {
      // sum the current values
      val currentSum = currentValues.sum
      // take the previous value; if it does not exist, return 0
      val pre = previousValue.getOrElse(0)
      // add the previous and current values
      Option(pre + currentSum)
    }

    // update the old count state to the new count state
    wordPair.updateStateByKey(addFunc).print()

    // start the StreamingContext and begin computing
    streamingContext.start()
    // wait for the task to end
    streamingContext.awaitTermination()
  }
}

An error occurred in the process of running this demo:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets

It says the org.apache.commons.io.Charsets class cannot be found. Looking inside the org.apache.commons.io package confirms it: the bundled version is presumably too old and does not contain this class. A quick search shows that version 2.5 does have it, so add a new dependency to pom.xml:

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>

Run it again, and it works.

2.3.3 foreachRDD

The foreachRDD operator is similar to foreach, but it operates on the entire RDD rather than on individual elements of the RDD. foreachRDD(RDD[T] => Unit) is generally used to write the results of each RDD to external storage such as hdfs or mysql; section 2.5 below shows foreachRDD combined with sql.
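As a hedged sketch of this pattern (not from the original; the MySQL URL, credentials and table are hypothetical placeholders), the wordcount result DStream rdd1 from section 1.5 could be written to MySQL inside foreachRDD, opening one JDBC connection per partition:

import java.sql.DriverManager

rdd1.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one connection per partition, not per record
    val conn = DriverManager.getConnection("jdbc:mysql://bigdata121:3306/test", "root", "password")
    val stmt = conn.prepareStatement("insert into wordcount(word, cnt) values (?, ?)")
    partition.foreach { case (word, count) =>
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}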

2.4 Window operations

Application scenarios:

It is generally used to count the data of the last N hours. In such an application scenario, a window is needed.

2.4.1 principle

Schematic:

Figure 2.2 spark-streaming window operation

A window is built on top of a DStream, with a time range added. As shown in the figure, each time the window slides over the original DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the example above, the operation is applied to the data of the last 3 time units and slides by 2 time units. So compared with ordinary DStream operations, which process one RDD at a time, window operations process all the RDDs within a time range together. The window is a layer of encapsulation on top of the DStream.

When using windows, there are two key parameters:

Window length (windowLength): the length of the window (3 in the example above)

Slide interval (slideInterval): the interval between two adjacent window operations, i.e. how far the window slides each time (2 in the example above)

It is also important to note that both parameters must be multiples of the batch (sampling) interval of the source DStream (1 in the example above). If they were not integer multiples, the edge of a window would split an RDD into two parts, which is not possible: Spark cannot process half an RDD, since an RDD is indivisible.
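A minimal sketch (not from the original; the host, port and durations are only illustrative) of this multiple-of-the-batch-interval rule:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("window interval sketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(3))   // batch (sampling) interval: 3 seconds

val pairs = ssc.socketTextStream("bigdata121", 1234).flatMap(_.split(" ")).map((_, 1))

// valid: a 30s window and a 9s slide are both multiples of the 3s batch interval
val ok = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(9))

// invalid: a 10s slide is not a multiple of 3s, so Spark rejects the windowed DStream
// val bad = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))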

2.4.2 Related operators

window(windowLength, slideInterval) -> returns a new DStream computed based on windowed batches of the source DStream.

countByWindow(windowLength, slideInterval) -> returns a sliding-window count of the elements in the stream.

reduceByWindow(func, windowLength, slideInterval) -> returns a single-element stream created from the windowed batch data of the source DStream, aggregating the elements of the stream over the sliding interval with the function func. The function must be associative so that the computation can be done correctly in parallel.

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) -> applied to a DStream of (K, V) pairs and returns a new DStream of (K, V) pairs, where the values of each key are aggregated with the given reduce function over the window. Note: by default this operator uses Spark's default number of parallel tasks for the grouping; the numTasks parameter can be used to set a different number of tasks.

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) -> a more efficient version of reduceByKeyAndWindow() above, in which the reduce value of each window is computed incrementally from the reduce result of the previous window: the new data entering the sliding window is reduced in, and the old data leaving the window is "inverse reduced" out. One example is "adding and subtracting" the values of keys as the window slides. It only applies to "invertible reduce functions", i.e. reduce functions that have a corresponding "inverse reduce" function (passed as the parameter invFunc). As with reduceByKeyAndWindow, the number of reduce tasks can be configured with an optional parameter. Note that checkpointing must be enabled to use this operation.

countByValueAndWindow(windowLength, slideInterval, [numTasks]) -> applied to a DStream of (K, V) pairs and returns a new DStream of (K, Long) pairs in which the value of each key is its frequency within the sliding window.

The most commonly used is reduceByKeyAndWindow, which is often used to aggregate the data of a fixed recent period of time, such as order sales in the last hour. Below, this operator is applied to the wordcount example.

2.4.3 example

The window size is 30s, the window slides every 10s, and the word count is cumulative.

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Test updateStateByKey together with a window: accumulate the state on every batch
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    // create the StreamingContext object and specify master as local[2], meaning at least
    // two cores are used: one for receiving data and one for processing data
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")

    // the conf object is passed in here; the batch interval is 1 second,
    // i.e. an RDD is cut every second and then processed
    val streamingContext = new StreamingContext(conf, Seconds(1))

    // set the checkpoint to save the previous state; make sure the directory does not exist yet
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")

    // create a socket stream that receives data and automatically cuts it into RDDs
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    // split the data and attach a count of 1 to each word
    val wordPair = streamText.flatMap(_.split(" ")).map((_, 1))

    // add the window operation: 30s window, sliding every 10s
    val windowValue = wordPair.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(10))

    // accumulation function
    val addFunc = (currentValues: Seq[Int], previousValue: Option[Int]) => {
      // sum the current values
      val currentSum = currentValues.sum
      // take the previous value; if it does not exist, return 0
      val pre = previousValue.getOrElse(0)
      // add the previous and current values
      Option(pre + currentSum)
    }

    // update the old count state to the new count state
    // wordPair.updateStateByKey(addFunc).print()
    windowValue.updateStateByKey(addFunc).print()

    // start the StreamingContext and begin computing
    streamingContext.start()
    // wait for the task to end
    streamingContext.awaitTermination()
  }
}

2.5 sql operation

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Convert the streaming DStream into sql operations
  */
object StreamingAndSql {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("streaming and sql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)
    val words = lines.flatMap(_.split(" "))

    // the RDD needs to be converted into a DataFrame object before it can be used with spark sql
    words.foreachRDD(rdd => {
      // build the SparkSession from the rdd's conf to make sure the configuration matches the rdd's
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.sqlContext.implicits._

      // convert the rdd to a DataFrame and name the column
      val df = rdd.toDF("word")

      // create a view and run sql on it
      df.createOrReplaceTempView("tmp1")
      val resultDF = spark.sql("select word, count(1) from tmp1 group by word")
      resultDF.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

2.6 Checkpoint

This is similar to checkpointing an RDD, except that in streaming the checkpoint is set through the StreamingContext object:

// create the StreamingContext object and specify master as local[2], meaning at least
// two cores are used: one for receiving data and one for processing data
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")

// the conf object is passed in here; the batch interval is 1 second,
// i.e. an RDD is cut every second and then processed
val streamingContext = new StreamingContext(conf, Seconds(1))

// set the checkpoint to save the previous state; make sure the directory does not exist yet
streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")
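A closely related pattern, shown here as a sketch rather than something from the original, is recovering the StreamingContext from the checkpoint directory on restart with StreamingContext.getOrCreate: if checkpoint data exists, the context is rebuilt from it, otherwise the creation function is called.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
  val ssc  = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // define the DStream computation here before returning the context
  ssc
}

// rebuild from the checkpoint if it exists, otherwise create a fresh context
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()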

3. Data sources of streaming

3.1 Basic data sources

File stream: textFileStream

Socket stream: socketTextStream / socketStream (an example was already given above, so it is not repeated here)

RDD queue stream: queueStream

1. textFileStream

textFileStream monitors a file system directory for changes; when new files are added, it reads them as a data stream. Note:

The files must have the same format.
The files must be created in dataDirectory by atomically moving or renaming them into it.
If content is appended to an existing file, the newly appended data will not be read.

Example:

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFromFile {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark window operation").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(4))

    val fileStream = ssc.textFileStream("G:\\test\\teststreaming")
    fileStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2. queueStream

The RDD queue stream reads RDDs from a queue. Example:

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StreamingFromRDDQueue {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark streaming rdd queue").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(4))

    // create the queue
    val rddQueue = new mutable.Queue[RDD[Int]]()

    // add RDDs to the queue (the RDD contents here are only illustrative)
    for (x <- 1 to 3) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    }

    // create the queue stream from the RDD queue and print it
    val queueStream = ssc.queueStream(rddQueue)
    queueStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.2 Advanced data source: Flume

1. Flume pushes data to spark

(1) flume configuration file: an avro sink on the flume side pushes events to the host and port that the spark receiver listens on.

(2) Code: the event DStream returned by FlumeUtils.createStream is processed by mapping each Flume event to its headers and decoded body:

eventDStream.map(event => {
  (event.event.getHeaders.toString, new String(event.event.getBody.array()))
}).print()

ssc.start()
ssc.awaitTermination()

(3) start:

Start the spark program first (run it directly in the IDE). Then start flume:

flume-ng agent --conf conf --name a1 --conf-file conf/flume-spark.properties -Dflume.root.logger=INFO,console

Then modify or add files in the monitored directory yourself and watch the output data in the IDE.

2. Spark pulls data from flume

This approach is more flexible and scalable than the first one.

(1) flume configuration file

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# use the sink implemented by spark itself here
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.50.121
a1.sinks.k1.port=1234

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

In addition, you need to add the jar package spark-streaming-flume-sink_2.11-2.1.0.jar to flume's lib directory; this is the jar that contains the SparkSink used above. You can add this dependency in idea yourself, let it download, and then copy it from the local repository into flume's lib directory.

(2) Code

pom.xml

Similar to the one above, except that it additionally includes the following dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Code:

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FromFlumePull {
  def main(args: Array[String]): Unit = {
    // set the log level to ERROR; the default is INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("flume through spark sink").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(4))

    // create a polling stream and pull data from flume for local processing
    val flumePollingStream = FlumeUtils.createPollingStream(ssc, "bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    /**
      * Note here:
      * do not call toString directly on event.event.getBody.array(); the resulting string
      * is only of the form [class name]@[hashCode].
      * Use new String(event.event.getBody.array()) instead, which decodes the bytes into a
      * string using the default codec rules, because what is transmitted is byte data.
      */
    flumePollingStream.map(event => {
      (event.event.getHeaders.toString, new String(event.event.getBody.array()))
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

(3) start

The startup mode is similar to the above, and it is not repeated here.

(4) problems encountered

Question 1:

The spark-streaming-flume-sink_2.11 jar package has been placed in flume's lib directory, but flume's agent reports an error at startup:

29 Aug 2019 WARN [Spark Sink Processor Thread-10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The key part is: java.lang.IllegalStateException: begin() called when transaction is OPEN! It seemed to be a problem with one of flume's jar packages, but at first the details were not clear.

This error kept appearing, so I finally looked at the scala packages under flume's lib directory.

The library there is scala-library-2.10.5.jar, while the sparksink jar I added was built against scala 2.11.8, so I suspected the scala library version was the problem. I copied the scala-library-2.11.8.jar package from spark's jars directory into flume's lib, renamed the original package so flume would not use the old one, then restarted the flume agent, and it ran normally. So the problem was caused by a mismatched dependency version.

Question 2:

When reading the event body, calling toString directly and using new String give different results: the former produces garbled output, while the latter restores the original string.

The difference between toString() and new String(): str.toString calls the toString method of the object's class, which usually returns a String of the form [class name]@[hashCode]. new String(str), where the parameter is a byte array, decodes the byte array into the corresponding characters using the JVM's default encoding; if the default encoding is ISO-8859-1, the characters corresponding to the bytes are obtained from the ASCII table. When to use which: new String() is generally used for character transcoding, i.e. turning a byte[] array into text, while toString() is used to print an object.
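A minimal sketch of the difference (not from the original), assuming the JVM default charset is ASCII-compatible:

val bytes: Array[Byte] = "king hello".getBytes

println(bytes.toString)      // prints something like [B@1b6d3586, i.e. [class name]@[hashCode]
println(new String(bytes))   // decodes the bytes back to "king hello"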
