Shulou (Shulou.com), 06/01 report. Updated 2025-01-19. Source: SLTechnology News&Howtos.
Application practice of PyFlink at Jumei
This article shares how PyFlink is used at Shuobao: background, architecture evolution, technology selection, and the reasoning behind the solution to one specific problem.
1. Background introduction
Business scenario
Shuobao has many important business scenarios, one of which is recommending short videos to users in real time. How timely the recommendation is affects metrics such as how long users stay on a video, viewing time and retention, which in turn affect advertising revenue, for example the unit price of ad slots. Shuobao's business has grown rapidly since 2019; by May 2020, user behavior data peaked at more than 1 million records per second, about 20 billion records per day. This volume challenges both our existing technical architecture and the timeliness of our data computation.
Real-time challenge
Our data flow took about an hour end to end, which is far from the real-time requirement. Recommending relevant videos more quickly, based on users' browsing habits, has a significant impact on viewing time, stay time, retention and similar metrics, for example an increase of 10-20% over the existing baseline.
Our first priority was to make the data computation real-time, that is, to replace the batch computing (Hive) in the original architecture with real-time computing (Flink SQL). The architecture diagram is as follows.
2. Architecture evolution
Architecture evolution
The first layer: the original offline computation, which takes 30 minutes to complete, not counting the subsequent model processing.
The second layer: once we considered real-time computation, we planned to adopt a Flink architecture, as shown in the figure.
The third layer: considering the cost in manpower and time, and how well each option matched our engineers' skills, we finally chose the third layer.
Our team members are more familiar with Python and SQL, so PyFlink suits us better. We have developed 20 business jobs with PyFlink (version 1.10). They currently handle more than 1 million records per second, 20 billion per day, and the business runs smoothly.
3. Technology selection
Facing the real-time requirement and the architecture upgrade, our team had no background in Spark, Flink or similar frameworks. By chance we watched a live stream by Jinzhu and learned that PyFlink is Flink's Python API, which matches our developers' existing language skills. So we decided to use PyFlink for the real-time upgrade.
First contact and difficulties
Although PyFlink matched the team's language skills, using it still raised many questions about the Flink environment, documentation, operators and so on, and we ran into several difficulties:
PyFlink has very few knowledge-base articles, examples or Q&A threads; apart from the official website and Aliyun there are basically no references.
The official PyFlink documentation lacks many details, for example it gives methods without their parameter formats.
The scope of PyFlink is unclear: the official website does not state which Flink features PyFlink supports and which it does not, so there is no clear line between Flink and PyFlink.
PyFlink has limitations of its own. For example, the retractions generated by a left/right join cannot be written to Kafka; to write them, you have to rewrite the Flink SQL into append mode, or modify the kafka-connector source code to support retraction.
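To make the last limitation concrete, here is a small plain-Python sketch of why an outer join produces retractions. It is a simplified model for illustration only, not Flink's implementation: the function names, the ("+"/"-") changelog encoding and the sample events are all assumptions introduced here.

```python
from collections import defaultdict


def left_join_changelog(events):
    """Simulate a streaming LEFT JOIN over (side, key, value) events.

    side is "L" or "R". Returns a changelog of (op, row) pairs, where
    op is "+" (insert) or "-" (retract a previously emitted row).
    This is a toy model, not how Flink implements its join operator.
    """
    left = defaultdict(list)
    right = defaultdict(list)
    changelog = []
    for side, key, value in events:
        if side == "L":
            left[key].append(value)
            if right[key]:
                # A match already exists: emit joined rows only.
                for right_value in right[key]:
                    changelog.append(("+", (key, value, right_value)))
            else:
                # No match yet: emit the row padded with None (NULL).
                changelog.append(("+", (key, value, None)))
        else:
            first_match = not right[key]
            right[key].append(value)
            for left_value in left[key]:
                if first_match:
                    # The padded row emitted earlier is now wrong: retract it.
                    changelog.append(("-", (key, left_value, None)))
                changelog.append(("+", (key, left_value, value)))
    return changelog


def is_append_only(changelog):
    """An append-only sink can accept the stream only if nothing is retracted."""
    return all(op == "+" for op, _ in changelog)


if __name__ == "__main__":
    events = [("L", "u1", "click"), ("R", "u1", "video_42")]
    for op, row in left_join_changelog(events):
        print(op, row)
    print("append-only:", is_append_only(left_join_changelog(events)))
```

When the left row arrives before its match, the padded row has to be withdrawn later, so the output is not append-only. A sink that only accepts inserts cannot represent that "-" message, which is why the query has to be rewritten so that it never emits one.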
So for a while it looked as if learning PyFlink would take quite long, and we worried that it would be hard to meet the business schedule in a short period.
Opportunity
While my team and I were worrying about development progress, I kept following the Flink community and happened to see that it was running a "PyFlink support program", so we filled out the PyFlink questionnaire. I also exchanged several emails with Jinzhu, and in the end we had the honor of joining the PyFlink community support program.
4. An OOM error and how it was solved
Once you understand it, developing with PyFlink is actually very convenient. After finishing the first job we gradually became familiar with PyFlink, and in about 3 weeks we completed 20 pieces of business logic and entered the testing phase. This speed came partly from the team's growing familiarity with PyFlink and partly from the help and support of Jinzhu, Fu Dian and other members of the community's PyFlink team. I will not go through everything here; instead, here is one specific example.
■ background:
Ever since we started using Flink, a few jobs have hit "running beyond physical memory limits" errors. Adjusting TaskManager (TM) memory and changing the ratio of TMs to slots many times did not help; the jobs still died eventually. Our compromise was to raise the number of automatic restarts and to restart the jobs on a schedule.
■ phenomenon:
The Flink job usually runs steadily for 5-6 days and then reports this error, over and over again.
■ details:
Closing TaskExecutor connection container_e36_1586139242205_122975_01_000011 because: Container [pid=45659,containerID=container_e36_1586139242205_122975_01_000011] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.8 GB of 32 GB virtual memory used. Killing container.
Dump of the process-tree for container_e36_1586139242205_122975_01_000011 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 45659 45657 45659 45659 (bash) 0 0 115814400 297 /bin/bash -c /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.out 2> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.err
|- 45705 45659 45659 45659 (java) 13117928 609539 6161567744 1048471 /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
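The numbers in this log already hint at where the memory went. The following sketch only does arithmetic on values copied from the log; the 4 KiB page size is an assumption (the log does not state it), and the variable names are made up for illustration.

```python
# Values copied from the log above.
CONTAINER_LIMIT_MB = 4 * 1024                 # "4 GB physical memory"
HEAP_MB = 2764                                # -Xmx2764m
MAX_DIRECT_MB = 1332                          # -XX:MaxDirectMemorySize=1332m
RSS_PAGES = {"bash": 297, "java": 1048471}    # RSSMEM_USAGE(PAGES)

# Assumption: 4 KiB memory pages, which the log does not state.
PAGE_SIZE_BYTES = 4096

# Memory the container has left once the JVM heap and the direct-memory
# cap are both fully used.
headroom_mb = CONTAINER_LIMIT_MB - (HEAP_MB + MAX_DIRECT_MB)

# Resident memory of the whole process tree, in MB.
total_rss_mb = sum(RSS_PAGES.values()) * PAGE_SIZE_BYTES / (1024 * 1024)

if __name__ == "__main__":
    print("headroom beyond heap + direct cap (MB):", headroom_mb)
    print("resident memory of process tree (MB):", total_rss_mb)
    print("over the container limit:", total_rss_mb > CONTAINER_LIMIT_MB)
```

Under these assumptions the heap and the direct-memory cap together already equal the container limit, so there is no budget left for native allocations such as RocksDB, metaspace or thread stacks. The direct-memory figure is a cap rather than actual usage, so this is a hint about the sizing, not proof of the cause.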
Our attempts:
1. The message looks like an OOM problem, so we first increased the TM size straight to the maximum memory, and adjusted the ratio of TMs to slots, aiming for 1:1.
2. Dumped the heap and analyzed what was occupying it.
3. Changed the type of state backend.
Result: all of these attempts failed; the job still reported the error after running for a while.
The PyFlink team's approach:
1. Analyze the current job's state, the job itself and its environment parameters. The state backend settings can be read from flink-conf, and the job graph and environment parameters from the Flink dashboard.
2. In 1.10 the memory used by the RocksDB state backend is not managed memory by default. It can be switched to managed memory by adding the following code to the PyFlink job:
env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
3. To check whether the OOM is caused by continuous growth of the memory used by the RocksDB state backend, enable RocksDB monitoring. Since we use RocksDB, the following configuration needs to be added to flink-conf:
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.size-all-mem-tables: true
The metrics are then collected, displayed and analyzed through our self-built metrics system; we use Grafana.
4. The previous steps showed that RocksDB memory was basically stable and its footprint was as expected. The suspicion, in the expert's own words, was that "RocksDB over-uses a little, or the JVM overhead is not large enough", and both cases should be solvable by adjusting the JVM overhead parameters. So the following configuration was added to flink-conf:
taskmanager.memory.jvm-overhead.min: 1024m
taskmanager.memory.jvm-overhead.max: 2048m
5. Adjust flink.size and let Flink calculate process.size automatically, in flink-conf:
taskmanager.memory.flink.size: 1024m
After all these adjustments we waited 14 days, and the job ran normally throughout, which shows that the problem was solved. Over the same period we watched the RocksDB metrics and found that native memory was still slightly over-used, while RocksDB as a whole stayed stable. Our current judgment is that something outside Flink's own accounting uses extra native memory, most likely user code or a third-party dependency, which is why raising jvm-overhead solves the problem.
6. The final changes are:
1) Add the following code to the PyFlink job:
env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
2) Modify or add in flink-conf:
taskmanager.memory.jvm-overhead.min: 1024m
taskmanager.memory.jvm-overhead.max: 2048m
taskmanager.memory.process.size: 6144m
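To see what the final settings change, here is a rough sketch of how Flink 1.10 derives the JVM overhead when process.size is configured: a fraction of the total process memory, clamped between jvm-overhead.min and jvm-overhead.max. The fraction of 0.1 and the default bounds of 192m and 1g are the documented 1.10 defaults as far as I know; the metaspace value is an assumption (the default differs between 1.10 releases), and the function names are illustrative, not Flink APIs.

```python
def jvm_overhead_mb(process_mb, fraction, min_mb, max_mb):
    """JVM overhead = fraction of total process memory, clamped to [min, max]."""
    return min(max(process_mb * fraction, min_mb), max_mb)


def total_flink_mb(process_mb, metaspace_mb, overhead_mb):
    """What remains for Flink itself (heap, managed, network, ...)."""
    return process_mb - metaspace_mb - overhead_mb


PROCESS_MB = 6144          # taskmanager.memory.process.size from the article
FRACTION = 0.1             # assumed default jvm-overhead.fraction
METASPACE_MB = 96          # assumption: depends on the exact 1.10.x release

# Assumed Flink defaults (min 192m, max 1g) versus the article's settings.
default_overhead = jvm_overhead_mb(PROCESS_MB, FRACTION, 192, 1024)
tuned_overhead = jvm_overhead_mb(PROCESS_MB, FRACTION, 1024, 2048)

if __name__ == "__main__":
    print("overhead with default bounds (MB):", round(default_overhead, 1))
    print("overhead with tuned bounds (MB):", tuned_overhead)
    print("left for Flink with tuned bounds (MB):",
          total_flink_mb(PROCESS_MB, METASPACE_MB, tuned_overhead))
```

With a 6144m process size, 10% falls below the new 1024m minimum, so the minimum is what takes effect. In other words, under these assumptions the tuning mainly works by raising the floor of the native-memory reserve, which matches the explanation that some native memory was being used outside Flink's accounting.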
For this upgrade, so as not to delay the final launch, we initially prepared two plans and ran them in parallel:
Build and develop on a cloud provider's platform
Develop the code on open-source PyFlink
In the end our team completed development and testing quickly with PyFlink, and achieved the stable support for millions of records per second and 20 billion per day mentioned earlier. The key point is that only 2 people were involved in developing this upgrade.
5. Summary and prospect
By learning PyFlink, the Shuobao big data team gained real-time data development capability in a short time. At present more than 20 PyFlink jobs run steadily, and we serve a number of departments, such as recommendation, operations and advertising. PyFlink is used in a variety of scenarios, including model profile computing, the A/B testing system, advertising recommendation and the user recall system, and it provides solid, stable real-time data for our business. Next, we plan to build a real-time computing platform such as Flink on Zeppelin to broaden the base of Flink developers and further reduce the cost of Flink development. Flink 1.11 is also about to be released; Python UDFs will be further optimized and Pandas support will be introduced.