Good programmers share big data's architecture.

2025-03-28 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

The big data architecture consists of the following components:

Flume: data collection

MapReduce

HBase (HDFS)

Yarn: resource scheduling system

Data platform (display platform)

1. Submit tasks

2. Display the result data

Spark is an analysis engine that can perform many kinds of data analysis. It can be integrated with Hive, and Spark tasks can run on Yarn.

SparkContext (sc) is the entry class for submitting tasks to the cluster.

Why use Spark: it is fast, easy to use, general-purpose, and highly compatible.

Hadoop

Scala

JDK

Spark

If the result is a fixed-length Array, calling toBuffer converts it to a variable-length buffer.

Startup process

Spark cluster startup process and task submission

Master node: Master

Multiple child nodes: Worker

The start-all.sh script starts the Master service first and then starts the Workers.

Each Worker submits its registration information to the Master. After the Master responds, the Worker sends heartbeat information regularly.

Cluster startup process

1. Call the start-all.sh script to start the Master.

2. After the Master starts, its preStart method starts a timer that regularly checks for timed-out Workers.

3. The startup script parses the slaves configuration file, finds the nodes on which Workers should run, and starts the Workers.

4. After a Worker service starts, its preStart method (a lifecycle method) is called to register with all Masters.

5. The Master receives the registration information sent by the Worker, saves it, and sends its own URL back to the Worker as the response.

6. After receiving and updating the Master's URL, the Worker starts a timer to send heartbeat information to the Master on a regular basis.
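The registration and heartbeat steps above can be sketched with plain Scala collections. This is a simplified model and not Spark's actual Master or Worker code: the names `HeartbeatSketch`, `WorkerInfo`, `register`, `heartbeat` and `checkTimeouts`, the placeholder URL, and the 60-second timeout are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the Master's bookkeeping; not Spark's real classes.
object HeartbeatSketch {
  final case class WorkerInfo(id: String, cores: Int, var lastHeartbeat: Long)

  class Master(timeoutMs: Long) {
    private val workers = mutable.Map.empty[String, WorkerInfo]

    // Steps 4-5: a Worker registers; the Master saves the info and replies with its URL.
    def register(id: String, cores: Int, now: Long): String = {
      workers(id) = WorkerInfo(id, cores, now)
      "spark://master-host:7077" // placeholder URL, assumed for illustration
    }

    // Step 6: a Worker reports that it is still alive.
    def heartbeat(id: String, now: Long): Unit =
      workers.get(id).foreach(w => w.lastHeartbeat = now)

    // Step 2: the periodic check removes Workers whose last heartbeat is too old.
    def checkTimeouts(now: Long): Set[String] = {
      val dead = workers.values.filter(w => now - w.lastHeartbeat > timeoutMs).map(_.id).toSet
      dead.foreach(id => workers.remove(id))
      dead
    }

    def aliveWorkers: Set[String] = workers.keySet.toSet
  }

  def main(args: Array[String]): Unit = {
    val master = new Master(timeoutMs = 60000L)
    master.register("worker-1", cores = 4, now = 0L)
    master.register("worker-2", cores = 8, now = 0L)
    master.heartbeat("worker-1", now = 50000L)
    println(master.checkTimeouts(now = 70000L)) // Set(worker-2)
    println(master.aliveWorkers)                // Set(worker-1)
  }
}
```

Timestamps are passed in explicitly instead of being read from the system clock, which keeps the sketch deterministic and easy to reason about.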

Task submission process

The client submits the task (RDD) to the Master through spark-submit. Tasks are kept in a queue (first in, first out).

The Worker gets the task information from the Master and starts the Executor child process.

The Executor registers with the Driver on the client side.

When the client receives the registration information, it sends the task to the Executor for computation.

Task submission process in detail

1. The Driver side starts the SparkSubmit process through the spark-submit script and then creates the important object SparkContext. After startup, it sends information to the Master to begin communication.

2. After receiving this information, the Master generates the task information and puts it into the queue.

3. The Master filters out all valid Workers and sorts them by idle resources.

4. The Master notifies the valid Workers to fetch the task information and start the corresponding Executors.

5. Each Worker starts its Executor, which registers back with the Driver.

6. The Driver sends the generated tasks to the corresponding Executors, and the Executors run them.
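Steps 2 to 4 can be illustrated with a small model: tasks wait in a FIFO queue, and for each task the valid Workers are filtered and sorted by idle resources. This is only a sketch under stated assumptions. `SchedulingSketch`, `Worker`, `TaskInfo`, `candidates` and `schedule` are hypothetical names, idle resources are reduced to a free core count, and the real Master logic is more involved.

```scala
import scala.collection.immutable.Queue

// Hypothetical model of the Master's worker selection; not Spark's scheduler.
object SchedulingSketch {
  final case class Worker(id: String, alive: Boolean, freeCores: Int)
  final case class TaskInfo(app: String, coresNeeded: Int)

  // Step 3: keep only usable Workers and sort them by idle resources, most idle first.
  def candidates(workers: Seq[Worker], task: TaskInfo): Seq[Worker] =
    workers
      .filter(w => w.alive && w.freeCores >= task.coresNeeded)
      .sortBy(w => -w.freeCores)

  // Steps 2 and 4: walk the queue in FIFO order and pick the most idle Worker for each task.
  // For brevity the free cores are not decremented after an assignment.
  def schedule(queue: Queue[TaskInfo], workers: Seq[Worker]): Seq[(String, Option[String])] =
    queue.map(t => t.app -> candidates(workers, t).headOption.map(_.id))

  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("w1", alive = true, 2), Worker("w2", alive = true, 8), Worker("w3", alive = false, 16))
    val queue = Queue(TaskInfo("wordcount", 2), TaskInfo("big-job", 32))
    schedule(queue, workers).foreach(println)
  }
}
```

A task that no Worker can satisfy maps to `None`, which in this sketch stands for "keeps waiting in the queue".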

RDDs generated in WordCount

Suppose there are three files on HDFS. sc.textFile("path") generates the first RDD (HadoopRDD) and the second RDD (MapPartitionsRDD). flatMap(_.split(" ")) generates the third RDD (MapPartitionsRDD).

map((_, 1)) generates the fourth RDD (MapPartitionsRDD), reduceByKey generates the fifth RDD (ShuffledRDD), and saveAsTextFile generates the sixth RDD (MapPartitionsRDD).

The RDD lineage can be viewed with .toDebugString.

Partitioning

repartition(n): the RDD itself does not change, because an RDD is immutable. A new RDD with the new number of partitions is generated. repartition always performs a shuffle, whether n is larger or smaller than the original number of partitions.

coalesce(n): if n is less than the original number of partitions, the number of partitions changes and no shuffle occurs. If n is greater, the number of partitions cannot change unless shuffle is set to true.

partitionBy(partitioner): takes a new partitioner, for example new org.apache.spark.HashPartitioner(n).
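To make the idea behind hash partitioning concrete, the following sketch assigns keys to partitions using a non-negative modulo of the key's hash code, which is the approach Spark's HashPartitioner takes. It runs on plain Scala collections. `HashPartitionSketch`, `partitionFor` and `partitionBy` are names chosen for this illustration and are not Spark APIs.

```scala
// Plain-Scala illustration of hash partitioning; the object and method names are hypothetical.
object HashPartitionSketch {
  // Non-negative modulo, so that negative hash codes still map to a valid partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // A null key goes to partition 0; any other key goes to hashCode mod numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  // Groups key-value pairs by the partition index computed from the key.
  def partitionBy[K, V](data: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    data.groupBy { case (k, _) => partitionFor(k, numPartitions) }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(1 -> "a", 2 -> "b", 4 -> "c", -1 -> "d")
    partitionBy(pairs, 3).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the partition depends only on the key, all records with the same key land in the same partition, which is what key-based operators such as reduceByKey rely on.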

The client submits the Job task information to the Master.

The Master generates the task information, which describes the data of the task, and notifies the Workers to create the corresponding Executors.

The client gives the job information to the Workers, and the Workers let the Executors compute the data.

import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // SparkConf: configuration class; settings made here take precedence over the cluster configuration files
    // setAppName: specifies the application name
    // setMaster: specifies the run mode: local[1] simulates the cluster with one thread,
    //   local[2] with two threads, local[*] with as many threads as there are available cores
    val conf = new SparkConf()
      .setAppName("Demo")
      .setMaster("local[2]") // delete or comment out this line when packaging for upload to a cluster

    // create the entry class (context object) used to submit tasks to the cluster
    val sc = new SparkContext(conf)

    // read the data from HDFS
    val lines = sc.textFile("hdfs://suansn:9000/wc")
    val words = lines.flatMap(_.split(" "))  // split each line into words
    val tuples = words.map((_, 1))           // turn each word into a (word, 1) tuple
    val sum = tuples.reduceByKey(_ + _)      // aggregate the counts per word
    val PX = sum.sortBy(_._2, false)         // sort by count in descending order

    println(PX.collect.toBuffer) // print to the console; delete or comment out when packaging for upload to a cluster
    PX.saveAsTextFile("hdfs://suansn:9000/ssss")

    sc.stop() // release resources
  }
}

The methods provided by RDD are called operators.

An RDD (resilient distributed dataset) is an abstraction over data. It is a type that provides methods for processing data. It is distributed, it only points to the data, and it is immutable, so any further operation defines another RDD. A file smaller than 128 MB becomes one partition; a larger file is split into partitions according to its size.

An RDD has five properties: a set of partitions; a function for computing each partition; dependencies on other RDDs; optionally a Partitioner, that is, the partitioning function of the RDD; and optionally a list that stores the preferred location of each partition.

RDD operators fall into two types. Each Action operator corresponds to one job.

1. Transformation: lazily evaluated. A Transformation only records the computation and does not execute it. A job is generated and computed only after an Action operator is called.

If every operator were an Action and none were a Transformation, nothing could be optimized and the cluster would always be busy.

2. Action: triggers the execution of a job.
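The difference between recording a computation and executing it can be seen without a cluster. The sketch below uses a plain Scala Iterator as a stand-in for an RDD, which is an analogy and an assumption of this example: mapping over the iterator plays the role of a Transformation, and materialising it with toList plays the role of an Action. `LazySketch` and `run` are hypothetical names.

```scala
// Analogy only: a Scala Iterator stands in for an RDD to show lazy evaluation.
object LazySketch {
  // Returns (calls before the "action", calls after the "action", result).
  def run(): (Int, Int, List[Int]) = {
    var calls = 0
    val source = List(1, 2, 3, 4)

    // "Transformation": building the mapped iterator only records the work to do.
    val mapped = source.iterator.map { x => calls += 1; x * 10 }
    val callsBeforeAction = calls

    // "Action": materialising the result forces the recorded computation to run.
    val result = mapped.toList
    (callsBeforeAction, calls, result)
  }

  def main(args: Array[String]): Unit =
    println(run()) // (0,4,List(10, 20, 30, 40))
}
```

The function passed to map is not called at all until toList runs, which mirrors how Spark defers work until an Action such as collect or count is called.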

sc.parallelize creates an RDD from a local collection

val rdd1 = sc.parallelize(List(3, 4, 6, 5, 8, 7, 9, 2, 1)) // the outputs below assume two partitions

Multiply each element by 10

val rdd2 = rdd1.map(_ * 10)
rdd2.collect
// Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

Compute per partition with mapPartitions

val rdd2 = rdd1.mapPartitions(_.map(_ * 10)) // the _ before map is the Iterator that holds the data of one partition
rdd2.collect
// Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

mapWith // a variant of map that also traverses the elements; it takes the partition number as input and produces a value of type A. It exists only in Spark 1.x; later versions use mapPartitionsWithIndex instead.

(constructA: Int => A)(f: (T, A) => U): RDD[U]

Parameter list: (constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U) // Int => A: applied to the partition number of each partition; preservesPartitioning: Boolean = false: whether to keep the partitioning information of the RDD; (T, A): T is an element of the RDD, A is the value produced by constructA

// implemented with currying: the A produced by the first parameter list is the A consumed by the second

rdd1.mapWith(i => i * 10)((a, b) => b + 2).collect // i is the partition number, multiplied by 10; a receives each element of the RDD and b receives the value of type A

// Array[Int] = Array(2, 2, 2, 2, 12, 12, 12, 12, 12)

flatMapWith // a variant of flatMap that also receives the partition number (Spark 1.x only)

(constructA: Int => A)(f: (T, A) => Seq[U]): RDD[U]

Parameter list: (constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])

rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect // i is the partition number, passed through unchanged; true keeps the partitioning information; y receives the partition number and x is the element of the RDD

// Array[(Int, Int)] = Array((0,3), (0,4), (0,6), (0,5), (1,8), (1,7), (1,9), (1,2), (1,1))

mapPartitions f: Iterator[T] => Iterator[U]

rdd1.mapPartitions(_.toList.reverse.iterator).collect // reverses the elements within each partition

// Array[Int] = Array(5, 6, 4, 3, 1, 2, 9, 7, 8)

mapPartitionsWithIndex iterates over the partitions and gives access to the partition number

Parameter list: (f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false) // Int: the partition number (index); Iterator[T]: the data of that partition

val func = (index: Int, iter: Iterator[Int]) => {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)

rdd1.mapPartitionsWithIndex(func).collect

// Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

aggregate // aggregation operator

(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

def func1(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)

rdd1.mapPartitionsWithIndex(func1).collect

// Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

rdd1.aggregate(0)(math.max(_, _), _ + _) // local aggregation: the first _ starts as the initial value 0 and the second _ receives the first element of partition 0; the larger of the two is kept and compared with the next element, and so on. Global aggregation: initial value + maximum of partition 0 + maximum of partition 1 = 0 + 4 + 9

// Int = 13

rdd1.aggregate(5)(math.max(_, _), _ + _) // same principle, but with the initial value 5 the maximum of partition 0 is the initial value 5, while the maximum of partition 1 is still 9. Global aggregation: 5 + 5 + 9

// Int = 19

val rdd2 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)

def func2(index: Int, iter: Iterator[String]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

rdd2.mapPartitionsWithIndex(func2).collect

// Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

rdd2.aggregate("")(_ + _, _ + _) // both the local and the global aggregation are string concatenation; the initial value is the empty string

// String = abcdef or String = defabc // two results are possible because it is not certain which partition finishes first

rdd2.aggregate("=")(_ + _, _ + _) // the initial value is applied once in each partition and once more in the global aggregation

// String = ==abc=def or String = ==def=abc

val rdd3 = sc.parallelize(List("12", "23", "345", "4567"), 2)

rdd3.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y) // first the length of the initial value is compared with the length of the first string; the result is turned into a string whose length is then compared with the length of the next string. The global aggregation concatenates the initial value and the results of the two partitions.

// String = 24 or String = 42

val rdd4 = sc.parallelize(List("12", "23", "345", ""), 2)

rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // same procedure with the minimum. Partition 0: the empty initial value gives "0" after the first toString; "0" has length 1, so the second comparison gives "1". Partition 1: the first comparison gives "0", and the empty string of length 0 then gives "0" again. The global result is 10 (or 01, depending on which partition finishes first).

// String = 10

val rdd5 = sc.parallelize(List("12", "23", "", "345"), 2)

rdd5.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // same as above, but here the empty string is the first element of partition 1, so both partitions give "1"

// String = 11
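The results above follow from one rule: the initial value is used once per partition in the local aggregation and once more in the global aggregation. The sketch below reproduces this on plain Scala collections, with each inner Seq standing for one partition. It is a simulation under stated assumptions, not Spark's implementation: `AggregateSketch` is a hypothetical name, and partitions are always combined in order, whereas Spark may combine them in whichever order they finish.

```scala
// Plain-Scala simulation of RDD.aggregate; each inner Seq stands for one partition.
object AggregateSketch {
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Local aggregation: fold every partition, starting from the zero value each time.
    val perPartition = partitions.map(_.foldLeft(zero)(seqOp))
    // Global aggregation: combine the partition results, starting from the zero value again.
    perPartition.foldLeft(zero)(combOp)
  }

  def main(args: Array[String]): Unit = {
    val nums = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8, 9))
    println(aggregate(nums, 0)((a, b) => math.max(a, b), (a, b) => a + b)) // 13
    println(aggregate(nums, 5)((a, b) => math.max(a, b), (a, b) => a + b)) // 19

    val letters = Seq(Seq("a", "b", "c"), Seq("d", "e", "f"))
    println(aggregate(letters, "=")((a, b) => a + b, (a, b) => a + b)) // ==abc=def

    val strs = Seq(Seq("12", "23"), Seq("", "345"))
    println(aggregate(strs, "")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)) // 11
  }
}
```

Changing the zero value from 0 to 5 adds 5 to the final sum and also raises the maximum of the first partition, which is why the result moves from 13 to 19 rather than to 18.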

aggregateByKey aggregates the values that have the same key

(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

// Partitioner: the partitioner

val pairRDD = sc.parallelize(List(("mouse", 2), ("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12)), 2)

def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

pairRDD.mapPartitionsWithIndex(func2).collect

// the initial value is not added again during the global aggregation

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect // operates on the values that share the same key

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect // a larger initial value such as 100 shows that it is applied only within each partition
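The behaviour of aggregateByKey, and in particular the fact that the initial value takes part only in the local step, can be checked with a small simulation on plain Scala collections. This is a sketch and not Spark's implementation. `AggregateByKeySketch` is a hypothetical name, each inner Seq stands for one partition, and the split of pairRDD into two partitions of three elements each is assumed.

```scala
// Plain-Scala simulation of aggregateByKey; each inner Seq stands for one partition.
object AggregateByKeySketch {
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Local aggregation: within each partition, fold the values of each key starting from zero.
    val local: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) }
    }
    // Global aggregation: merge the per-partition results of each key; zero is not used again.
    local.flatten.groupBy(_._1).map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq(("mouse", 2), ("cat", 2), ("cat", 5)),
      Seq(("mouse", 4), ("cat", 12), ("dog", 12)))
    println(aggregateByKey(parts, 0)((a, b) => math.max(a, b), (a, b) => a + b))
    println(aggregateByKey(parts, 100)((a, b) => math.max(a, b), (a, b) => a + b))
  }
}
```

With the initial value 0 the sketch yields cat -> 17, mouse -> 6 and dog -> 12. With 100 it yields cat -> 200, mouse -> 200 and dog -> 100, because 100 wins the per-partition maximum for every key but is not added again when the partitions are merged.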

combineByKey // aggregation operator

(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

rdd3.collect

val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)

val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3) // zip requires the same number of elements in each partition, so rdd5 needs nine elements

val rdd6 = rdd5.zip(rdd4)

val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
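The three functions of combineByKey have distinct roles: createCombiner handles the first value of a key within a partition, mergeValue handles the following values in the same partition, and mergeCombiners merges the results of different partitions. The sketch below simulates this on plain Scala collections. It is an illustration under stated assumptions and not Spark's implementation: `CombineByKeySketch` is a hypothetical name, each inner Seq stands for one partition, and partitions are merged in order.

```scala
// Plain-Scala simulation of combineByKey; each inner Seq stands for one partition.
object CombineByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within a partition: the first value of a key creates a combiner, later values are merged into it.
    val local: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val combined = acc.get(k) match {
          case Some(existing) => mergeValue(existing, v)
          case None           => createCombiner(v)
        }
        acc.updated(k, combined)
      }
    }
    // Across partitions: merge the combiners that belong to the same key.
    local.flatten.groupBy(_._1).map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq((1, "dog"), (1, "cat"), (2, "gnu")),
      Seq((2, "salmon"), (2, "rabbit"), (1, "turkey")),
      Seq((2, "wolf"), (2, "bear"), (2, "bee")))
    val grouped = combineByKey(parts)(
      (v: String) => List(v),
      (c: List[String], v: String) => c :+ v,
      (a: List[String], b: List[String]) => a ++ b)
    grouped.toSeq.sortBy(_._1).foreach(println)
  }
}
```

In this run key 1 collects dog, cat and turkey, and key 2 collects the remaining six animals. In Spark the order inside each list may differ, since partitions are not guaranteed to be merged in order.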

countByKey

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd1.countByKey // the number of elements for each key

rdd1.countByValue // treats each whole (key, value) element as the value and counts its occurrences

filterByRange // keeps the elements whose key falls within the given range

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))

val rdd2 = rdd1.filterByRange("c", "d")

rdd2.collect

flatMapValues

val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

rdd3.flatMapValues(_.split(" "))

foldByKey

val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

val rdd3 = rdd2.foldByKey("")(_ + _)

val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))

rdd.foldByKey(0)(_ + _)

foreachPartition

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

rdd1.foreachPartition(x => println(x.reduce(_ + _))) // prints the aggregated value of the data in each partition

keyBy

val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val rdd2 = rdd1.keyBy(_.length) // the length of each element becomes the key and the element itself becomes the value

rdd2.collect

keys, values

val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

rdd2.keys.collect

rdd2.values.collect

checkpoint

sc.setCheckpointDir("hdfs://node01:9000/cp")

val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

rdd.checkpoint // marks the RDD for checkpointing; nothing is stored yet, because no Action operator has been called and no job has run

rdd.isCheckpointed // checks whether the checkpoint has been performed

rdd.count // call any Action operator to submit a job

rdd.isCheckpointed

rdd.getCheckpointFile // shows where the checkpoint files are stored

repartition, coalesce, partitionBy

val rdd1 = sc.parallelize(1 to 10, 3)

val rdd2 = rdd1.coalesce(2, false)

rdd2.partitions.length

collectAsMap // collects the key-value pairs of an RDD into a Map

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

Case: within a given time range, find for every user the two base stations (TOP2) with the longest stay time among all the base stations the user passed.

Idea:

1. Get the logs generated by users and split them.

2. Compute the total time each user stays at each base station.

3. Get the basic information of the base stations.

4. Join the latitude and longitude information into the user data.

5. Find the TOP2 base stations by stay time for each user.

import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // template code
    val conf = new SparkConf()
      .setAppName("ML")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    // obtain the logs of users accessing base stations
    val files = sc.textFile("address")

    // split each log line
    val userInfo = files.map(line => {
      val fields = line.split(",")
      val phone = fields(0)        // user's mobile phone number
      val time = fields(1).toLong  // timestamp
      val lac = fields(2)          // base station ID
      val eventType = fields(3)    // event type
      // events of type "1" count as negative, so that the sum per key is the stay duration
      val time_long = if (eventType.equals("1")) -time else time
      ((phone, lac), time_long)
    })

    // the total length of time that a user stays at the same base station
    val sumedUserAndTime = userInfo.reduceByKey(_ + _)

    // to join with the basic information of the base stations, use the base station ID as the key
    val lacAndPhoneAndTime = sumedUserAndTime.map(tup => {
      val phone = tup._1._1 // user's mobile phone number
      val lac = tup._1._2   // base station ID
      val time = tup._2     // total time the user stays at this base station
      (lac, (phone, time))
    })

    // obtain the basic information of the base stations
    val lacInfo = sc.textFile("path")

    // split the base station data
    val lacAndXY = lacInfo.map(line => {
      val fields = line.split(",")
      val lac = fields(0) // base station ID
      val x = fields(1)   // longitude
      val y = fields(2)   // latitude
      (lac, (x, y))
    })

    // join the latitude and longitude information to the user's access information
    val joined = lacAndPhoneAndTime join lacAndXY

    // reshape the data to make grouping and sorting easier
    val phoneAndTimeAndXY = joined.map(tup => {
      val phone = tup._2._1._1 // mobile phone number
      val lac = tup._1         // base station ID
      val time = tup._2._1._2  // stay time
      val xy = tup._2._2       // longitude and latitude
      (phone, time, xy)
    })

    // group by the user's mobile phone number
    val grouped = phoneAndTimeAndXY.groupBy(_._1)

    // sort within each group by duration, longest first
    // val sorted = grouped.map(x => (x._1, x._2.toList.sortBy(_._2).reverse))
    val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse)

    // keep only the duration and the coordinates
    val filterede = sorted.map(tup => {
      val phone = tup._1
      val list = tup._2
      val filteredList = list.map(x => {
        val time = x._2
        val xy = x._3
        (time, xy)
      })
      (phone, filteredList)
    })

    // take the two base stations with the longest stay time for each user
    val res = filterede.mapValues(_.take(2))
    println(res.collect.toBuffer)

    sc.stop()
  }
}
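The same logic can be tried on a handful of records without a cluster. The sketch below repeats the steps on plain Scala collections. It is an illustration and not part of the job above: `TopStationsSketch`, `stayTimes` and `top2` are hypothetical names, the sample log lines and coordinates are invented, and the line format phone,timestamp,stationId,eventType with "1" meaning "connected" is assumed from the field comments in the code.

```scala
// Plain-Scala version of the TOP2 logic, for checking the steps on a small sample.
object TopStationsSketch {
  // Sums the signed timestamps per (phone, station): connect events ("1") count as negative.
  def stayTimes(logs: Seq[String]): Map[(String, String), Long] =
    logs.map { line =>
      val f = line.split(",")
      val time = f(1).toLong
      ((f(0), f(2)), if (f(3) == "1") -time else time)
    }.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Joins with the station coordinates and keeps the two longest stays per phone.
  def top2(logs: Seq[String], stations: Map[String, (String, String)]): Map[String, List[(Long, (String, String))]] =
    stayTimes(logs).toSeq
      .flatMap { case ((phone, lac), t) => stations.get(lac).map(xy => (phone, t, xy)) }
      .groupBy(_._1)
      .map { case (phone, rows) =>
        phone -> rows.map(r => (r._2, r._3)).sortBy(r => -r._1).take(2).toList
      }

  def main(args: Array[String]): Unit = {
    // Invented sample data: one user passing three stations.
    val logs = Seq("p1,100,A,1", "p1,400,A,0", "p1,500,B,1", "p1,600,B,0", "p1,700,C,1", "p1,1200,C,0")
    val stations = Map("A" -> ("116.30", "40.05"), "B" -> ("116.31", "40.06"), "C" -> ("116.32", "40.07"))
    println(top2(logs, stations))
  }
}
```

For the sample user the stays are 300 at station A, 100 at B and 500 at C, so the two entries kept are C and then A.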

© 2024 shulou.com SLNews company. All rights reserved.
