In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
The knowledge of this article "how to achieve spark homework" is not quite understood by most people, so the editor summarizes the following content, detailed content, clear steps, and has a certain reference value. I hope you can get something after reading this article. Let's take a look at this "how to achieve spark homework" article.
Send the sample.log data to Kafka, and after Spark Streaming processing, change the data format into the following form: commandid | houseid | gathertime | srcip | destip | srcport | destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid in another queue sent to kafka requires: 1, sample.log = > read the file, send the data to kafka queue 2, obtain data from kafka queue (API 0.10 does not manage offset) Change the data format 3, the processed data is sent to another queue in kafka Analysis 1 use the redis tool class in the course to manage offset2 read log data to send data to topic13 consumption topic, modify the data segmentation method to vertical bar segmentation, and send it to topic2 again
1.OffsetsWithRedisUtils
Package home.oneimport java.utilimport org.apache.kafka.common.TopicPartitionimport org.apache.spark.streaming.kafka010.OffsetRangeimport redis.clients.jedis. {Jedis, JedisPool, JedisPoolConfig} import scala.collection.mutableobject OffsetsWithRedisUtils {/ / define Redis parameter privateval redisHost = "linux123" privateval redisPort = 6379 / / get connections to Redis privateval config = new JedisPoolConfig / / maximum number of free config.setMaxIdle (5) / / maximum number of connections config.setMaxTotal (10) privateval pool = new JedisPool (config, redisHost, redisPort) 10000) private def getRedisConnection: Jedis = pool.getResource privateval topicPrefix = "kafka:topic" / / Key:kafka:topic:TopicName:groupid private def getKey (topic: String, groupid: String) = s "$topicPrefix:$topic:$groupid" / / obtain offsets def getOffsetsFromRedis according to key (topics: Array [String], groupId: String): Map [TopicPartition, Long] = {val jedis: Jedis = getRedisConnection val offsets: Array [mutable.Map [TopicPartition, Long]] = topics.map {topic = > val key = getKey (topic GroupId) import scala.collection.JavaConverters._ / / converts the acquired redis data from map of Java to map of scala The data format is {key: [{partition,offset}]} jedis.hgetAll (key) .asScala .map {case (partition,offset) = > new TopicPartition (topic, partition.toInt)-> offset.toLong}} / / return resource jedis.close () offsets.flatten.toMap} / / Save offsets to Redis def saveOffsetsToRedis (offsets: Array [OffsetRange] GroupId: String): Unit = {/ / get connection val jedis: Jedis = getRedisConnection / / Organization data offsets.map {range = > (range.topic, (range.partition.toString, range.untilOffset.toString))} .groupBy (_. _ 1) .foreach {case (topic, buffer) = > val key: String = getKey (topic) GroupId) import scala.collection.JavaConverters._ / / also converts map of scala to map of Java and stores val maps in redis: util.Map [String, String] = buffer.map (_. _ 2). ToMap.asJava / / Save data jedis.hmset (key, maps)} jedis.close ()}}
KafkaProducer
Package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.producer. {KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializerimport org.apache.log4j. {Level, Logger} import org.apache.spark.rdd.RDDimport org.apache.spark. {SparkConf SparkContext} object KafkaProducer {def main (args: Array [String]): Unit = {Logger.getLogger ("org") .setLevel (Level.ERROR) val conf = new SparkConf () .setAppName (this.getClass.getCanonicalName.init) .setMaster ("local [*]") val sc = new SparkContext (conf) / / read sample.log file data val lines: RDD [String] = sc.textFile ("data/sample.log") / / definition Kafka producer parameter val prop = new Properties () / / access address of kafka prop.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG "linux121:9092") / / serialization method of key and value prop.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf [StringSerializer]) prop.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf [StringSerializer]) / / send the read data to mytopic1 lines.foreachPartition {iter = > / / initialize KafkaProducer val producer = new KafkaProducer [String String] (prop) iter.foreach {line = > / / encapsulated data val record = new ProducerRecord [String, String] ("mytopic1", line) / / send data producer.send (record)} producer.close ()}
3.HomeOne
Package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.consumer. {ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer. {KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization. {StringDeserializer, StringSerializer} import org.apache.log4j. {Level, Logger} import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming. {Seconds StreamingContext} object HomeOne {val log = Logger.getLogger (this.getClass) def main (args: Array [String]): Unit = {Logger.getLogger ("org") .setLevel (Level.ERROR) val conf = new SparkConf (). SetAppName (this.getClass.getCanonicalName) .setMaster ("local [*]") val ssc = new StreamingContext (conf) Seconds (5)) / / topic val topics to be consumed: Array [String] = Array ("mytopic1") val groupid = "mygroup1" / / define kafka related parameters val kafkaParams: Map [String, Object] = getKafkaConsumerParameters (groupid) / / obtain offset val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis (topics, groupid) / / create DStream val dstream: InputDStream [ConsumerRecord [String, String]] = KafkaUtils.createDirectStream (ssc, LocationStrategies.PreferConsistent) / / read data from kafka ConsumerStrategies.Subscribe [String, String] (topics, kafkaParams) FromOffsets)) / / the converted data is sent to another topic dstream.foreachRDD {rdd = > if (! rdd.isEmpty) {/ / get consumption offset val offsetRanges: Array [OffsetRange] = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges / / process data to topic2 rdd.foreachPartition (process) / / Save offset to Redis OffsetsWithRedisUtils.saveOffsetsToRedis (offsetRanges) Groupid)}} / / start job ssc.start () / / continuously execute ssc.awaitTermination ()} / / send processed data to topic2 def process (iter: Iterator [ConsumerRecord [String, String]]) = {iter.map (line = > parse (line.value)) .filter (! _ .isEmpty) .foreach (line = > sendMsg2Topic (line) "mytopic2")} / / call kafka producer to send message def sendMsg2Topic (msg: String, topic: String): Unit = {val producer = new KafkaProducer [String, String] (getKafkaProducerParameters ()) val record = new ProducerRecord [String, String] (topic, msg) producer.send (record)} / / modify the data format Separate commas into vertical split def parse (text: String): String = {try {val arr = text.replace (",") .split (",") if (arr.length! = 15) return "" arr.mkString ("|")} catch {case e: Exception = > log.error ("error parsing data!" , e) ""}} / / define configuration information for kafka consumers def getKafkaConsumerParameters (groupid: String): Map [String, Object] = {Map [String, Object] (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG-> "linux121:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG-> classOf [StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG-> classOf [StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG-> groupid ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG-> (false: java.lang.Boolean),)} / / define the producer's kafka configuration def getKafkaProducerParameters (): Properties = {val prop = new Properties () prop.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf [StringSerializer]) prop.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ClassOf [StringSerializer]) prop}} 2ax * assume that the data of the airport are as follows: 1, "SFO" 2, "ORD" 3, "DFW" the routes and distances between the two airports are as follows: 1,2meme 18002, 3, 8003, 1, 1400 use GraphX to complete the following requirements: all vertices, all sides, all triplets vertices, number of edges, several airports with distances greater than 1000. Which are sorted by the distance between all airports (in descending order), and output results * /
Code:
Import org.apache.spark. {SparkConf, SparkContext} import org.apache.spark.graphx. {Edge, Graph, VertexId} import org.apache.spark.rdd.RDDobject TwoHome {def main (args: Array [String]): Unit = {/ / initialize val conf = new SparkConf (). SetAppName (this.getClass.getCanonicalName.init) .setMaster ("local [*]") val sc = new SparkContext (conf) sc.setLogLevel ("warn") / / initialization data val vertexArray: Array [(Long) String)] = Array ((1L, "SFO"), (2L, "ORD"), (3L, "DFW")) val edgeArray: array [DFW] = Array (Edge (1L, 2L, 1800), Edge (2L, 3L, 800), Edge (3L, 1L, 1400)) / / construct vertexRDD and edgeRDD val vertexRDD [(VertexId) String)] = sc.makeRDD (vertexArray) val edgeRDD: RDD [Edge [Int]] = sc.makeRDD (edgeArray) / / Construction drawing val graph: Graph [String, Int] = Graph (vertexRDD) EdgeRDD) / / all vertices println ("all vertices:") graph.vertices.foreach (println) / / all edges println ("all edges:") graph.edges.foreach (println) / / all triplets println ("all triple information:") graph.triplets.foreach (println) / / find the number of vertices val vertexCnt = graph.vertices.count () println (s " Total number of vertices: $vertexCnt ") / val edgeCnt = graph.edges.count () println (s" total number of sides: $edgeCnt ") / / println with airport distance greater than 1000 (" edge information greater than 1000: ") graph.edges.filter (_ .attr > 1000) .foreach (println) / / sort by distance between all airports (descending) println (" sort places in descending order) Distance between airports ") graph.edges.sortBy (- _ .attr). Collect (). Foreach (println)}}
Running result
The above is about the content of this article on "how to achieve spark homework". I believe we all have a certain understanding. I hope the content shared by the editor will be helpful to you. If you want to know more about the relevant knowledge, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.