Introduction to Apache Spark Window Functions

This article introduces Apache Spark window functions: aggregate, ranking, and analytic functions, plus custom window frame definitions, each illustrated with a worked example.
Create Spark DataFrame
Now, let's create a sample Spark DataFrame, which we will use throughout the blog. First, let's load the required libraries.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Now, we will create a DataFrame with some virtual data that will be used to discuss various window functions.
case class Salary(depName: String, empNo: Long, salary: Long)

val empsalary = Seq(
  Salary("sales", 1, 5000),
  Salary("personnel", 2, 3900),
  Salary("sales", 3, 4800),
  Salary("sales", 4, 4800),
  Salary("personnel", 5, 3500),
  Salary("develop", 7, 4200),
  Salary("develop", 8, 6000),
  Salary("develop", 9, 4500),
  Salary("develop", 10, 5200),
  Salary("develop", 11, 5200)
).toDF()
This is what our DataFrame looks like:

+---------+-----+------+
|  depName|empNo|salary|
+---------+-----+------+
|    sales|    1|  5000|
|personnel|    2|  3900|
|    sales|    3|  4800|
|    sales|    4|  4800|
|personnel|    5|  3500|
|  develop|    7|  4200|
|  develop|    8|  6000|
|  develop|    9|  4500|
|  develop|   10|  5200|
|  develop|   11|  5200|
+---------+-----+------+
Window aggregate functions
Let's look at some aggregate window functions and see how they work.
First, we need to define the window specification. Suppose we want aggregated data per department. So, in this example, we will define the window by department name (column: depName).
Create a window specification for aggregate functions
val byDepName = Window.partitionBy("depName")
Apply aggregate functions on the window
Now, within each department (column: depName), we can apply various aggregate functions. Let's find the maximum and minimum salary for each department. Here we select only the desired columns (depName, max_salary, and min_salary) and drop the duplicate records.
val agg_sal = empsalary
  .withColumn("max_salary", max("salary").over(byDepName))
  .withColumn("min_salary", min("salary").over(byDepName))

agg_sal.select("depname", "max_salary", "min_salary")
  .dropDuplicates()
  .show()
Output:
+---------+----------+----------+
|  depname|max_salary|min_salary|
+---------+----------+----------+
|  develop|      6000|      4200|
|    sales|      5000|      4800|
|personnel|      3900|      3500|
+---------+----------+----------+
Now let's see how it works. The data is first partitioned by department name. When we apply an aggregate function, it is evaluated over each partition and returns the aggregated value (in this case, min and max).

Note: the available aggregate functions include max, min, sum, avg, and count.
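Under the same specification, the other aggregates work the same way. Below is a minimal sketch (the column names avg_salary, total_salary, and emp_count are illustrative, not from the original):

val more_agg = empsalary
  .withColumn("avg_salary", avg("salary").over(byDepName))    // mean salary per department
  .withColumn("total_salary", sum("salary").over(byDepName))  // departmental payroll
  .withColumn("emp_count", count("salary").over(byDepName))   // number of employees per department

more_agg.select("depName", "avg_salary", "total_salary", "emp_count").dropDuplicates().show()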
Window ranking functions
In this section, we will discuss several types of ranking functions.
Create a window specification for ranking functions
Now we want to rank employees by salary within their department: the highest-paid employee is ranked first, and the lowest-paid last. We partition the data by department (column: depName) and, within each department, sort the records in descending order of salary, as the specification below shows.
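The original post does not show this specification explicitly, but the ranking outputs below are consistent with a definition along these lines:

val winSpec = Window.partitionBy("depName").orderBy(col("salary").desc)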
1. Rank function: rank
This function returns the rank of each record within its partition, skipping ranks after ties:
val rank_df = empsalary.withColumn("rank", rank().over(winSpec))
rank_df.show()
Output:
+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   11|  5200|   2|
|  develop|   10|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    4|  4800|   2|
|    sales|    3|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+
Here we can see that some ranks repeat and some are skipped. For example, in the develop department there are two employees with rank = 2 and no employee with rank = 3, because the rank function assigns the same rank to equal values and skips the following rank accordingly.
2. Dense rank function: dense_rank
val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec))
dense_rank_df.show()
Output:
+---------+-----+------+----------+
|  depName|empNo|salary|dense_rank|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         2|
|  develop|    9|  4500|         3|
|  develop|    7|  4200|         4|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         2|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
Here we again see repeated ranks, but no ranks are skipped as with the rank function. For example, in the develop department there are two employees with rank 2; dense_rank assigns the same rank to equal values but does not skip the next rank.
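To compare the two functions directly, both can be applied over the same window; a quick sketch (the combined layout is for illustration only):

val both_ranks_df = empsalary
  .withColumn("rank", rank().over(winSpec))              // skips ranks after ties
  .withColumn("dense_rank", dense_rank().over(winSpec))  // never skips ranks

both_ranks_df.show()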
3. Row number function: row_number
This function assigns a sequential number to each record in the partition. If two rows have the same value in the ordering column, it is non-deterministic which row number each of them receives (a workaround is sketched after the output below).
val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec))
row_num_df.show()
Output:
+---------+-----+------+----------+
|  depName|empNo|salary|row_number|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         3|
|  develop|    9|  4500|         4|
|  develop|    7|  4200|         5|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         3|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
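Because of this non-determinism, a common pattern is to add a unique column as a tie-breaker in the ordering. Here is a sketch that also uses row_number to keep only each department's top earner (the empNo tie-breaker and the winSpecDet name are assumptions for illustration):

val winSpecDet = Window.partitionBy("depName").orderBy(col("salary").desc, col("empNo"))

val top_earner_df = empsalary
  .withColumn("row_number", row_number().over(winSpecDet))
  .filter(col("row_number") === 1)  // keep the first row of each department
  .drop("row_number")

top_earner_df.show()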
4. Percent rank function: percent_rank

This function returns the relative rank of each row as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).
val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec))
percent_rank_df.show()
Output:
+---------+-----+------+------------+
|  depName|empNo|salary|percent_rank|
+---------+-----+------+------------+
|  develop|    8|  6000|         0.0|
|  develop|   10|  5200|        0.25|
|  develop|   11|  5200|        0.25|
|  develop|    9|  4500|        0.75|
|  develop|    7|  4200|         1.0|
|    sales|    1|  5000|         0.0|
|    sales|    3|  4800|         0.5|
|    sales|    4|  4800|         0.5|
|personnel|    2|  3900|         0.0|
|personnel|    5|  3500|         1.0|
+---------+-----+------+------------+
5. N-tile function: ntile
This function subdivides each window/partition into n buckets. For example, to split each department into three groups, specify ntile(3).
val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec))
ntile_df.show()
Output:
+---------+-----+------+-----+
|  depName|empNo|salary|ntile|
+---------+-----+------+-----+
|  develop|    8|  6000|    1|
|  develop|   10|  5200|    1|
|  develop|   11|  5200|    2|
|  develop|    9|  4500|    2|
|  develop|    7|  4200|    3|
|    sales|    1|  5000|    1|
|    sales|    3|  4800|    2|
|    sales|    4|  4800|    3|
|personnel|    2|  3900|    1|
|personnel|    5|  3500|    2|
+---------+-----+------+-----+
Window analytic functions
Next, we will discuss analysis functions such as cumulative distribution, lag, and lead.
1. Cumulative distribution function: cume_dist
This function gives the cumulative distribution of values within the window/partition: for each row, the fraction of partition rows whose value is less than or equal to the current value. For example, 4200 is the lowest of the five develop salaries, so its cume_dist is 1/5 = 0.2.
Define the window specification and apply the cume_dist function to obtain the cumulative distribution. Note that the ordering is now ascending by salary:

val winSpec = Window.partitionBy("depName").orderBy("salary")
val cume_dist_df = empsalary.withColumn("cume_dist", cume_dist().over(winSpec))
cume_dist_df.show()
Output:
+---------+-----+------+------------------+
|  depName|empNo|salary|         cume_dist|
+---------+-----+------+------------------+
|  develop|    7|  4200|               0.2|
|  develop|    9|  4500|               0.4|
|  develop|   10|  5200|               0.8|
|  develop|   11|  5200|               0.8|
|  develop|    8|  6000|               1.0|
|    sales|    4|  4800|0.6666666666666666|
|    sales|    3|  4800|0.6666666666666666|
|    sales|    1|  5000|               1.0|
|personnel|    5|  3500|               0.5|
|personnel|    2|  3900|               1.0|
+---------+-----+------+------------------+
2. Lag function: lag
This function returns the value of a column from a row that is a given number of rows before the current row in the DataFrame.
The lag function takes three arguments, lag(col, offset = 1, default = None): col defines the column the function is applied to; offset is how many rows to look back; default is the value returned when the offset row does not exist.
val winSpec = Window.partitionBy("depName").orderBy("salary")
val lag_df = empsalary.withColumn("lag", lag("salary", 2).over(winSpec))
lag_df.show()
Output:
+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|null|
|  develop|    9|  4500|null|
|  develop|   10|  5200|4200|
|  develop|   11|  5200|4500|
|  develop|    8|  6000|5200|
|    sales|    4|  4800|null|
|    sales|    3|  4800|null|
|    sales|    1|  5000|4800|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
For example, let's look up the salary from 2 rows before the current row.
For depName = develop, salary = 4500, there is no row 2 rows before it in the partition, so the value is null.
For depName = develop, salary = 6000, looking back two rows gives salary = 5200.
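To avoid the nulls, the third argument supplies a default; for example, a default of 0 (a sketch, reusing the same winSpec):

val lag_default_df = empsalary
  .withColumn("lag", lag("salary", 2, 0).over(winSpec))  // rows with no predecessor get 0 instead of null

lag_default_df.show()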
3. Lead function: lead
This function returns the value of a column from a row that is a given number of rows after the current row in the DataFrame.
The lead function takes three arguments, lead(col, offset = 1, default = None): col defines the column the function is applied to; offset is how many rows to look ahead; default is the value returned when the offset row does not exist.

val winSpec = Window.partitionBy("depName").orderBy("salary")
val lead_df = empsalary.withColumn("lead", lead("salary", 2).over(winSpec))
lead_df.show()
Output:
+---------+-----+------+----+
|  depName|empNo|salary|lead|
+---------+-----+------+----+
|  develop|    7|  4200|5200|
|  develop|    9|  4500|5200|
|  develop|   10|  5200|6000|
|  develop|   11|  5200|null|
|  develop|    8|  6000|null|
|    sales|    3|  4800|5000|
|    sales|    4|  4800|null|
|    sales|    1|  5000|null|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
Let's find the salary from 2 rows after the current row.
For depName = develop, salary = 4500, looking ahead two rows gives salary = 5200.
For depName = personnel, salary = 3500, there is no row 2 rows ahead in the partition, so the value is null.
Custom window definition
By default, the boundaries of the window are defined by the partition column, and we can specify the order through the window specification. For example, for a development department, the window begins with the minimum salary and ends with the maximum salary.
But what if we want to change the boundaries of the window? The following functions can be used to define the window frame within each partition.
1. rangeBetween
Using the rangeBetween function, we can define the frame boundaries explicitly. For example, let's set the start to 100 and the end to 300 relative to the current salary: the window then starts 100 units above the current value and ends 300 units above it (start and end values included).
Define the window specification:

val winSpec = Window.partitionBy("depName").orderBy("salary").rangeBetween(100L, 300L)
The L after the start and end values indicates that the value is of type Scala Long.
Apply the custom window specification:

val range_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
range_between_df.show()
Output:
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      null|
|  develop|   10|  5200|      null|
|  develop|   11|  5200|      null|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      5000|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      null|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
Now, let's try to understand the output.
For depName = develop, salary = 4200, the window starts at (current value + start), i.e. 4200 + 100 = 4300, and ends at (current value + end), i.e. 4200 + 300 = 4500.
Since the develop department has only one salary value in the range 4300 to 4500, namely 4500, max_salary for the 4200 row is 4500 (see the output above).
Similarly, for depName = develop, salary = 4500, the window is (start: 4500 + 100 = 4600, end: 4500 + 300 = 4800). The develop department has no salary between 4600 and 4800, so the maximum is null (see the output above).
There are some special boundary values that can be used:

Window.currentRow: denotes the current row (for range frames, the current value).

Window.unboundedPreceding: gives the window an unbounded start.

Window.unboundedFollowing: gives the window an unbounded end.
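For instance, combining Window.unboundedPreceding with Window.currentRow yields a running total of salaries within each department; a sketch (the runningSpec and running_total names are illustrative):

val runningSpec = Window.partitionBy("depName").orderBy("salary")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val running_total_df = empsalary.withColumn("running_total", sum("salary").over(runningSpec))
running_total_df.show()

Note that with rangeBetween, rows with equal salaries share the same running total, since a range frame includes all ties of the current value.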
For example, suppose we need to find, for each employee, the maximum salary in their department that is at least 300 more than their own. So we define the start value as 300L and the end value as Window.unboundedFollowing:
val winSpec = Window.partitionBy("depName").orderBy("salary").rangeBetween(300L, Window.unboundedFollowing)
val range_unbounded_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
range_unbounded_df.show()
Output:
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      6000|
|  develop|    9|  4500|      6000|
|  develop|   10|  5200|      6000|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      null|
|    sales|    4|  4800|      null|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
So, for depName = personnel, salary = 3500, the window is (start: 3500 + 300 = 3800, end: unbounded), and the maximum in that range is 3900 (see the output above).
Similarly, for depName = sales, salary = 4800, the window is (start: 4800 + 300 = 5100, end: unbounded). Since no sales salary is 5100 or more, the result is null.
2. rowsBetween
With rangeBetween, we use the values of the sort order to define the beginning and end of the window. However, we can also use the relative row position to define the beginning and end of the window.
For example, we want a window that begins one row before the current row and ends one row after it.
Define a custom window specification
val winSpec = Window.partitionBy("depName").orderBy("salary").rowsBetween(-1, 1)
Apply custom window specification
val rows_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
rows_between_df.show()
Output:
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      5200|
|  develop|   10|  5200|      5200|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      6000|
|    sales|    3|  4800|      4800|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      5000|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      3900|
+---------+-----+------+----------+
Now, let's try to understand the output.
For depName = develop, salary = 4500, the window spans one row before and one row after the current row, so the salaries in the window are (4200, 4500, 5200) and the maximum is 5200 (see the output above).
Similarly, for depName = sales, salary = 5000, the window again spans one row on each side; since there is no row after this one, the window holds only 2 rows with salaries 4800 and 5000, and the max is 5000 (see the output above).
We can also use the special boundaries Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow, as we did with rangeBetween earlier.
Note: rowsBetween does not require ordering, but I use orderBy here to keep the results consistent across runs.
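A typical use of rowsBetween is a moving average over neighboring rows; a sketch reusing the same (-1, 1) frame (the moving_avg column name is illustrative):

val moving_avg_df = empsalary
  .withColumn("moving_avg", avg("salary").over(winSpec))  // mean of previous, current, and next row

moving_avg_df.show()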
This concludes our introduction to Apache Spark window functions. Theory works best alongside practice, so try these examples yourself!