2025-01-20 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
Many newcomers are unclear about how to test big data products built on Spark and how to implement those tests. This article explains the approach in detail for readers who need it.
As a mainstream distributed computing framework, Spark has been integrated into many products as their ETL solution. To test such a product, we need a clear understanding of the principles of distributed computing and enough familiarity with the framework to design different test data for the various ETL scenarios. Generally speaking, we need to test from the following two angles.
Compatibility of the ETL with varied data (different data sizes, data distributions and data types)
Correctness of the ETL's data processing
Test data compatibility
ETL (extract, transform, load) is shorthand for a series of operations such as cleaning, extracting and transforming data according to certain rules. Generally speaking, it should therefore be able to handle many different kinds of data. A large share of the bugs we encounter in production come from extreme data in the production environment that our ETL programs cannot handle. For example:
Data with a large number of shards
In distributed computing, a data set is made up of multiple files scattered across HDFS, possibly on different machines, but HDFS gives users a unified view so that they think they are working with one file rather than many. This is how HDFS, as a distributed file system, stores data. Distributed computing frameworks such as Hadoop's MapReduce or Spark take advantage of this: each node reads the files stored on it directly and keeps the data in that node's memory (in the ideal case; if resources are insufficient, data may still move between nodes).
The data read into memory is also partitioned. By default, Spark reads data in units of 128 MB. If a file is smaller than this value, it is stored as a single partition; if it is larger, more partitions are added. For example, a 130 MB file is divided into two partitions in memory when Spark reads it (one of 128 MB and one of 2 MB). A very small file of only 10 MB is also stored in memory as one partition. So if a data set stored in HDFS is made up of 10 files scattered across the nodes, Spark will hold at least 10 partitions in memory after reading it, and the number of partitions grows further if each file is larger than 128 MB.
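The sizing rule described above can be sketched in plain Python. This is a simplified model that only applies the 128 MB read unit per file as the text describes it; the function names are hypothetical, and a real Spark job is also influenced by other settings that are ignored here.

```python
SPLIT_MB = 128  # read unit described in the text


def partitions_for_file(size_mb, split_mb=SPLIT_MB):
    """Return the sizes (in MB) of the partitions one file is read into."""
    if size_mb <= 0:
        return []
    full, rest = divmod(size_mb, split_mb)
    sizes = [split_mb] * int(full)
    if rest:
        sizes.append(rest)
    return sizes


def partitions_for_dataset(file_sizes_mb, split_mb=SPLIT_MB):
    """Each file is split on its own, so small files never share a partition in this model."""
    return sum(len(partitions_for_file(s, split_mb)) for s in file_sizes_mb)


print(partitions_for_file(130))            # [128, 2]
print(partitions_for_file(10))             # [10]
print(partitions_for_dataset([10] * 10))   # 10
print(partitions_for_dataset([130] * 10))  # 20
```

Under this model a data set of many tiny files always produces at least as many partitions as files, which is the situation the rest of this section warns about.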
When the calculation runs, the data held in the memory of multiple nodes is processed concurrently. In other words, our data sits in the memory of several nodes, and we run one computing task per partition. For a particularly large computing task, we first read the data into the memory of different nodes partition by partition, that is, we split the data into many small slices and place them in the memory of different machines. We then run computing tasks on these slices and finally aggregate the results of all the tasks. This is the basic principle of distributed computing.
This is where the problem arises. In a partition-based distributed computing framework, the number of partitions determines the degree of concurrency. If the data has 100 partitions, there will be 100 threads running computing tasks against that data, so the partition count represents the degree of parallelism. However, more partitions are not always better. If the data is very small but split into a large number of partitions, processing becomes relatively slow, and the results of all the shards still have to be aggregated in one place. All of this adds network IO overhead, because data is transmitted between different nodes. In distributed computing in particular, shuffle is a performance killer (readers unfamiliar with this concept can refer to my previous article). Running a shuffle over a large number of shards is a disaster, because the heavy network IO puts the cluster under high load or even paralyzes it. We once came across a data set of only 500 MB split into 7000 shards; after several ETL programs ran against it in parallel, the entire Hadoop cluster was paralyzed. This was the result of forgetting to repartition (re-shard) the data during preprocessing.
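To put the 500 MB and 7000 shards case into numbers, here is a small arithmetic sketch. The 128 MB target partition size is an assumption borrowed from the read unit mentioned earlier, not a recommendation from the author, and the helper names are hypothetical.

```python
import math

MB = 1024 * 1024


def average_shard_bytes(total_bytes, num_partitions):
    """Average amount of data each partition holds."""
    return total_bytes / num_partitions


def suggested_partitions(total_bytes, target_bytes=128 * MB):
    """Smallest partition count that keeps the average partition at or under the target size."""
    return max(1, math.ceil(total_bytes / target_bytes))


total = 500 * MB
print(round(average_shard_bytes(total, 7000) / 1024, 1))  # 73.1 (KB per shard)
print(suggested_partitions(total))                         # 4
```

With roughly 73 KB per shard, almost all the work goes into scheduling and network transfer rather than computation, which is why a repartition step during preprocessing matters.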
Data skew
The shuffle operation appeared in the task processing described above. From the earlier discussion of partitions and distributed computing principles, we know that distributed computing divides data into many slices stored on many different nodes, and then runs the same computing task on these slices concurrently. These tasks are independent of each other. Take a count operation, which calculates the number of rows in the data. What actually happens is that a count runs separately on each data shard, that is, on each partition. If we have three shards A, B and C, then three concurrent threads run the count, each calculating the number of rows of one partition. Once they have all finished, the results are aggregated in the driver program. The three computing tasks run independently without interfering with each other, and aggregation happens only after the calculation is complete.
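The count flow described above can be imitated with three in-memory lists and a thread pool. This is only an analogy in plain Python, not Spark code; the partition contents are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# Three hypothetical shards standing in for partitions A, B and C
partitions = {
    "A": ["row"] * 3,
    "B": ["row"] * 5,
    "C": ["row"] * 2,
}


def count_rows(rows):
    """The task each worker runs on its own partition."""
    return len(rows)


def distributed_count(parts):
    # One worker per partition; the workers do not share any state
    with ThreadPoolExecutor(max_workers=len(parts)) as pool:
        partial = list(pool.map(count_rows, parts.values()))
    # The "driver" only sees the partial results and adds them up
    return partial, sum(partial)


partial_counts, total = distributed_count(partitions)
print(partial_counts, total)  # [3, 5, 2] 10
```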
But not all computing tasks can be so independent. Suppose you have to perform a SQL group by operation. As in the figure above, I have to group the data by word before I can do other statistical calculations, such as counting word frequency. So the first thing Spark does is hash the group-by field and transfer rows with the same value to a fixed partition. Rows with the same key end up in one partition, so the data is grouped and each group is isolated in its own shard.
Then, to count word frequency, we only need a count operation on each partition. Shuffle exists so that the subsequent computation can run efficiently: by gathering rows with the same key on the same partition, the later computing tasks stay isolated from each other and trigger no further network IO. It is a design that saves cost across the series of calculations that follow. The price is the cost of the shuffle itself, which in many cases is very large. In particular, data skew gives shuffle its famous long-tail phenomenon.
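The hash-then-count idea above can be sketched without Spark. In this illustration `zlib.crc32` stands in for the hash function (Spark uses its own), and the words and partition count are made up.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    # crc32 is stable across runs, unlike Python's built-in hash() for strings
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle_by_key(input_partitions, num_partitions):
    """Move every word to the partition chosen by the hash of the word."""
    out = [[] for _ in range(num_partitions)]
    for part in input_partitions:
        for word in part:
            out[partition_of(word, num_partitions)].append(word)
    return out


def word_count(input_partitions, num_partitions=3):
    shuffled = shuffle_by_key(input_partitions, num_partitions)
    result = {}
    for part in shuffled:
        # After the shuffle a word lives in exactly one partition,
        # so the per-partition counts never need to be merged per key
        result.update(Counter(part))
    return result


before = [["spark", "hdfs", "spark"], ["etl", "spark"], ["hdfs", "etl", "etl"]]
print(word_count(before))
```

Before the shuffle the word "spark" is spread over two input partitions; afterwards every occurrence sits in the same partition, so each partition can be counted on its own.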
With shuffle, rows with the same key are gathered on the same partition. But what happens if our data is unevenly distributed? Suppose we group by the occupation field, and 900,000 of the 1,000,000 rows are programmers. Those 900,000 rows all land on the same partition and create one huge partition. This defeats the original intention of distributed computing, which is to divide the data into many small pieces, distribute them across the memory of different nodes, and use the parallel computing power of multiple nodes to speed up the calculation.
Now most of our data is gathered in one partition, which becomes a single point of computation. There is a further problem: when we submit tasks to Hadoop YARN, the requested resources are fixed and evenly distributed. If I request 10 containers to process this data, all 10 containers get exactly the same resources. But our data shards differ in size. The 900,000-row shard may need 5 GB of memory, while 1 GB may be enough for each of the other shards. So if we are unaware of the skew and request too little, the task fails with an OOM error. And if we request 5 GB for every container to accommodate the huge shard, resources are wasted.
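The trade-off above is easy to work through with numbers. The sketch below assumes one shard per container and nine small shards of 1 GB each next to the 5 GB one; those counts and the function name are illustrative assumptions, not figures from a real cluster.

```python
def plan(container_gb, shard_needs_gb):
    """Every container gets the same memory; report shards that would OOM and unused memory."""
    oom = [i for i, need in enumerate(shard_needs_gb) if need > container_gb]
    requested = container_gb * len(shard_needs_gb)
    wasted = sum(container_gb - need for need in shard_needs_gb if need <= container_gb)
    return {"requested_gb": requested, "oom_shards": oom, "wasted_gb": wasted}


# One skewed shard needing 5 GB plus nine shards needing 1 GB each (assumed)
needs = [5] + [1] * 9

print(plan(1, needs))  # {'requested_gb': 10, 'oom_shards': [0], 'wasted_gb': 0}
print(plan(5, needs))  # {'requested_gb': 50, 'oom_shards': [], 'wasted_gb': 36}
```

Sizing for the small shards fails the skewed one, while sizing for the skewed one requests 50 GB to do 14 GB of work.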
Data skew and shuffle are classic problems in the industry and are difficult to deal with. Many big data products can automatically adjust the resources they request according to the data size, and data skew is the natural enemy of this feature. If it is not handled well, the product either requests so many resources that it drags down the cluster, or so few that the task fails. What we need to do in the testing phase is simulate this kind of skewed data and then verify how the ETL program behaves.
Wide table
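As a rough illustration of simulating skewed data, the sketch below builds an occupation column in which 90% of 1,000,000 rows share one value and then measures how a hash partitioner would distribute it. The other job names, the helper names and the use of `zlib.crc32` as the hash are all assumptions made for the example.

```python
import random
import zlib
from collections import Counter


def skewed_column(rows, hot_value="programmer", hot_ratio=0.9,
                  others=("teacher", "doctor", "driver", "chef"), seed=42):
    """Build a column where hot_ratio of the rows hold the same value."""
    rng = random.Random(seed)
    hot = int(rows * hot_ratio)
    values = [hot_value] * hot + [rng.choice(others) for _ in range(rows - hot)]
    rng.shuffle(values)
    return values


def partition_sizes(values, num_partitions):
    """Row count per partition when rows are placed by the hash of their key."""
    sizes = Counter(zlib.crc32(v.encode("utf-8")) % num_partitions for v in values)
    return [sizes.get(p, 0) for p in range(num_partitions)]


jobs = skewed_column(1_000_000)
sizes = partition_sizes(jobs, 10)
print(sizes)
print(max(sizes), sum(sizes))
```

However many partitions are requested, the largest one holds at least the 900,000 hot rows, which is exactly the long tail an ETL program has to survive.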
A table with too many columns is a wide table. The widest table I have ever seen had 10,000 columns. This is common in machine learning systems, where high-dimensional features need to be extracted and many tables are joined into one large wide table in the ETL phase. A wide table is the natural enemy of data visualization. Suppose our feature previews 100 randomly selected rows. Transferring that amount of data to the front end and rendering it is expensive, and the preview itself also has to perform some calculations. If the data also has a large number of shards, the back end has to open all those files and read the data of the wide table on top of that. This can even lead to OOM, and I have actually seen OOM occur for this reason. So for this test point we deliberately build such a wide table for testing.
The other data types are not explained one by one; their names are self-explanatory.
Generating test data
We use Spark, a distributed framework, to generate the data as well, rather than a Parquet or HDFS client alone, because the data we create must not only cover the extreme scenarios but also be large enough. After all, ETL exists for big data scenarios. Taking advantage of Spark's distributed computing lets us create a large amount of data in a short time. For example, two days ago I created a data set of 100 million rows and 60 GB in only 20 minutes.
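A wide table for a preview test can be produced in a few lines. The sketch below writes a small CSV with 10,000 columns into memory using only the standard library; the `col_N` column naming and the random integer cells are assumptions for illustration, and a real test would write far more rows through a distributed job.

```python
import csv
import io
import random


def wide_table_csv(num_cols=10_000, num_rows=5, seed=0):
    """Return CSV text with a header row and num_rows rows of random digits."""
    rng = random.Random(seed)
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow([f"col_{i}" for i in range(num_cols)])
    for _ in range(num_rows):
        writer.writerow([rng.randint(0, 9) for _ in range(num_cols)])
    return buf.getvalue()


text = wide_table_csv()
rows = list(csv.reader(io.StringIO(text)))
print(len(rows), len(rows[0]))  # 6 10000
```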
Technical details
RDD is Spark's distributed data structure. An RDD is generated when Spark reads a data set, and the RDD contains the partitions discussed above. There are two ways to create an RDD. One is to read it from an existing file, which is certainly not what we want here. So we use the second method and generate the RDD from a List in memory, as follows:
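import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class Demo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("data produce").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession spark = SparkSession.builder().appName("JavaSpark SQL basic example").getOrCreate();

        // XRange is the author's own helper class modeled on Python's xrange;
        // the Integer element type is assumed here
        List<Integer> data = new XRange(1000);
        // Turn the in-memory List into an RDD with 100 partitions
        JavaRDD<Integer> distData = sc.parallelize(data, 100);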
The above is a demo I wrote. The code at the top that initializes the Spark conf and Spark session can be ignored. Look at the last two lines. XRange is a class I modeled on Python's xrange; it creates a List holding an index sequence using a generator-like principle. We could just as well create the list by hand here. The last line converts the List into an RDD through Spark's API. The first parameter of sc.parallelize is the List, and the second is the degree of parallelism you want, which can also be understood as the number of partitions this data should have.
If all we wanted were a thousand rows of index data, we could call distData.saveAsTextFile("path") and save the file directly. Of course, this is not what we want, because it contains none of the data we need. So at this point we bring in one of Spark's advanced interfaces, the dataframe. The dataframe is a high-level API that Spark modeled on the pandas dataframe. Its functionality is very similar to pandas: we can think of a dataframe as a table, and it comes with many useful APIs.
Most importantly, there is a DataFrameWriter class dedicated to saving dataframe data in a variety of formats and partition layouts. It can save to traditional formats such as csv and txt, as well as to columnar file formats such as parquet and orc. It also provides partitionBy for saving the data as a partitioned table or a bucketed table. In short, it can help us create all the kinds of data we need. So how do we convert an RDD into the dataframe we need and populate it with the data we need? See below:
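        // Additional imports needed at the top of the file:
        //   java.util.ArrayList
        //   org.apache.spark.sql.{DataFrameWriter, Dataset, Row, RowFactory, SaveMode}
        //   org.apache.spark.sql.types.{DataTypes, StructField, StructType}

        // Define the schema: one StructField (name, type, nullable) per column
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        // Convert each record of the RDD to a Row; the index value in record is not used.
        // RandomStringField and BinaryIntLabelField are the author's own generator classes.
        JavaRDD<Row> rowRDD = distData.map(record -> {
            RandomStringField randomStringField = new RandomStringField();
            randomStringField.setLength(10);
            BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
            return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
        });

        Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
        dataset.persist();
        dataset.show();

        // dataset.write() returns the DataFrameWriter for this dataset
        DataFrameWriter<Row> writer = dataset.write();
        writer.mode(SaveMode.Overwrite).partitionBy("age").parquet("/Users/sungaofei/gaofei");
    }
}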
Each record in a dataframe is a row, that is, a Row object, and a dataframe has strict requirements for each column, that is, for its schema, because it is a table. As with a table in a database or in pandas, we must specify the schema of each column and the data of each row. So we first define the schema, giving each column a name and a data type, and then create the schema through the DataTypes API. That gives us the column information. The key step is converting an RDD into the Rows a dataframe needs and populating each row with data. Here we use the RDD's map method. In fact, a dataframe is also a special RDD in which every record is a Row object. So we use the RDD's map method to generate the data for each row and convert it into a Row object.
JavaRDD<Row> rowRDD = distData.map(record -> {
    RandomStringField randomStringField = new RandomStringField();
    randomStringField.setLength(10);
    BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
    return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
});
The schema defined earlier has only two columns, name and age. So here I populate the data with one class that randomly generates a String and another that randomly generates an int. Finally, RowFactory.create builds a Row from the two values. The map method lets the user process each record of the data, and the parameter record passes that record in. In this example, each record of the original RDD is just the index number that was initialized when the List was generated. We have no use for it, so we ignore it and directly return a random string and an int. This gives us an RDD in which every record is a Row object. The dataframe can then be generated by calling the following API.
Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
Pass in the RDD of Rows and the schema to generate the dataframe table. Finally, DataFrameWriter is used to save the data.
This is the basic principle of generating test data, and it is actually quite simple. Of course, strictly controlling the data distribution, data types, feature dimensions and so on takes a lot of special handling, which we won't go into here.
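For readers without a Spark environment at hand, the row-filling step described in this section can be imitated in plain Python. The two classes below are hypothetical stand-ins that only borrow the names of the author's RandomStringField and BinaryIntLabelField; their real implementations are not shown in the article.

```python
import random
import string


class RandomStringField:
    """Hypothetical stand-in: generates a random lowercase string."""

    def __init__(self, length=10, rng=None):
        self.length = length
        self.rng = rng or random.Random()

    def gen(self):
        return "".join(self.rng.choice(string.ascii_lowercase) for _ in range(self.length))


class BinaryIntLabelField:
    """Hypothetical stand-in: generates a 0 or 1 label."""

    def __init__(self, rng=None):
        self.rng = rng or random.Random()

    def gen(self):
        return self.rng.randint(0, 1)


def make_rows(index_range, seed=7):
    rng = random.Random(seed)
    name_field = RandomStringField(length=10, rng=rng)
    age_field = BinaryIntLabelField(rng=rng)
    # The index only decides how many rows exist; its value is ignored,
    # just as the record parameter is ignored in the map step
    return [(name_field.gen(), age_field.gen()) for _ in index_range]


rows = make_rows(range(1000))
print(len(rows), rows[0])
```

Controlling distribution, types or dimensionality then comes down to swapping in different field generators per column.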
Test the correctness of ETL processing
The idea is simple: feed in some data and check whether the output is correct. The difference is that we are processing and testing large volumes: the input is big data and the ETL output is big data too, so we need some new testing means. In fact, the means is nothing new. It is the technology we have been discussing all along, the distributed computing framework Spark. We use Spark tasks to test these ETL programs, which also keeps the tests themselves efficient. If we simply read the files with an HDFS client, scanning such a large amount of data would take an unacceptably long time. Using big data technology to test big data functionality is therefore inevitable. Some readers may think: I am only testing functionality, not the performance of the algorithm, so there is no need for so much data and something smaller, such as 100 rows, will do. But this is also wrong, because in distributed computing the results of processing a large amount of data and a small amount of data may not be exactly the same. For example, in the random data split scenario, some bugs can only be exposed with a large amount of data. Another scenario in big data testing is data monitoring, which scans the online data regularly to verify that it is not abnormal. This is also a test scenario, and the online data is always massive.
Without further ado, look at the following code snippet.
@Features(Feature.ModelIde)
@Stories(Story.DataSplit)
@Description("Verify the stratified split in random split using pyspark")
@Test
public void dataRandomFiledTest() {
    String script = "# coding: UTF-8\n" +
            "# input script according to definition of \"run\" interface\n" +
            "from trailer import logger\n" +
            "from pyspark import SparkContext\n" +
            "from pyspark.sql import SQLContext\n" +
            "\n" +
            "\n" +
            "def run(t1, t2, context_string):\n" +
            "    # t2 is the original data; t1 is the data produced by the data split operator\n" +
            "    # Since the split is stratified on column col_20, both data sets are grouped\n" +
            "    # and counted here. This column is the label, so there are only two groups: 0 and 1\n" +
            "    t2_row = t2.groupby(t2.col_20).agg({\"*\": \"count\"}).cache()\n" +
            "    t1_row = t1.groupby(t1.col_20).agg({\"*\": \"count\"}).cache()\n" +
            "\n" +
            "    t2_0 = t2_row.filter(t2_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +
            "    t2_1 = t2_row.filter(t2_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +
            "\n" +
            "    t1_0 = t1_row.filter(t1_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +
            "    t1_1 = t1_row.filter(t1_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +
            "    # The data split operator splits at a 1:1 ratio on this field, so each group\n" +
            "    # of t1 should hold only half of the corresponding group of t2\n" +
            "    if abs(t2_0 / 2 - t1_0) > 1:\n" +
            "        raise RuntimeError(\"the 0 class is not split correctly\")\n" +
            "\n" +
            "    if abs(t2_1 / 2 - t1_1) > 1:\n" +
            "        raise RuntimeError(\"the 1 class is not split correctly\")\n" +
            "\n" +
            "    return [t1]";
    // ...
}
The API we use to scan the data table is still the dataframe mentioned earlier. The code snippet above holds the script that we embed in the Spark task. t1 and t2 are dataframes: t2 is the original data and t1 is the data produced by the data splitting algorithm. The feature under test is the stratified split, in which data is extracted proportionally according to a certain column. For example, given 1,000,000 rows of data, I split it in layers by the job field and ask for a ratio of 30%. In other words, 30% of the rows are extracted from each occupation, which amounts to a data sampling function. So in the test script we first group both the original table and the sampled table by this column, namely groupby(col_20); here I chose to stratify by col_20. As explained above, the grouping triggers a shuffle that moves rows with the same value into one data shard. Then we run count to get the number of rows in each group. Because I split at 1:1 here, that is, 50% sampling, the final check verifies that each group in the split data has half as many rows as the same group in the original data.
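The same check can be tried out locally on small in-memory data before running it as a Spark task. The sketch below is plain Python rather than pyspark; `stratified_split` is a hypothetical stand-in for the product's split operator, and the tolerance of one row mirrors the script above.

```python
import random
from collections import Counter


def stratified_split(rows, label_of, ratio, seed=1):
    """Hypothetical stand-in for the split operator: sample `ratio` of every group."""
    rng = random.Random(seed)
    groups = {}
    for row in rows:
        groups.setdefault(label_of(row), []).append(row)
    sample = []
    for members in groups.values():
        sample.extend(rng.sample(members, int(len(members) * ratio)))
    return sample


def check_split(original, split, label_of, ratio, tolerance=1):
    """Group both data sets by label and compare the per-group row counts."""
    before = Counter(label_of(r) for r in original)
    after = Counter(label_of(r) for r in split)
    for label, count in before.items():
        if abs(count * ratio - after.get(label, 0)) > tolerance:
            raise RuntimeError(f"class {label} is not split correctly")


def label_of(row):
    return row[1]


# 1000 rows: 250 with label 1 and 750 with label 0
original = [(i, 1 if i % 4 == 0 else 0) for i in range(1000)]
split = stratified_split(original, label_of, 0.5)
check_split(original, split, label_of, 0.5)
print(Counter(label_of(r) for r in split))
```

A split that keeps the right total but ignores the strata, for example 500 rows taken only from label 0, makes `check_split` raise, which is the failure the test is meant to catch.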