
Knowledge Points: Spark Section



Shulou(Shulou.com)06/02 Report--

Home blog: https://blog.icocoro.me (Spark section)

How Spark handles max and min of string dates

How Spark stores data into Hive

How Spark adds new columns with map, UDFs, and functions

How Spark pivots rows to columns with pivot

Python 3.5.3

Spark 1.6.2

How Spark handles max and min of string dates

In general, taking max of a string-typed date column through Spark's agg does not give the expected result; the API documentation indicates that max and min only support numeric columns.

Hive's SQL query engine supports max and min with string dates.

The string date can be converted to a timestamp first and then aggregated.

unix_timestamp

public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters: s - (undocumented)
Returns: (undocumented)
Since: 1.5.0

from pyspark.sql import functions as F

df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))

Or use Hive SQL:

select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime
from app_table
group by device_id
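Going back to the DataFrame route, a rough end-to-end sketch (assuming a DataFrame df with columns device_id and startuptime in 'yyyy-MM-dd HH:mm:ss' form; from_unixtime turns the aggregated values back into strings):

from pyspark.sql import functions as F

# string date -> unix seconds, aggregate numerically, then back to a string
stamped = df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
result = (stamped.groupBy('device_id')
          .agg(F.max('startuptime_stamp').alias('max_stamp'),
               F.min('startuptime_stamp').alias('min_stamp'))
          .select('device_id',
                  F.from_unixtime('max_stamp').alias('max_startuptime'),
                  F.from_unixtime('min_stamp').alias('min_startuptime')))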

How Spark stores data into Hive

Usually the result data of a Spark job ends up in a Hive table. It can either be saved to an HDFS directory first and then loaded, or, most conveniently, inserted directly through a temporary table and HiveContext.

saveAsTextFile & load data

Adjust repartition according to the actual data size; when the data is relatively small, save it as a single file.

df.map(lambda r: func(r)).repartition(1).saveAsTextFile(data_dir)
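Here func is whatever per-record formatter the job needs; a minimal, hypothetical example that turns each Row into a tab-separated text line:

def func(r):
    # serialize one Row into a single tab-separated line for saveAsTextFile
    return '\t'.join(str(c) for c in r)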

First drop the partition if it already exists, then overwrite the original data (this makes it easy to re-run the job or repair the data).

A shell script is used here; HiveContext's sql would work as well.

alter table app_table drop if exists partition (datestr='$day_01');
load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition (datestr='$day_01');

hivectx.sql & insert

app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition (datestr='%s')" % daystr)
hivectx.sql("insert overwrite table app_table partition (datestr='%s') select * from app_table1_tmp" % daystr)
hivectx.sql("insert into table app_table partition (datestr='%s') select * from app_table2_tmp" % daystr)
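For the snippet above to run, hivectx is assumed to be a HiveContext built on the SparkContext, and daystr the partition date string; a minimal setup sketch (names mirror the snippet, values are illustrative):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName='app_table_insert')  # hypothetical app name
hivectx = HiveContext(sc)
daystr = '20171115'  # partition value for this run; the format is illustrative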

How Spark adds new columns with map, UDFs, and functions

When transforming data, Spark commonly relies on operations such as map and flatMap; with map you can generate a new column or modify the value of an existing one.

Spark also supports user-defined functions (UDFs) and ships a variety of built-in functions similar to Hive's.

map

A mapping function and a StructType schema need to be defined.

Details of numeric validation and precision are ignored here.

from pyspark.sql.types import *

def a_func(_):
    return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt2'])

a_schema = StructType([
    StructField('id', StringType(), True),
    StructField('cnt1', IntegerType(), True),
    StructField('cnt2', IntegerType(), True),
    StructField('cnt1_rate', IntegerType(), True)
])

a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)

udf

A function and a UDF wrapper need to be defined.

Details of numeric validation and precision are ignored here.

def a_func(cnt1, cnt2):
    return cnt1 / (cnt1 + cnt2)

a_udf = F.udf(a_func, IntegerType())
a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2']))

functions

Built-in functions handle format conversions such as date strings and similar transformations; see:

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
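For example, a date-string reformat done purely with built-in functions (the column name startuptime is carried over from the earlier examples; the 'yyyyMMdd' target pattern is just an illustration):

from pyspark.sql import functions as F

# 'yyyy-MM-dd HH:mm:ss' string -> unix seconds -> 'yyyyMMdd' day string
df2 = df.withColumn('day', F.from_unixtime(F.unix_timestamp('startuptime'), 'yyyyMMdd'))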

How Spark pivots rows to columns with pivot

When querying data with SQL, it is often necessary to turn rows into columns, both to make the data easier to present and to serve different dimensional requirements.

In plain SQL this is usually done with case when in a subquery, with a chain of joins, or with a union that fills in the missing fields.

In Spark it can be done on a DataFrame through the pivot function of GroupedData.

df.groupBy(['course_name']).pivot('daystr').sum('score')
df.groupBy(['course_name']).pivot('daystr').count()

Before conversion

daystr      course_name  score
2017-11-15  yuwen        1
2017-11-15  yuwen        1
2017-11-15  shuxue       1
2017-11-15  yingyu       2
2017-11-16  yuwen        1
2017-11-16  shuxue       1
2017-11-16  yingyu       2

After conversion

sum('score'):

course_name  2017-11-15  2017-11-16
yuwen        2           1
shuxue       1           1
yingyu       2           2

count():

course_name  2017-11-15  2017-11-16
yuwen        2           1
shuxue       1           1
yingyu       1           1
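A self-contained sketch that reproduces the tables above (sqlctx stands for an SQLContext; the rows come straight from the "before conversion" table):

rows = [('2017-11-15', 'yuwen', 1), ('2017-11-15', 'yuwen', 1),
        ('2017-11-15', 'shuxue', 1), ('2017-11-15', 'yingyu', 2),
        ('2017-11-16', 'yuwen', 1), ('2017-11-16', 'shuxue', 1),
        ('2017-11-16', 'yingyu', 2)]
df = sqlctx.createDataFrame(rows, ['daystr', 'course_name', 'score'])

df.groupBy(['course_name']).pivot('daystr').sum('score').show()  # scores summed per day
df.groupBy(['course_name']).pivot('daystr').count().show()       # row counts per day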
