This article introduces how to use the Python API in Apache Flink. It is intended as a practical reference for interested readers; I hope you learn a lot from it.
I. The Past, Present, and Future of the Apache Flink Python API
1. Why Flink chose to support Python
Apache Flink is a unified open source big data computing engine. Flink 1.9.0 introduces a new ML interface and a brand-new Python API architecture. So why did Flink add support for Python? This is analyzed in detail below.
One of the most popular development languages
Python itself is an excellent development language. According to RedMonk statistics, it ranks third in popularity, behind only Java and JavaScript.
RedMonk is a well-known, developer-focused industry analysis firm; for its more detailed analysis, you can follow the link in my slides. So what does Python's popularity have to do with Apache Flink, the unified big data computing engine we are discussing today? With that question in mind, think of the famous open source projects related to big data: the earliest batch framework Hadoop, the stream computing platform Storm, the recently very popular Spark, Hive, the KV store HBase, and projects in other domains. All of these well-known open source projects, without exception, provide a Python API.
Support for many open source projects
Given that the Python ecosystem is quite mature, Apache Flink invested heavily in version 1.9 to introduce a brand-new PyFlink. Beyond big data, Python is also closely tied to artificial intelligence.
ML's preferred language
From the statistics above, Python already accounts for 0.129% of the languages required in machine learning job postings and appears to be more popular than R. Python is an interpreted language whose design philosophy is "one way, and preferably only one way, to do a thing". Its simplicity and ease of use make it one of the most popular languages in the world, and it has a solid ecosystem in big data computing. Python also has good prospects in machine learning, so the recently released Apache Flink 1.9 introduces a Python API with a completely new architecture.
Flink is a unified computing engine, and the community values its users highly. Beyond Java and Scala, the community wants to provide more entry points and more convenient ways for users to adopt Flink and benefit from its big data computing power. Therefore, starting with Flink 1.9, the community has launched a Python API built on an entirely new technical foundation, and it already supports most of the commonly used operators, such as JOIN, AGG, and WINDOW.
2. Python API - Roadmap
Although Python can use Java user-defined functions in Flink 1.9, support for native Python user-defined functions is still missing. The plan is therefore to support Python user-defined functions in Flink 1.10, add support for the data analysis library Pandas, and add support for the DataStream API and the ML API in Flink 1.11.
II. Python API Architecture and Development Environment Setup
1. Python Table API architecture
The new Python API architecture consists of the user API part, the communication between the Python VM and the Java VM, and the part that finally submits the job to the Flink cluster for execution. So how do the Python VM and the Java VM communicate? On the Python side there is a Python gateway that maintains a connection for communicating with Java, and on the Java side there is a GatewayServer that receives calls from the Python side.
Regarding the Python API architecture: before 1.9, Flink's DataSet and DataStream already had Python API support, but as two separate sets of APIs, one for the DataSet API and one for the DataStream API. For a unified streaming computing engine like Flink, a unified architecture is very important. Moreover, the existing Python DataSet API and DataStream API were built on Jython, which does not support the Python 3.x series well, so with the release of Flink 1.9 the original Python API architecture was abandoned in favor of a brand-new one. This new Python API is based on the Table API.
The communication between the Table API and the Python API takes a simple approach: the Python VM and the Java VM talk to each other directly. When you write or call the Python API, it communicates with the Java API behind the scenes, so manipulating the Python API feels just like manipulating Java's Table API. The new architecture ensures the following:
There is no need to create a new set of operators, so functional consistency with Java's Table API is easy to maintain.
Thanks to the existing Java Table API optimization model, jobs written with the Python API are optimized through the Java API's optimizer, which ensures they can also achieve excellent performance.
As shown in the figure, when Python requests a Java object, the object is created on the Java side, stored in a storage structure, and assigned an ID, which is handed to the Python side. Holding the ID of the Java object, the Python side can operate on that object; in other words, the Python side can manipulate any Java object. This is why the new architecture can guarantee that the Python Table API and the Java Table API are functionally consistent, and why it can reuse the existing optimization model.
Under the new architecture and communication model, a Python API call to the Java API simply passes the ID of the held Java object, together with the method name and parameters, to the Java VM, so developing with the Python Table API works exactly the same way as with the Java Table API. Next, I will show in detail how to develop a simple Python API job.
2. Python Table API - Job development
Generally speaking, a Python Table job is divided into four parts. First, we must decide whether the job runs in batch mode or stream mode. Users of later versions may not need to think about this, but in the current 1.9 version it still needs to be considered.
After deciding how the job will be executed, we need to understand where the data comes from and how to define the Source, its structure, and its data types. Then we write the computation logic. Finally, the result of the computation needs to be persisted to some system, so we define the Sink; similar to the Source, we need to define the Sink schema and the type of each field.
Let's now walk through how to write each step with the Python API. First, we create an execution environment. For the execution environment itself, we first need an ExecutionEnvironment, and on top of that a TableEnvironment. The TableEnvironment takes a TableConfig, which holds configuration parameters that are passed to the runtime layer during execution, and it also provides some custom configuration items that can be used in real business development.
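As a minimal sketch in the PyFlink 1.9 style (batch mode; a streaming job would use a StreamExecutionEnvironment and StreamTableEnvironment instead), the environment setup might look like this:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment

# create the batch execution environment
exec_env = ExecutionEnvironment.get_execution_environment()

# TableConfig carries runtime configuration for the Table API layer
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)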
After getting the environment, you need to define the data source table. Take a CSV file as an example: the fields are separated by a comma, and Field is used to describe which fields are in the file. Here the file is comma-separated and there is only one field, called word, of type String.
After defining and describing how the source data is converted into a Table data structure, that is, what the data structure and data types look like once they reach the Table API level, you add fields and field types through with_schema. Here there is only one field, with data type String. The source is finally registered as a table in the catalog, so it can be used by the later query computation.
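Continuing the sketch above, the source could be registered as follows (the file path and table name are illustrative):

from pyflink.table import DataTypes
from pyflink.table.descriptors import FileSystem, OldCsv, Schema

# a comma-separated file with a single String field named "word"
t_env.connect(FileSystem().path('/tmp/input.csv')) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')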
Next, create the result table: when the computation is complete, the results need to be stored in a persistent system. Taking WordCount as an example, the storage table has a word field and a count field, one a String word and the other a Bigint count, and it is then registered as the Sink.
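Again as a sketch, with an illustrative output path, the sink registration might look like this:

# result table with two fields: word (String) and count (Bigint)
t_env.connect(FileSystem().path('/tmp/output.csv')) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')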
After writing and registering the Table Sink, let's look at how to write the logic. Writing WordCount with the Python Table API is actually as simple as with the Java Table API: compared with DataStream, the Python API needs only one statement for a WordCount. For example, with group by: first scan the Source table, then group by word, then select word together with an aggregated count, and finally insert the result into the result table.
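Putting it together, the core logic is a single chained statement (the job name passed to execute is arbitrary):

# scan the source, group by word, count, and write into the sink
t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute('word_count')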
3. Python Table API - Environment setup
So how do we actually get WordCount up and running? First we need to set up a development environment. Different machines may have different software versions installed; here are the version requirements, with the versions on the demo machine listed in parentheses.
The second step is to build a Java binary release from source, so here we clone the trunk code and pull the 1.9 branch. You can of course use master, but master is not stable enough, so for learning it is recommended to use the 1.9 branch. Next comes the hands-on part: first verify the slides by compiling the code, as shown in the following example:
# download the source code
git clone https://github.com/apache/flink.git
# pull the 1.9 branch
cd flink
git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
# build a binary distribution package
mvn clean install -DskipTests -Dfast
After compilation completes, find the distribution package in the corresponding directory:
cd flink-dist/target/flink-1.9.0-bin
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
After building and verifying the Java API, let's build a release package for Python.
Most Python users know that we use pip install to integrate required dependency libraries into the local Python environment.
The same is true for Flink: PyFlink needs to be packaged into a resource that pip can recognize and install. In actual use, you can copy this command and try it in your own environment.
cd flink-python; python setup.py sdist
This process simply wraps the required Java packages of the PyFlink module together with the Python package; afterwards there is an apache-flink-1.9.dev0.tar.gz in the dist directory.
cd dist/
The apache-flink-1.9.dev0.tar.gz in the dist directory is the PyFlink package we can pip install. In version 1.9, in addition to the original Flink Table planner there is also the Flink Table Blink planner: Flink supports both planners at the same time, so if you want to experiment, you are free to switch between Flink's original planner and the Blink planner. After packaging, you can try installing the package into your actual environment.
Next is a very simple command. Before running it, use pip list to check whether the package already exists among the installed packages, then try installing the package you just built. In actual use, the same process applies when upgrading to a new version: install the new package.
pip install dist/*.tar.gz
pip list | grep flink
After installation completes, you can use the WordCount example written earlier to verify that the environment is correct. How do we verify it? For convenience, you can clone the enjoyment.code repository directly:
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; python word_count.py
Then give it a try. This directory contains the WordCount example we just developed; run it directly with Python to verify that the environment is OK. At this point the Flink Python API starts a mini cluster and submits the WordCount job to it for execution, so the run actually happens on a cluster. The code reads a source file and writes the result to a CSV file, so a sink CSV appears in the current directory. For the specific steps, see the Flink Chinese community video "The Status Quo and Planning of the Apache Flink Python API".
As for IDE configuration: in a normal development process, most of us develop locally, and it is recommended to use PyCharm to develop Python-related logic and jobs.
Because there are many screenshots, this content has also been written up as a blog post; you can scan the QR code to follow it and see the detailed notes, at https://enjoyment.cool. There is one key point to note: your machine may have multiple Python environments, and the environment you choose must be the one you just ran pip install in. For operation details, see "The Status Quo and Planning of the Apache Flink Python API".
4. Python Table API - Job submission
Are there other ways to submit a job? Yes, the CLI way, that is, actually submitting to an existing cluster. First start a cluster; the build output is usually under the target directory, and you can start the cluster directly from there. One thing to mention is that the cluster exposes a Web port, whose address is configured in flink-conf.yaml. Following the commands in the slides, you can check the logs to see whether startup succeeded and then visit the Web UI from outside. If the cluster starts normally, let's see how to submit a job.
Flink submits jobs through flink run. The sample command is as follows:
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
When executing on the command line, in addition to the -py parameter you can also specify a Python module, other dependent resource files, JARs, and so on.
Version 1.9 also provides a more convenient way to write Python API code and see results interactively: the Python shell. There are two ways to run it, Local and Remote, with no essential difference between the two. Let's look at Local first. The command is as follows:
bin/pyflink-shell.sh local
This starts a mini cluster. When it comes up, a Python Flink CLI appears along with some sample programs for you to experience; following the examples above, you will get correct output and submission, and you can write either Streaming or Batch programs. For detailed steps, refer to the video.
By now you should have a general understanding of the Python API architecture in Flink 1.9 and of how to set up the Python API environment, and you have used a simple WordCount example to see how to run programs in the IDE and how to submit a job with flink run and in interactive mode. You have also experienced some of the existing interactive ways of using the Flink Python API. Having covered the environment setup and a simple example, let's now look at the core operators in 1.9 in detail.
III. Introduction and Application of Flink Python API Core Operators
1. Python Table API operators
The process of creating a job was covered above: first choose whether to execute in Streaming or Batch mode; second, define the tables to use, the Source, the Schema, and the data types; third, develop the logic, using a count function when writing WordCount. Finally, the Python API has many built-in aggregate functions, such as count, sum, max, min, and so on.
So the current version, Flink 1.9, already meets most common requirements, beyond the count just mentioned. Regarding Flink Table API operators in 1.9: whether in the Python Table API or Java's Table API, the following kinds of operations are supported. First, single-stream operations such as SELECT and Filter; aggregations on a stream, including window aggregation with windowing functions; and column operations such as add_columns and drop_columns.
Beyond single-stream operations there are dual-stream operations, such as dual-stream JOIN, dual-stream minus, and union; these operators are all well supported in the Python Table API. Functionally, the Python Table API in Flink 1.9 is almost identical to the Java Table API. Let's look at actual code to see how the operators above are written and how to develop Python operators.
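A minimal sketch of a few of these operators on hypothetical tables (orders, users, orders_a, orders_b), using expression strings in the same way the Java Table API does:

# single-stream operations: filter, projection, group aggregation
filtered = orders.filter("amount > 10").select("user, amount")
aggregated = orders.group_by("user").select("user, amount.sum as total")

# dual-stream operations: join and union
joined = orders.join(users).where("user = uid").select("user, name, amount")
unioned = orders_a.union_all(orders_b)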
2. Python Table API operators - Watermark definition
Careful readers may have noticed that we have not yet mentioned a characteristic of streams: time. A characteristic of a stream is that data may arrive out of order, and this disorder is an objectively existing state of the stream. In Flink, the Watermark mechanism is generally used to handle out-of-order data.
How do we define a Watermark in the Python API? Suppose we have JSON data with a String field and a time field of type datetime. Defining the Watermark then means adding a rowtime column when adding the Schema; the rowtime must be of type Timestamp.
Watermarks can be defined in many ways. In the figure above, watermarks_periodic_bounded sends Watermarks periodically, with a bound of 60,000, in milliseconds. If the data is out of order, it can handle disorder within one minute: the larger the value, the more out-of-orderness is tolerated, but the higher the latency of the data. For the principles of Watermark, see my blog: http://1t.click/7dM.
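As a sketch of what such a schema definition might look like (field names are illustrative; the rowtime attribute is derived from an existing event-time field in the input):

from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, Rowtime

schema = Schema() \
    .field("a", DataTypes.STRING()) \
    .field("rowtime", DataTypes.TIMESTAMP()) \
    .rowtime(
        Rowtime()
        # take timestamps from the "time" field of the input
        .timestamps_from_field("time")
        # emit periodic watermarks with a 60 000 ms out-of-orderness bound
        .watermarks_periodic_bounded(60000))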
3. Python Table API - Java UDF
Finally, I would like to share the use of Java UDFs in Flink 1.9. Although Python UDFs are not yet supported in 1.9, Flink lets you use Java UDFs from Python. In Flink 1.9 the Table module was optimized and refactored, so developing a Java UDF now only requires introducing a Flink common dependency, and the UDF can then be used in Python API development.
Next, a concrete example of using a Java UDF from the Python API. Suppose we develop a UDF that computes the length of a string. In Python we register it with register_java_function, where the function is referenced by the full class path of the Java class. It can then be invoked through the registered name; for details see my blog: http://1t.click/HQF.
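A minimal sketch of the registration and call, assuming a hypothetical Java scalar UDF class com.example.udf.StringLength that has already been built into a JAR:

# register the Java UDF under the name "len"; the second argument is the
# full class path of the Java function (hypothetical here)
t_env.register_java_function("len", "com.example.udf.StringLength")

# the registered name can then be used like any built-in function
t_env.scan("mySource") \
    .select("word, len(word)") \
    .insert_into("mySink")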
How is it executed? You can run it with the flink run command, and you need to bring the UDF's JAR along.
Do Java UDFs only support Scalar Functions? No: Java UDFs support Scalar Functions, Table Functions, and Aggregate Functions alike.