Good programmers share big data architecture notes (Spark):
Flume: collects data
MapReduce
HBase (on HDFS)
YARN: resource scheduling system
Display platform / data platform
1. Submit the task
2. Display the result data
Spark analysis engine: it can perform all kinds of data analysis, it can be integrated with Hive, and Spark tasks can be run on YARN.
Tasks are submitted to the cluster through the entry class SparkContext (sc).
Why use Spark: it is fast, easy to use, general-purpose, and highly compatible.
Hadoop
Scala
JDK
Spark
If the result is a fixed-length Array, calling toBuffer converts it into a variable-length Buffer, for example:
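A tiny Scala illustration of that point (plain collections, nothing Spark-specific; the values are arbitrary):
val fixed = Array(1, 2, 3)     // a fixed-length Array: its length cannot change
val growable = fixed.toBuffer  // copied into a variable-length Buffer (an ArrayBuffer)
growable += 4                  // now ArrayBuffer(1, 2, 3, 4)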
Spark cluster startup process and task submission
Primary node: Master
Multiple child nodes: Workers
The start-all.sh script starts the Master service first and then starts the Workers.
The Workers register with the Master, the Master responds, and the Workers then send heartbeat messages at regular intervals.
Cluster startup process
1. Call the start-all.sh script to start the Master.
2. After the Master starts, its preStart method starts a timer that regularly checks for timed-out Workers.
3. The startup script parses the slaves configuration file, finds the nodes on which Workers should run, and starts the Workers.
4. After a Worker service starts, its preStart method (a lifecycle method) is called and the Worker registers with all Masters.
5. The Master receives the registration information sent by a Worker, saves it, and sends its own URL back to the Worker in response.
6. After the Worker receives and stores the Master's URL, it starts a timer and sends heartbeat messages to the Master at regular intervals (a toy simulation of this register-and-heartbeat pattern is sketched below).
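A toy simulation of this register-and-heartbeat pattern in plain Scala (this is not Spark's actual RPC code; the worker name, interval and timeout below are made up for illustration):
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object HeartbeatDemo {
  // "master" state: the last heartbeat time of every registered worker
  private val lastSeen = mutable.Map[String, Long]()
  private val timeoutMs = 3000L

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(2)
    // 1. the worker registers once and the master saves the registration information
    lastSeen.synchronized { lastSeen("worker-1") = System.currentTimeMillis() }
    // 2. worker side: send a heartbeat on a fixed interval
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = lastSeen.synchronized { lastSeen("worker-1") = System.currentTimeMillis() }
    }, 0, 1, TimeUnit.SECONDS)
    // 3. master side: a timer regularly checks for timed-out workers
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = lastSeen.synchronized {
        val now = System.currentTimeMillis()
        for ((id, t) <- lastSeen if now - t > timeoutMs) println(s"master: worker $id timed out")
      }
    }, 0, 1, TimeUnit.SECONDS)
    Thread.sleep(5000)   // let the simulation run for a few seconds
    scheduler.shutdown()
  }
}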
Task submission process
The client submits the task (RDD pipeline) to the Master, which places it into a queue (first-in, first-out).
A Worker starts an Executor child process, which obtains the task information from the Master.
The Executor registers with the Driver on the client side.
When the Driver receives the registration information, it sends the tasks to the Executor for computation.
Task submission process (in detail)
1. First, the Driver side starts the SparkSubmit process through the spark-submit script and then creates the important object (SparkContext); after startup it begins communicating with the Master.
2. After receiving the information, the Master generates the task information and puts it into a queue.
3. The Master filters out all valid Workers and sorts them by idle resources.
4. The Master notifies the valid Workers to fetch the task information and start the corresponding Executors.
5. The Workers start the Executors, and the Executors reverse-register with the Driver.
6. The Driver sends the generated tasks to the corresponding Executors, and the Executors execute them.
RDDs generated in WordCount
Suppose there are three files on HDFS. sc.textFile("path") generates the first RDD, a HadoopRDD, and the second RDD, a MapPartitionsRDD. flatMap(_.split(" ")) generates the third RDD, a MapPartitionsRDD.
map((_, 1)) generates the fourth RDD, a MapPartitionsRDD; reduceByKey generates the fifth RDD, a ShuffledRDD; and saveAsTextFile generates the sixth RDD, a MapPartitionsRDD.
The RDD lineage can be inspected with .toDebugString.
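A minimal sketch of inspecting that lineage (assuming an existing SparkContext named sc and the same HDFS path used in the WordCount example below):
val wordCounts = sc.textFile("hdfs://suansn:9000/wc") // HadoopRDD, then MapPartitionsRDD
  .flatMap(_.split(" "))                              // MapPartitionsRDD
  .map((_, 1))                                        // MapPartitionsRDD
  .reduceByKey(_ + _)                                 // ShuffledRDD
println(wordCounts.toDebugString)                     // prints the full chain of RDDs above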
Partitioning
repartition(n): the partitioning of the original RDD itself does not change; because an RDD is immutable, a new RDD with the modified partitioning is generated. repartition always performs a shuffle, whether the new partition count is larger or smaller than the original (it is simply coalesce with shuffle = true).
coalesce(n): if n is smaller than the original number of partitions, the partition count is reduced without a shuffle; if n is larger, the call has no effect unless shuffle = true is passed.
partitionBy: takes a new partitioner; the full name of the commonly used partitioner is org.apache.spark.HashPartitioner.
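A small sketch of those three operators (assuming an existing SparkContext named sc; the partition counts in the comments follow from the calls):
import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(1 to 10, 3)           // start with 3 partitions
val more   = rdd.repartition(6)                // 6 partitions, a shuffle always happens
val fewer  = rdd.repartition(2)                // 2 partitions, still a shuffle
val shrunk = rdd.coalesce(2)                   // 2 partitions, no shuffle
val same   = rdd.coalesce(6)                   // stays at 3 partitions: growing requires shuffle = true
val grown  = rdd.coalesce(6, shuffle = true)   // 6 partitions, because a shuffle is allowed
val byKey  = rdd.map(x => (x % 2, x)).partitionBy(new HashPartitioner(4))
println(byKey.partitions.length)               // 4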
The client submits the Job (task information) to the Master.
The Master generates the task information, which describes the data of the task, and notifies the Workers to create the corresponding Executors.
The client then sends the job information to the Executors on the Workers, which compute the data.
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // SparkConf: configuration information class; it takes precedence over the cluster configuration files
    // setAppName: specifies the application name; if it is not specified, a name similar to one generated by UUID is created automatically
    // setMaster: specifies the run mode: local[1] - simulate cluster execution with one thread,
    //            local[2] - with two threads, local[*] - with as many threads as there are cores
    val conf = new SparkConf().setAppName("")
      .setMaster("local[2]") // delete or comment out this line when packaging and submitting to a cluster
    // create the entry class (context object) used to submit tasks to the cluster
    val sc = new SparkContext(conf)
    // get the data from HDFS
    val lines = sc.textFile("hdfs://suansn:9000/wc")
    val words = lines.flatMap(_.split(" "))   // split each line into words
    val tuples = words.map((_, 1))            // turn each word into a tuple
    val sum = tuples.reduceByKey(_ + _)       // aggregate by key
    val PX = sum.sortBy(_._2, false)          // sort in descending order
    print(PX.collect.toBuffer)                // print to the console; delete or comment out when running on a cluster
    PX.saveAsTextFile("hdfs://suansn:9000/ssss")
    sc.stop()                                 // release resources
  }
}
The methods provided by RDD are called operators.
An RDD is an abstraction of a dataset: it is a type that provides methods for processing data; it is distributed, it only points to the data rather than holding it, and it is immutable, so if you want to perform other operations you define another RDD. If a file is smaller than 128 MB it becomes one partition; if it is larger, it is split into partitions according to its size.
An RDD consists of: a list of partitions; a function for computing each partition; the dependencies on other RDDs; a Partitioner, i.e. the partitioning function of the RDD; and a list storing the preferred location of each Partition.
RDD operators come in two types; one Action operator corresponds to one job.
1. Transformation: lazily evaluated; it only records the computation and does not execute it. A job is generated and the computation is triggered only after an operator of type Action is called.
If there were no lazy Transformation operators and every operator were an Action, there would be no opportunity to optimize and the cluster would always be busy.
2. Action: triggers the job (a small sketch of this lazy/eager distinction follows).
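A minimal sketch of this lazy behaviour (assuming an existing SparkContext named sc; the println is only there to show when the work actually happens):
val nums = sc.parallelize(1 to 5)
// Transformation: only the lineage is recorded, nothing runs yet
val doubled = nums.map { x =>
  println(s"computing $x")          // not printed until an Action runs
  x * 2
}
// Action: a job is generated and the map function finally executes
val total = doubled.reduce(_ + _)   // prints "computing ..." on the executors and returns 30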
sc.parallelize: creates an RDD from a collection in parallel.
val rdd1 = sc.parallelize(List(3, 4, 6, 5, 8, 7, 9, 2, 1))
Multiply each element by 10:
val rdd2 = rdd1.map(_ * 10)
Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)
mapPartitions: computes using the partitions.
val rdd2 = rdd1.mapPartitions(_.map(_ * 10)) // the _ before .map represents one partition's data, wrapped in an Iterator
Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)
mapWith // a variant of map: it also traverses the elements, taking the partition number as input and mapping it to a value of type A
(constructA: Int => A)(f: (T, A) => U): RDD[U]
Parameter list: (constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U) // Int => A operates on the partition number of each partition; preservesPartitioning indicates whether to keep the RDD's partitioning information; T is an element of the RDD
// through currying, the A produced from the partition number in the first parameter list is fed into the second function
rdd1.mapWith(i => i * 10)((a, b) => b + 2).collect // the partition number i is multiplied by 10; a receives the RDD element and b receives the value of A
Array[Int] = Array(2, 2, 2, 2, 12, 12, 12, 12, 12)
flatMapWith // like mapWith, but the function returns a Seq whose elements are flattened into the result
(constructA: Int => A)(f: (T, A) => Seq[U])
Parameter list: (constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])
rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect // the partition number i is passed through unchanged; true records the partitioning information; y receives the partition number and x the RDD element
Array[(Int, Int)] = Array((0,3), (0,4), (0,6), (0,5), (1,8), (1,7), (1,9), (1,2), (1,1))
mapPartitions f: Iterator[T] => Iterator[U]
rdd1.mapPartitions(_.toList.reverse.iterator).collect // reverses the elements within each partition
Array[Int] = Array(5, 6, 4, 3, 1, 2, 9, 7, 8)
mapPartitionsWithIndex: iterates over the partitions and also exposes the partition number.
Parameter list: (f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false) // Iterator[T] is the partition's data, index: Int is the partition number
val func = (index: Int, iter: Iterator[Int]) => {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.mapPartitionsWithIndex(func).collect
Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
aggregate // aggregation operator
(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def func1(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
rdd1.aggregate(0)(math.max(_, _), _ + _) // within each partition, start from the initial value 0 and keep the running maximum element by element; the global aggregation then adds the initial value, the maximum of partition 0 and the maximum of partition 1
Int = 13
rdd1.aggregate(5)(math.max(_, _), _ + _) // same principle as above, but with initial value 5: the maximum of partition 0 becomes the initial value 5, the maximum of partition 1 is 9, and the global aggregation is 5 + 5 + 9
Int = 19
val rdd2 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
def func2(index: Int, iter: Iterator[String]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
rdd2.mapPartitionsWithIndex(func2).collect
Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])
rdd2.aggregate("")(_ + _, _ + _) // both the local and the global aggregation are string concatenation; the initial value is the empty string
String = abcdef or String = defabc // two different results can occur because it is not certain which partition finishes its task first
rdd2.aggregate("=")(_ + _, _ + _)
String = ==abc=def or String = ==def=abc // the initial value "=" is used once per partition and once more in the global aggregation
val rdd3 = sc.parallelize(List("12", "23", "345", "4567"), 2)
rdd3.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y) // within each partition, first compare the length of the initial value with the length of the first string, then compare each following string with the length kept so far, turning the maximum into a string each time; the global aggregation concatenates the two partition results onto the initial value
String = 24 or String = 42
val rdd4 = sc.parallelize(List("12", "23", "345", ""), 2)
rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // same as above but with math.min: partition 0 ends with "1"; in partition 1 the empty string (length 0) and the empty initial value make the result "0"; the global aggregation gives "10" (or "01")
String = 10
val rdd5 = sc.parallelize(List("12", "23", "", "345"), 2)
rdd5.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // same as above, but here the empty string sits in partition 1 together with "345", so both partitions end with "1"
String = 11
aggregateByKey: aggregates the values that share the same key.
(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
// Partitioner: the partitioner
val pairRDD = sc.parallelize(List(("mouse", 2), ("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
// the initial value is not added again during the global aggregation
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect // values with the same key are combined
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect // same call, but with an initial value (100 here is only illustrative) applied per key in each partition
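For reference, with the two partitions shown above (partition 0: (mouse,2), (cat,2), (cat,5); partition 1: (mouse,4), (cat,12), (dog,12)), the per-key partition maxima are summed in the global step, so the results should look roughly like this (pair order may vary, and the second line assumes the illustrative initial value of 100):
// aggregateByKey(0):   Array((dog,12), (cat,17), (mouse,6))
// aggregateByKey(100): Array((dog,100), (cat,200), (mouse,200))  // every per-partition maximum is lifted to 100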
combineByKey // aggregation operator
(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3) // zip requires the same number of elements per partition as rdd4
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
countByKey
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey // counts how many values share each key
rdd1.countByValue // treats each whole tuple of the RDD as a value and counts its occurrences
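Given the five pairs above, the expected results would be roughly the following (Map ordering may differ):
// countByKey:   Map(a -> 1, b -> 2, c -> 2)
// countByValue: Map((a,1) -> 1, (b,2) -> 2, (c,2) -> 1, (c,1) -> 1)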
filterByRange // keeps the elements whose key falls within the given range
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("c", "d")
rdd2.collect
flatMapValues
val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3.flatMapValues(_.split(" "))
foldByKey
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey("")(_ + _)
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_ + _)
foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _))) // prints the aggregated value of each partition's data
keyBy
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val rdd2 = rdd1.keyBy(_.length) // the length of each element becomes the key and the element itself becomes the value
rdd2.collect
keys, values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect
checkpoint
sc.setCheckpointDir("hdfs://node01:9000/cp")
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.checkpoint // marks the RDD for checkpointing; without an Action operator no job runs and the files are not written yet
rdd.isCheckpointed // checks whether the checkpoint has been performed
rdd.count // call any Action operator to submit a job
rdd.isCheckpointed
rdd.getCheckpointFile // shows where the checkpoint files are stored
repartition, coalesce, partitionBy
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length
collectAsMap // converts an Array of (k, v) pairs into a Map
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
Case: within a given time range, find for every user the TOP2 base stations where the user stayed the longest.
Idea: read the logs generated by the users and split each line.
Compute the total time each user stays at each base station.
Read the basic information of the base stations.
Join the longitude and latitude information onto the user data.
Find the TOP2 base stations by stay time for each user.
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // template code
    val conf = new SparkConf()
      .setAppName("ML")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    // obtain the logs of users accessing the base stations
    val files = sc.textFile("address")
    // split the user logs
    val userInfo = files.map(line => {
      val fields = line.split(",")
      val phone = fields(0)              // user's mobile phone number
      val time = fields(1).toLong        // timestamp
      val lac = fields(2)                // base station ID
      val eventType = fields(3)          // event type
      val time_long = if (eventType.equals("1")) -time else time
      ((phone, lac), time_long)
    })
    // total length of time each user stays at the same base station
    val sumedUserAndTime = userInfo.reduceByKey(_ + _)
    // to make the join with the base station information easier, re-key the data by base station ID
    val lacAndPhoneAndTime = sumedUserAndTime.map(tup => {
      val phone = tup._1._1              // user's mobile phone number
      val lac = tup._1._2                // base station ID
      val time = tup._2                  // total time the user stays at this base station
      (lac, (phone, time))
    })
    // obtain the basic information of the base stations
    val lacInfo = sc.textFile("path")
    // split the base station data
    val lacAndXY = lacInfo.map(line => {
      val fields = line.split(",")
      val lac = fields(0)                // base station ID
      val x = fields(1)                  // longitude
      val y = fields(2)                  // latitude
      (lac, (x, y))
    })
    // join the longitude/latitude information onto the user's access information
    val joined = lacAndPhoneAndTime.join(lacAndXY)
    // reshape the data so that it can later be grouped and sorted
    val phoneAndTimeAndXY = joined.map(tup => {
      val phone = tup._2._1._1           // mobile phone number
      val lac = tup._1                   // base station ID
      val time = tup._2._1._2            // stay time
      val xy = tup._2._2                 // longitude and latitude
      (phone, time, xy)
    })
    // group by the user's mobile phone number
    val grouped = phoneAndTimeAndXY.groupBy(_._1)
    // sort within each group by stay time
    // val sorted = grouped.map(x => (x._1, x._2.toList.sortBy(_._2).reverse))
    val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse)
    // reshape the data again
    val filterede = sorted.map(tup => {
      val phone = tup._1
      val list = tup._2
      val filteredList = list.map(x => {
        val time = x._2
        val xy = x._3
        List(time, xy)
      })
      (phone, filteredList)
    })
    val res = filterede.mapValues(_.take(2))
    sc.stop()
  }
}