In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
User-defined functions (UDF) are a key feature of most SQL environments and are mainly used to extend the built-in functionality of the system. UDF allows developers to abstract their low-level language implementations of new functions that are applied in higher-level languages such as SQL. Apache Spark is no exception, which provides various options for integrating UDF with Spark SQL workflows.
In this blog post, we will review simple examples of Apache Spark UDF and UDAF (user-defined aggregate functions) implementations on Python, Java, and Scala. We will also discuss important UDF API features and integration points, including current availability between distributions. All in all, we will introduce some important performance considerations to give you an idea of the options for leveraging UDF in your application.
Spark SQL UDFs
UDF converts the numerical values of a single row in the table, generating a single corresponding output value for each row. For example, most SQL environments provide a UPPER function that returns an uppercase version of the string as input.
Custom functions can be defined and registered as UDF in Spark SQL and have associated aliases that can be used for SQL queries. Here we will introduce you to a simple example where we will define a UDF to convert the temperature values in the following JSON data from Celsius to Fahrenheit:
The following example code registers our transformation UDF using the SQL alias CTOF, and then uses it to convert the temperature values for each city from the SQL query. For brevity, the creation of SQLContext objects and other boilerplate code is omitted, and links to a complete list are provided under each code snippet.
Python
Scala
Java
Note that Spark SQL defines the UDF1~UDF22 category and supports UDF with up to 22 input parameters. In the above example, UDF1 is used to process a single temperature value as input. If you fail to update the Apache Spark source code, using arrays (arrays) or structures (structs) as parameters may be helpful for applications that require more than 22 inputs; from a style point of view, this solution may be preferred if you find yourself using UDF6 or later.
Spark SQL UDAF function
User-defined aggregate functions (UDAF) can process multiple rows at the same time and then return a single value as a result, which is usually used with GROUP BY statements such as COUNT or SUM. To keep the example simple, we will implement a UDAF alias SUMPRODUCT that calculates the retail value of all vehicles based on usage groups, given prices, and integer quantities in inventory:
Currently, the implementation definition of Apache Spark UDAF is in the extended inherited UserDefinedAggregateFunction category and is supported by Scala and Java syntax. Once defined, we can illustrate and register our SumProductAggregateFunction UDAF object under the alias SUMPRODUCT and use it from the SQL query, which is roughly the same as the CTOF UDF in the previous example.
Scala
Other UDF support in Apache Spark
Spark SQL supports integration with existing Hive (Java or Scala) functions such as UDF, UDAF, and UDTF. By the way, UDTFs (user-defined table functions) can return multiple columns and rows-which is beyond the scope of this article, but we may cover it in future blog posts. Integrating existing Hive UDF is a valuable alternative to re-implementing and registering the same logic using the method highlighted in the previous example; it is also helpful for PySpark from a performance perspective, which will be discussed in the next section. Using the-jars option of spark-submit, the JAR file containing the Hive UDF function implementation can access the Hive function from HiveContext, and then use CREATE TEMPORARY FUNCTION to declare the function (including a UDF, as done in Hive [1]), as shown in the following example:
Hive UDF definition in Java
Access HiveUDF from Python
Note that as we implemented the UDF and UDAF functions above, Hive UDF can only be called using Apache Spark's SQL query language-that is, it cannot be used with Dataframe API's domain-specific language (DSL).
Alternatively, UDF implemented in the Scala and Java languages can be accessed from PySpark by including the implementation jar file (using the-jars option with spark-submit), and then access the UDF definition through the private reference executor JVM of the SparkContext object, the underlying Scala, or the Java UDF implementation loaded in the jar file. Holden Karau discussed this method in a wonderful speech [2]. Note that some of the Apache Spark private variables used in this technique are not officially targeted to end users. This has the added benefit of allowing the use of UDAF (which must now be defined in Java and Scala) for PySpark, as demonstrated in the following example using the SUMPRODUCT UDAF defined earlier in Scala:
Scala UDAF definition
Scala UDAF from PySpark
UDF-related features are being continuously added to every version of Apache Spark. For example, version 2.0 adds support for UDF in R. For reference, the following table summarizes the key features of each version discussed in this article:
The table summarizes the key features of the relevant versions introduced so far in this blog.
Performance consideration
It is important to understand the performance impact of Apache Spark UDF capabilities. For example, Python UDF (such as our CTOF function) causes data to be serialized between the executor JVM and the Python annotator running UDF logic-which greatly degrades performance compared to the UDF implementation in Java or Scala. Potential solutions to alleviate this serialization bottleneck include the following:
As described in the previous section, access the Hive UDF from PySpark. The Java UDF implementation can be accessed directly by the executor JVM. Note again that this method is only used to access UDF from Apache Spark's SQL query language. The use of this method can also refer to PySpark to access the UDF executed in Java or Scala, as shown in the Scala UDAF example we defined earlier.
In general, UDF logic should be as concise as possible, because it is possible that every line will be called. For example, when scaling to 1 billion lines, a step in the UDF logic takes 100ms to complete, which can quickly lead to significant performance problems.
Another important component of Spark SQL is the Catalyst query optimizer. This feature extends with each release and usually provides significant performance improvements for Spark SQL queries; however, any UDF implementation code may not be well understood for Catalyst (although analyzing the future functionality of bytecode [3] is considered to solve this problem). Therefore, using Apache Spark's built-in SQL query function usually results in the best performance, and should be the first way to consider when avoiding the introduction of UDF. Advanced users looking to leverage Catalyst to integrate more closely with their code can refer to the following Chris Fregly presentation [4], who uses Expression.genCode to optimize UDF code and uses the new Apache Spark 2.0 experimental feature [5], which provides a plug and play API for customizing Catalyst optimizer rules.
Conclusion
UDF is a very useful tool when the built-in functionality of Spark SQL needs to be extended. This blog article provides a walkthrough of UDF and UDAF implementations and discusses their integration steps to take advantage of existing Java Hive UDF in Spark SQL in Spark SQL. UDF can be implemented in Python, Scala, Java, and (in Spark 2.0) R, while UDAF can be implemented in UDAF as well as Scala and Java. When using UDF in PySpark, you must consider the cost of data serialization, and you should consider using the two strategies discussed above to solve this problem. Finally, we discussed Spark SQL's Catalyst optimizer and the performance considerations of insisting on using built-in SQL functions before introducing UDF into the solution.
Code https://github.com/curtishoward/sparkudfexamples
CDH version: 5.8.0 (Apache Spark 1.6.0)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.