In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-21 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)06/01 Report--
Guide: nowadays, the pace of life is accelerating day by day, enterprises are facing the increasing mass of information, and the problem of low efficiency of information screening and processing is increasing day by day. Due to the lack of refinement of user marketing, many outdated or unpreferred message pushes in enterprise App affect the user experience to a great extent, and even lead to the loss of users. In this context, Youxin Financial Services implements a global data system strategy, through the opening and integration of various business lines of the group data, the use of big data, artificial intelligence and other technologies to build unified data assets, such as ID-Mapping, user tags and so on. It is against this background that the Youxin Financial Services user profile project was established to realize the group strategy of "data-driven business and operation". At present, the system supports more than 1 billion daily data processing and access to hundreds of compliance data sources. I. Technology selection
The traditional offline data storage computing scheme based on Hadoop ecology has been widely used in the industry, but due to the high latency of offline computing, more and more data application scenarios have changed from offline to real-time. Here is a table to compare the current mainstream real-time computing frameworks.
The fault tolerance mechanism of Apache Storm needs to ACK each piece of data, so its throughput is greatly affected, and there will be problems in the scenario of large data throughput, so it is not suitable for the needs of this project.
The overall ecology of Apache Spark is more perfect, and it is temporarily leading in the integration and application of machine learning, but the bottom layer of Spark is still in the form of Micro Batching processing.
Apache Flink has obvious advantages in streaming computing: first of all, its streaming computing belongs to the real sense of single processing, that is, each piece of data will trigger the calculation. On this point, it is obviously different from the micro-batch flow processing of Spark. Secondly, the fault-tolerant mechanism of Flink is relatively lightweight and has little impact on throughput, so that Flink can achieve high throughput. Finally, Flink also has the advantages of high ease of use and simple deployment. In contrast, we finally decided to adopt a Flink-based architecture.
Second, the architecture of user profile service
The user profile system currently provides real-time label data services for the group's online business. For this reason, our service needs to open up a variety of data sources, carry out real-time uninterrupted data cleaning, clustering and analysis of massive digital information, so as to abstract them into tags, and finally provide high-quality label services for applications. Against this background, the overall architecture of our user portrait system is as follows:
The overall structure is divided into five layers:
Access layer: access the original data and process it, such as Kafka, Hive, files, etc. Calculation layer: Flink is selected as the real-time computing framework to clean and correlate the real-time data. Storage layer: we store the cleaned data, and we layer and build the model of real-time user portraits, and store the data of different application scenarios in Phoenix,HBase,HDFS,Kafka and so on. Service layer: provides unified data query services to support multi-dimensional computing services from underlying detail data to aggregation layer data. Application layer: unified query service is used to support each business line data scenario. At present, the business mainly includes user interest score, user quality score, user factual information and other data. Third, the process of user profile data processing
After the completion of the overall architecture design, we also designed a detailed processing scheme for the data. In the data processing stage, in view of the high throughput and high stability of Kafka, our user profile system adopts Kafka as the distributed publish and subscribe message system. In the data cleaning phase, Flink is used to achieve user uniqueness identification, behavior data cleaning and so on, and redundant data is removed. This process supports interactive computing and a variety of complex algorithms, as well as real-time / offline data computing. At present, we have iterated two versions of our data processing process, and the specific plan is as follows:
Version 1.0 data processing flow data access, calculation and storage three-tier processing flow
There are two overall data sources:
Historical data: a huge amount of historical business data accessed from external data sources. After access, it is processed by ETL and entered into the underlying data table of the user's portrait. Real-time data: real-time business data accessed from external data sources, such as user behavior data, risk control data, etc.
According to the requirements of different business indicators, we directly extract data from the group data warehouse and fall into the Kafka, or directly from the business side to CDC (Capture Data Change) into the Kafka. In the computing layer, the data is imported into Flink, ID-Mapping, user tag fragments and other data are generated through DataStream, and then the generated data is stored in JanusGraph (JanusGraph is a graph database medium stored with HBase as the back end) and Kafka, and the user tag fragment data falling into Kafka is consumed by Flink. Aggregate to generate the latest user tag fragments (the user tag fragments are generated by the user profile system after obtaining fragmented data blocks from multiple channels).
Data service layer processing flow
The service layer will store the fragmented data of user tags in the storage layer, and perform TinkerPop OLAP calculation to generate a full user Yids list file through JanusGraph Spark On Yarn mode. Yid is the group user ID identity defined in the user portrait system. Combined with Yids list file, read HBase in batch in Flink to aggregate complete user profile data, generate HDFS file, then generate user score prediction tag through Flink batch operation of the newly generated data, and then put the user score prediction tag into Phoenix, and then the data can be obtained through unified data service interface. This process is fully illustrated in the following figure.
ID-Mapping data structure
In order to realize the integration of user tags and the strong connection between user ID, we regard the user ID logo as the vertex of the graph and the ID pair relationship as the edge of the graph. For example, users who have identified the browser Cookie have logged on to the company website with their mobile phone number to form a corresponding relationship. In this way, all user ID identities constitute a large picture, in which each small connected subgraph / connected branch is all the identity ID information of a user.
The ID-Mapping data is constructed by the graph structure model, and the graph nodes include UserKey, Device, IdCard, Phone and other types, which represent the user's business ID, device ID, ID card and phone information respectively. The generation rule of edges between nodes is to analyze the node information contained in the data stream and connect the nodes in a certain priority order, so as to generate the edges between nodes. For example, the Android_ID of the user's mobile phone system is identified, and then the user logs in to the company App using the mailbox, and the ID pair of the relationship is formed when the business line UID is found in the system, and then the system prioritizes according to the node type to generate the diagram of Android_ID, mail and UID. The data graph structure model is shown in the following figure:
Gephi
Performance bottleneck of data processing process in version 1.0
The data processing flow of version 1.0 meets our daily needs well at the beginning of the system, but with the growth of the amount of data, the scheme encounters some performance bottlenecks:
First of all, this version of data processing is realized by using a self-developed Java program. With the increase in the amount of data, the JVM memory size of the self-developed JAVA program is uncontrollable due to the sudden increase in the amount of data, and its maintenance cost is very high, so we decided to migrate all the processing logic to Flink in the new version. Second, there are many large connected subgraphs in ID-Mapping during the process of generating user tags (as shown in the following figure). This is usually due to the random discretization of user behavior data, which leads to confusion in the connection between some nodes. This not only increases the difficulty of data maintenance, but also causes some data to be "contaminated". In addition, this kind of unusually large subgraph will seriously reduce the query performance of JanusGraph and HBase.
Gephi
Finally, the data in this version is serialized by Protocol Buffer (PB) and stored in HBase, which will result in too many times of merging / updating user portrait tag fragments, which makes a tag need to read JanusGraph and HBase multiple times, which will undoubtedly increase the HBase reading pressure. In addition, because the data is serialized by PB, its original storage format is unreadable, which makes it more difficult to troubleshoot the problem.
In view of these problems, we propose a version 2.0 solution. In version 2.0, we try to solve the above three problems by using optimization schemes such as HBase column storage and modifying graph data structure.
Version 2.0 data processing process version process optimization point
As shown in the following figure, version 2.0 data processing processes are mostly inherited from version 1.0. The new version of the data processing process is optimized in the following aspects:
Version 2.0 data processing flow
The offline recording mode of historical data is changed from JAVA service to using Flink. Optimize the user portrait data structure model, mainly to modify the connection mode of the edge. Previously, we will determine the type of nodes and connect multiple nodes according to the preset priority order, while the new scheme adopts a UserKey-centric connection method. After this modification, the previous large connected subgraph (figure 6) is optimized to the following small connected subgraph (figure 8). At the same time, the problem of data pollution is solved and the data accuracy is ensured. In addition, the situation that a piece of data needs to be read more than a dozen times HBase on average in version 1.0 has also been greatly alleviated. After adopting the new scheme, the average HBase of a piece of data needs to be read only three times, thus reducing the reading pressure of HBase by six or seven times (the optimization here is the optimization of the data computing layer).
Gephi
In the old version, Protocol Buffer was used as the storage object of user profile data, and the user profile data was generated and stored in HBase as a column. The new version uses Map to store user profile tag data, each pair of KV in Map is a separate tag, and KV is a separate column after it is stored in HBase. The new version storage mode uses HBase to expand and merge columns, directly generate complete user profile data, remove the Flink merge / update user profile label process, and optimize the data processing process. After using this scheme, the tag data stored in HBase has the function of impromptu query. Data with ad hoc query refers to the function that you can view the data details of specified tags directly with specific conditions in HBase. It is the basic condition that data governance can verify data quality, data life cycle, data security and other functions. In the data service layer, we use Flink to read the Hive external table of HBase in batches to generate user quality grading data, and then store it in Phoenix. Compared with the old scheme, the full read HBase of Spark leads to too much reading pressure, which leads to the problem of cluster node downtime, the new scheme can effectively reduce the reading pressure of HBase. After our online verification, the read load of the new scheme on HBase has been reduced by tens of times (here optimization is different from 2 optimization, which belongs to service layer optimization). IV. Problems
At present, most of the data in the online user portrait system comes from the real-time data of Kafka. With the increasing amount of data, the pressure of the system is also increasing, so that there are some problems, such as Flink back pressure and Checkpoint timeout, which lead to the failure of Flink to submit Kafka displacement, thus affecting the data consistency. These online problems have led us to focus on the reliability, stability, and performance of Flink. In view of these problems, we made a detailed analysis and combined with our own business characteristics, explored and practiced some corresponding solutions.
CheckPointing process Analysis and performance Optimization Scheme CheckPointing process Analysis
The following figure shows the flow chart of checkpointing execution in Flink:
Checkpointing execution flow in Flink
Coordinator issues Barrier to all Source nodes. After receiving all the Barrier from the input, Task writes its state to persistent storage and continues to pass Barrier to its downstream. When the Task completes the state persistence, the stored state address is notified to the Coordinator. When Coordinator summarizes the status of all the Task and writes the storage path of the data to the persistent storage, the CheckPointing is completed. Performance optimization scheme
Through the above process analysis, we can improve the performance of Checkpointing in three ways. These options are:
Choose appropriate Checkpoint storage mode, reasonably increase operator (Task) parallelism, shorten operator chain (Operator Chains) length, choose appropriate Checkpoint storage mode.
CheckPoint storage methods include MemoryStateBackend, FsStateBackend and RocksDBStateBackend. As can be seen from the official documentation, the performance and security of different StateBackend vary greatly. In general, MemoryStateBackend is suitable for test environments, and RocksDBStateBackend is the best choice for online environments.
There are two reasons for this: first, RocksDBStateBackend is external storage, and the other two Checkpoint storage methods are JVM heap storage. Limited by the size of JVM heap memory, Checkpoint state size and security may be limited; second, RocksDBStateBackend supports incremental checkpoints. The incremental checkpoint mechanism (Incremental Checkpoints) records only changes to previously completed checkpoints, rather than generating a complete state. Incremental checkpoints can significantly reduce checkpointing time compared to full checkpoints, but at the cost of longer recovery time.
Reasonably increase the parallelism of operator (Task)
Checkpointing needs to collect the data state of each Task. The more state data for a single Task, the slower the Checkpointing. So we can shorten the Task time by increasing the parallelism of CheckPointing and reducing the number of individual Task state data.
Shortening the length of operator chain (Operator Chains)
The longer the chain of Flink operators (Operator Chains), the more Task, the more state data, and the slower the Checkpointing. By shortening the length of operator chain, the number of Task can be reduced, thus the total amount of state data in the system can be reduced, and the purpose of optimizing Checkpointing can be achieved indirectly. The merging rules of the chain of Flink operators are shown below:
The degree of parallelism of upstream and downstream nodes is consistent. The degree of entry of downstream nodes is 1. The upstream and downstream nodes are all in the same Slot Group. The Chain policy of upstream nodes is ALWAYS. The Chain policy of upstream nodes is ALWAYS or HEAD. The way of data partitioning between two nodes is that Forward users do not disable Chain.
Based on the above rules, we merge some Task with high correlation at the code level, so that the average operator chain length is reduced by at least 60% to 70%.
Flink back pressure generation process analysis and solution back pressure generation process analysis
During the Flink run, each operator consumes an intermediate / transition state stream, transitions them, and then produces a new stream. The analogy of this mechanism is that Flink uses blocking queues as bounded buffers. As with blocking queues in Java, once the queue reaches its capacity limit, slower consumers block producers from sending new messages or events to the queue. The following figure shows the data transfer between two operators in Flink and how the back pressure is perceived:
First, the event in Source enters the Flink and is processed by operator 1 and serialized into Buffer, and then operator 2 reads the event from the Buffer. When the processing power of operator 2 is insufficient, the data in operator 1 can not be put into Buffer, thus forming the back pressure. Back pressure may occur for the following two reasons:
The processing power of the downstream operator is insufficient; the data is skewed. Back pressure solution
In practice, we solve the problem of back pressure in the following ways. First of all, shortening the operator chain will reasonably merge the operators and save resources. Secondly, shortening the operator chain will also reduce the switching between Task (threads), the serialization / deserialization of messages and the exchange times of data in the buffer, thus improving the overall throughput of the system. Finally, the data that is not needed or not needed for the time being is filtered according to the data characteristics, and then the data is processed separately according to the business requirements. For example, some data sources need real-time processing, and some data can be delayed. Finally, by using the keyBy keyword, control the size of the Flink time window, and merge as much data as possible in the upstream operator processing logic to reduce the processing pressure of the downstream operators.
Optimization result
After the above optimization, with hundreds of millions of data per day, user portraits can be used for real-time information processing without continuous back pressure, and the average Checkpointing duration is stable within 1 second.
Thinking and prospect of future work end-to-end real-time stream processing
At present, part of the user's portrait data is obtained from the Hive data warehouse, the data warehouse itself is Tunable 1 mode, the data delay is large, so in order to improve the real-time data, end-to-end real-time stream processing is very necessary.
End-to-end means that one end collects the original data, the other side presents and applies these logarithms in the way of report / label / interface, and the middle real-time stream connects the two ends. In the follow-up work, we plan to switch all the existing non-real-time data sources to real-time data sources, and then import them to Phoenix/JanusGraph/HBase after being processed by Kafka and Flink. One of the advantages of forcing all data source data into Kafka is that it can improve the stability and availability of the whole process: first, as a buffer of the downstream system, Kafka can avoid the abnormal influence of the downstream system on the calculation of real-time flow and play the role of "cutting peak and filling valley". Secondly, since version 1.4, Flink officially supports accurate end-to-end processing semantics with Kafka, which is more guaranteed in terms of consistency.
Original text link
This article is the original content of Aliyun and may not be reproduced without permission.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.