2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Background
Machine learning teams at large companies generally try to build large-scale machine learning models. Public talks from Baidu, Toutiao, Alibaba and others all mention such models. Everyone is talking about deep learning now, but as far as I know its ROI in recommendation and search scenarios is not very high. We still follow the wide & deep approach, in which the deep part is not very deep.
The large-scale model is a very general framework. Its advantage is that adding features is very easy, so in essence the competition is over the quality and quantity of features. Baidu and Toutiao, for example, claim feature scales in the hundreds of billions. Some readers may not see where such large feature sets come from. A simple example: suppose you have millions of products and hundreds of user-side profile values. Crossing the two easily yields more than 1 billion features.
Once the feature scale is this large, training requires a parameter server (PS). Many thanks to Tencent for open-sourcing Angel, which rescues small companies like ours that lack resources. Our results in practice have been very good.
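To make the cross-feature arithmetic from the example above concrete, here is a minimal sketch. The two sizes and the `cross_feature` key format are assumptions picked for illustration, not figures or naming from our system.

```python
# Hypothetical sizes, chosen only to illustrate the arithmetic.
num_products = 5_000_000       # "millions of products"
num_profile_values = 300       # "hundreds of user-side profile values"

# Every (product, profile value) pair is one potential one-hot dimension.
num_cross_features = num_products * num_profile_values
print(f"{num_cross_features:,} potential cross features")


def cross_feature(product_id: str, profile_value: str) -> str:
    """Build the string key of one cross feature (the key format is an assumption)."""
    return f"product={product_id}&profile={profile_value}"
```

Only the pairs that actually occur in the logs need an index, which is why the resulting vectors stay sparse.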
There is plenty of material online about large-scale machine learning. Most of it covers why to build large-scale models and how Parameter Servers work, but in practice we found that large-scale feature preprocessing also has many problems to solve. I once talked with Mingfeng (formerly at Alibaba, later at Tencent, where he worked on the open-source PS Angel) about why nobody has open-sourced this part of the work. The conclusion was that it is tightly coupled to the business and, frankly, has few technical highlights. It is grunt work, so there is little motivation to open-source it.
This article summarizes the difficulties that the feature processing system for Mogujie search and recommendation ran into while putting large-scale machine learning models into practice. Our technology choice is Spark. Although Spark's machine learning component does not scale to very large models (in our experience an LR model tops out at around 30 million features), it is very well suited to feature processing. Many thanks to our teammate @Xuande for contributing this article.
Overall flow chart
The defining property of this approach is that the feature space is very large but very sparse. We encode the feature set as one-hot, so the storage required per sample is small. Because the scale is so large, however, the encoding step itself has become a serious problem.
Continuous statistical features: in e-commerce, statistics such as CTR and GMV are very important features.
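The reason a sparse one-hot sample is cheap to store can be sketched as follows: only the indices of the active columns are kept, and every other column is implicitly zero. The feature names and the tiny index below are hypothetical.

```python
from typing import Dict, List

# Hypothetical feature index (feature value -> column index).
feature_index: Dict[str, int] = {
    "user_gender=f": 0,
    "item_cate=dress": 1,
    "item_cate=shoes": 2,
    "ctr_bucket=3": 3,  # a continuous statistic (CTR) after discretization
}


def encode_sparse(features: List[str], index: Dict[str, int]) -> List[int]:
    """Return the sorted column indices that are 1; all other columns are 0."""
    return sorted(index[f] for f in features if f in index)


sample = ["item_cate=dress", "user_gender=f", "ctr_bucket=3"]
active_columns = encode_sparse(sample, feature_index)
```

Even if the index had a billion columns, this sample would still be stored as three integers.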
Problems encountered in feature construction
1. Replacing feature values with their numerical indices is too slow
The feature values in the combined training samples must be replaced with their numerical indices to produce the one-hot feature format.
Table 1, the feature index mapping, has the following format:
To do this, we assign a unique code to every feature, then join the index table back to the original log table to replace the original feature values. Downstream steps use the encoded values as one-hot features. This step is prone to OOM and performs poorly, so we set out to optimize it.
The first idea was to broadcast the index table, so that no merge join is needed and the sample table is not shuffled. While the index table was still relatively small, about 40 million entries, broadcasting it was no problem. Internally this is a map-side join, so it is fast, and the run time dropped to one hour.
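The logic of unique coding followed by index substitution can be sketched in plain Python. This is an in-memory illustration only, not Spark code: the dictionary lookup stands in for what a map-side (broadcast) join does on each executor, and the function names and sample rows are hypothetical.

```python
from typing import Dict, List


def build_feature_index(rows: List[List[str]]) -> Dict[str, int]:
    """Assign a unique integer to every distinct feature value (a stand-in for Table 1)."""
    distinct = sorted({value for row in rows for value in row})
    return {value: i for i, value in enumerate(distinct)}


def replace_with_index(rows: List[List[str]], index: Dict[str, int]) -> List[List[int]]:
    """Replace each feature value by its index, row by row, with no shuffle of the rows."""
    return [[index[value] for value in row] for row in rows]


log_rows = [["gender=f", "cate=dress"], ["gender=m", "cate=dress"]]
index = build_feature_index(log_rows)
encoded_rows = replace_with_index(log_rows, index)
```

The sample rows never move. Only the index has to be present wherever the rows are processed, which is why the size of the index becomes the bottleneck later on.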
When the index table reached 50 million entries, broadcasting the whole table directly caused severe GC on the driver and made the executors unstable. This was puzzling at the time: loaded into memory on its own, this data takes only about 20% of executor memory, so why was GC so bad? Reading up on how Spark works resolved the puzzle. Spark 2.x removed HttpBroadcast, leaving TorrentBroadcast as the only implementation. It works much like BitTorrent downloads: the data is split into blocks, and once an executor has fetched some blocks it can itself serve them to others. As more executors fetch data, more servers become available, and the data spreads quickly to all executors.
During the broadcast, an additional copy of the data is put into the BlockManager for other executors to read. The principle is shown in the figure:
During broadcasting, both the driver and the executors briefly use about twice the memory of the data.
Driver side
The driver first serializes the data into a byte array, cuts it into small blocks, and then broadcasts them. While cutting, memory use keeps approaching twice the size of the byte array, until the byte array is released after cutting finishes.
Executor side
Loading broadcast data on an executor is the reverse of the driver process. Each time the executor fetches a block, it stores it in its BlockManager and notifies the driver's BlockManagerMaster that the block has one more storage location from which other executors can download it. After the executor has fetched all blocks, it allocates an Array[Byte], reassembles the block data into it, and deserializes it to recover the original data. As on the driver, memory use keeps approaching twice the data size during this step, until deserialization completes.
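The doubling on both sides can be illustrated with a simplified Python simulation. It is not Spark's implementation: `pickle` stands in for Spark's serializer, the function names are hypothetical, and the 4 MiB block size is taken from the documented default of `spark.broadcast.blockSize`.

```python
import pickle
from typing import Any, List

BLOCK_SIZE = 4 * 1024 * 1024  # assumption: 4 MiB, the default of spark.broadcast.blockSize


def to_blocks(value: Any, block_size: int = BLOCK_SIZE) -> List[bytes]:
    """Driver side: serialize the value, then cut the bytes into blocks."""
    payload = pickle.dumps(value)  # first full copy: the serialized byte array
    # Each slice copies its bytes, so the blocks add up to a second full copy
    # while `payload` is still alive. It is released only after this returns.
    return [payload[i:i + block_size] for i in range(0, len(payload), block_size)]


def from_blocks(blocks: List[bytes]) -> Any:
    """Executor side: join the fetched blocks, then deserialize."""
    payload = b"".join(blocks)  # second full copy, next to the stored blocks
    return pickle.loads(payload)


demo_table = {f"feat_{i}": i for i in range(1000)}
demo_blocks = to_blocks(demo_table, block_size=1024)
restored = from_blocks(demo_blocks)
```

In both functions there is a moment when the whole payload and the complete list of blocks coexist, which is the short-lived doubling described above.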
Understanding how Spark implements broadcast explains the severe GC we saw when broadcasting a 50-million-dimension index.
As experimental features are iterated, the number of columns in Table 2 (the sample log table) keeps growing, and processing time grows linearly with the number of columns. The feature index also grows, so GC during broadcast gets worse and worse until OOM occurs frequently.
Two problems need to be solved at this stage:
1. The data in Table 1 must be broadcast to every executor efficiently.
2. The index substitution can no longer be implemented by joining on each column.
Combining these two requirements, we came up with a solution. Sort Table 1 by feature value and re-encode it, storing it in an array whose length is the maximum index value: the array subscript is the index value and the element is the feature value. After the array is broadcast to the executors, each column of each log row holds a feature value. Look that value up in the array to find its index, and substitute the index.
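A minimal sketch of this array layout follows, assuming the lookup is done by binary search (a natural fit for a sorted array, though the search method is our assumption here). The function names and sample values are hypothetical.

```python
from bisect import bisect_left
from typing import Iterable, List, Optional


def build_sorted_table(feature_values: Iterable[str]) -> List[str]:
    """Sort the distinct feature values; the position in the list is the new index."""
    return sorted(set(feature_values))


def lookup_index(table: List[str], value: str) -> Optional[int]:
    """Binary-search the sorted table; return the index, or None if the value is unknown."""
    pos = bisect_left(table, value)
    if pos < len(table) and table[pos] == value:
        return pos
    return None


table = build_sorted_table(["gender=m", "cate=dress", "gender=f"])
row = ["gender=f", "cate=dress"]
encoded_row = [lookup_index(table, v) for v in row]
```

Because the index is implied by the position, only the feature values themselves need to be stored and broadcast, and every column of a row is resolved against the same array with no join.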
Storing Table 1 as a subscripted array, and assuming an average feature value length of 64 characters at 1 byte per character, 50 million features need 3.2 GB of memory, which broadcasts acceptably in practice. 100 million features need 6.4 GB, and since broadcasting doubles memory consumption, GC becomes more severe. We then thought of hashing each string to a long, which takes only 8 bytes and saves a great deal of space compared with storing strings. The drawback of hashing is possible collisions. The 8-byte hash space ranges from -2^63 to 2^63-1, and we use BKDRHash. The actual collision rate is very low and within what the business can accept. This greatly reduces memory: 100 million features take only 800 MB, so broadcasting is no longer a strain. Correspondingly, when traversing Table 2, each feature value must be hashed with the same algorithm before the lookup. After this round of optimization, with the same resources, processing 1 billion rows with 50 million feature dimensions dropped to half an hour, and memory use is fairly stable.
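The hashing variant can be sketched like this. BKDRHash is the algorithm named above, but the seed of 131, the UTF-8 byte input, and the handling of collisions (colliding values simply share one slot) are assumptions of this sketch. The function names are hypothetical.

```python
from array import array
from bisect import bisect_left
from typing import Iterable, Optional

MASK64 = (1 << 64) - 1

# Memory arithmetic from the text: 64-byte strings versus 8-byte longs.
BYTES_AS_STRINGS = 100_000_000 * 64  # 6.4 GB for 100 million features
BYTES_AS_LONGS = 100_000_000 * 8     # 800 MB for 100 million features


def bkdr_hash64(s: str, seed: int = 131) -> int:
    """BKDRHash wrapped to a signed 64-bit value (seed 131 is an assumption)."""
    h = 0
    for byte in s.encode("utf-8"):
        h = (h * seed + byte) & MASK64
    # Reinterpret the unsigned result as a signed long in [-2^63, 2^63 - 1].
    return h - (1 << 64) if h >= (1 << 63) else h


def build_hash_table(values: Iterable[str]) -> array:
    """Sorted array of 8-byte hashes; the position in the array is the feature index."""
    return array("q", sorted({bkdr_hash64(v) for v in values}))


def lookup(table: array, value: str) -> Optional[int]:
    """Hash the value with the same function, then binary-search the hash array."""
    key = bkdr_hash64(value)
    pos = bisect_left(table, key)
    if pos < len(table) and table[pos] == key:
        return pos
    return None
```

For example, `lookup(build_hash_table(["gender=f", "gender=m"]), "gender=f")` returns the index of that feature. Two features that collide would receive the same index, which is the accepted loss mentioned above.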
After this ran for a while, the feature scale reached 100 million and the time for this step rose to 45 minutes. Analyzing the feature distribution showed that discretized continuous features appear very frequently in the log. Because they are continuous statistics they are very dense, and almost every record contains them. Yet such features account for only a few entries in Table 1, so their index values can be kept in a cache and found without hashing and searching, saving a lot of time for a small amount of space. In the implementation, we check whether the feature value being looked up is of this kind. If so, we go straight from the Table 2 feature value to the Table 1 index value. The actual cache hit rate is 99.98888%, and the run time dropped significantly, from 45 minutes to 17 minutes. Of course, a cache is not a silver bullet. We also mistakenly put a cache in front of the hash computation, which made that step slower: there are too many distinct inputs to hash, the hit rate was only about 10%, and computing the hash is cheap anyway. Whenever you use a cache, measure its hit rate.
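A cache that reports its own hit rate might look like the sketch below. The class name, the preloaded hot entries, and the fallback lookup are all hypothetical. The point is only that hits and misses are counted so the hit rate can be checked.

```python
from typing import Callable, Dict, Optional


class CountingCache:
    """Small preloaded cache for hot feature values that tracks its hit rate."""

    def __init__(self, hot_entries: Dict[str, int]) -> None:
        self._hot = dict(hot_entries)
        self.hits = 0
        self.misses = 0

    def get(self, key: str, slow_lookup: Callable[[str], Optional[int]]) -> Optional[int]:
        """Serve hot keys from the cache; fall back to the slow path (hash + search)."""
        if key in self._hot:
            self.hits += 1
            return self._hot[key]
        self.misses += 1
        return slow_lookup(key)

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


# Hypothetical data: one dense discretized statistic and one rare ID feature.
cache = CountingCache({"ctr_bucket=3": 7})
slow_table = {"item_id=123": 42}
results = [cache.get(k, slow_table.get)
           for k in ["ctr_bucket=3", "ctr_bucket=3", "ctr_bucket=3", "item_id=123"]]
```

If the measured hit rate is low, as with the roughly 10% case above, the cache only adds overhead and should be removed.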
2. Some experience with Spark
1. Make good use of the SQL tab in the Spark UI. For ETL jobs like this feature processing, it is a powerful tool. Our early implementation was rushed and had few test cases. When tracking down logic errors in production runs, we combined earlier analysis of the data with the flow chart in the SQL tab to locate where the data went wrong.
2. Use the Spark UI to find skewed tasks: find the stages that take a long time and click through to Aggregated Metrics by Executor.
3. There is no need to focus on individual tasks. If some executors take noticeably longer than others, the cause is usually data skew (though slow nodes cannot be ruled out).
4. Use the UI to decide whether caching is needed. If a job repeats certain steps very often and the data locality of its tasks is RACK_LOCAL, consider caching the upstream results. This is the case, for example, when we count the frequency of single-column features.
5. In that case we cache the upstream data, but because the volume is large we chose to cache it to disk. Spark's automatic allocation between memory and disk did not work well for us. I don't know whether we were using it wrong or whether the implementation has a bug.
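The comparison behind tips 2 and 3 can be sketched in a few lines: given per-executor task times read off the Aggregated Metrics by Executor table, flag the executors that are far above the median. The numbers, the executor names, and the factor of 2 are made up for illustration.

```python
from statistics import median
from typing import Dict, List


def find_stragglers(task_time_s: Dict[str, float], factor: float = 2.0) -> List[str]:
    """Return executors whose total task time exceeds `factor` times the median."""
    threshold = factor * median(task_time_s.values())
    return sorted(name for name, t in task_time_s.items() if t > threshold)


# Hypothetical per-executor task times in seconds, copied by hand from the UI.
executor_task_time_s = {"exec-1": 120.0, "exec-2": 130.0, "exec-3": 125.0, "exec-4": 610.0}
stragglers = find_stragglers(executor_task_time_s)
```

A flagged executor is a starting point for checking key distribution, not proof of skew, since a slow node produces the same symptom.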
Summary of production version
With feature dimensions at the 100-million scale and samples in the billions (the full sample set is downsampled, with no obvious loss in effect), the job runs in about 20 minutes. This timing is of limited reference value, though, since it depends on allocated resources and machine performance, where the big companies have a large advantage. The core of this article is how to solve poor processing performance and Spark OOM when the feature encoding index reaches the 100 million scale during feature processing.