Home page blog address (Spark section): https://blog.icocoro.me
How Spark handles max and min of string dates
How Spark saves data to Hive
How Spark adds new columns with map, udf, and functions
How Spark handles row-to-column conversion with pivot
Python 3.5.3
Spark 1.6.2
How Spark handles max and min of string dates
In general, using Spark's agg to take the max of a string-typed date gives incorrect results; the API documentation indicates that max and min only support numeric values.
Hive's SQL query engine supports max and min with string dates.
Convert the string date to a timestamp first, then aggregate.
unix_timestamp
public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters: s - (undocumented)
Returns: (undocumented)
Since: 1.5.0

from pyspark.sql import functions as F

df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))

Or use Hive SQL:

select device_id,
       max(startuptime) as max_startuptime,
       min(startuptime) as min_startuptime
from app_table
group by device_id
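Putting the conversion and the aggregation together in Spark itself, a minimal sketch; it assumes a DataFrame df with string columns device_id and startuptime in yyyy-MM-dd HH:mm:ss format, and the final from_unixtime step (turning the results back into date strings) is an addition for readability, not part of the original:

from pyspark.sql import functions as F

# Convert the string date to a Unix timestamp (seconds).
stamped = df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))

# Aggregate on the numeric timestamp, then convert back to date strings.
result = (stamped.groupBy('device_id')
          .agg(F.max('startuptime_stamp').alias('max_stamp'),
               F.min('startuptime_stamp').alias('min_stamp'))
          .withColumn('max_startuptime', F.from_unixtime('max_stamp'))
          .withColumn('min_startuptime', F.from_unixtime('min_stamp'))
          .select('device_id', 'max_startuptime', 'min_startuptime'))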
How Spark saves data to Hive

Usually the result of a Spark job is stored in a Hive table. One option is to save the data to an HDFS directory first and then load it; the most convenient way is to register a temporary table and insert the data directly through HiveContext.
saveAsTextFile & load data
Adjust repartition according to the actual data size; when the data is relatively small, save it as a single file.
df.map(lambda r: func(r)).repartition(1).saveAsTextFile(data_dir)
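Here func stands for whatever turns a Row into one line of text. A minimal sketch, assuming tab-separated fields that match the Hive table's field delimiter; the column names are placeholders, not from the original:

# Hypothetical formatter: join the fields with '\t' so that
# `load data inpath` can parse each line into the Hive table's columns.
def func(r):
    return '\t'.join([str(r['device_id']),
                      str(r['max_startuptime']),
                      str(r['min_startuptime'])])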
Drop the partition first if it already exists, then overwrite the original data (this makes it easy to re-run the job or repair the data). Shell is used here; HiveContext's sql can be used instead.
alter table app_table drop if exists partition (datestr='$day_01');
load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition (datestr='$day_01');

hivectx.sql & insert

app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition (datestr='%s')" % daystr)
hivectx.sql("insert overwrite table app_table partition (datestr='%s') select * from app_table1_tmp" % daystr)
hivectx.sql("insert into app_table partition (datestr='%s') select * from app_table2_tmp" % daystr)
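For context, a minimal sketch of the setup the snippet above assumes; the app name, the daystr format, and deriving the partition value from yesterday's date are assumptions, not from the original:

from datetime import datetime, timedelta
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName='app_table_daily')   # app name is a placeholder
hivectx = HiveContext(sc)

# Assumed partition value: yesterday, formatted like '2017-11-15'.
daystr = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')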
How Spark adds new columns with map, udf, and functions

When transforming data, Spark often uses operations such as map and flatmap; map can be used to generate a new column or modify the value of an existing column.
Spark also supports user-defined functions (UDFs) and provides a variety of built-in functions similar to Hive's.
map
A function and a StructType need to be defined; the details and precision of the numeric calculation are ignored here.
from pyspark.sql.types import *

def a_func(_):
    return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt2'])

a_schema = StructType([
    StructField('id', StringType(), True),
    StructField('cnt1', IntegerType(), True),
    StructField('cnt2', IntegerType(), True),
    StructField('cnt1_rate', IntegerType(), True)
])

a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
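For comparison, the same derived column can often be produced without map, using column expressions only; this is a sketch, not from the original, and the cast to double keeps the ratio from being truncated:

# Same derived column via built-in column arithmetic.
selected = df.select('id', 'cnt1', 'cnt2')
a_new_df = selected.withColumn(
    'cnt1_rate',
    selected['cnt1'].cast('double') / (selected['cnt1'] + selected['cnt2']))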
udf

A function and a UDF need to be defined; again, the details and precision of the numeric calculation are ignored here.
def a_func(cnt1, cnt2):
    return cnt1 / (cnt1 + cnt2)

a_udf = F.udf(a_func, IntegerType())

a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2']))
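The same function can also be registered for use in SQL. A minimal sketch, assuming the sqlctx from earlier; the temp table name is a placeholder, and DoubleType is used here (an assumption) so the ratio keeps its fractional part:

from pyspark.sql.types import DoubleType

# Register the Python function under a SQL-callable name.
sqlctx.registerFunction('cnt1_rate_udf', a_func, DoubleType())
df.registerTempTable('a_tmp')
rate_df = sqlctx.sql("select id, cnt1_rate_udf(cnt1, cnt2) as cnt1_rate from a_tmp")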
functions

Built-in functions handle format conversions such as date strings and similar operations; see:
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
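A few examples of such conversions with built-in functions; this is a sketch, and the column name startuptime simply follows the earlier example:

from pyspark.sql import functions as F

df2 = (df
       .withColumn('day', F.date_format('startuptime', 'yyyy-MM-dd'))       # e.g. '2017-11-15'
       .withColumn('startup_date', F.to_date('startuptime'))                # DateType column
       .withColumn('startup_str',
                   F.from_unixtime(F.unix_timestamp('startuptime'), 'yyyyMMdd')))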
How Spark handles row-to-column conversion with pivot
When querying data with SQL, it is often necessary to convert rows into columns so that the data can be displayed and used along different dimensions.
In plain SQL this is usually done with a case when subquery, a chain of joins, or a union that fills in the missing fields.
In Spark DataFrames it can be done with the pivot function of GroupedData.
df.groupBy(['course_name']).pivot('daystr').sum('score')
df.groupBy(['course_name']).pivot('daystr').count()
Before conversion
daystr       course_name  score
2017-11-15   yuwen        1
2017-11-15   yuwen        1
2017-11-15   shuxue       1
2017-11-15   yingyu       2
2017-11-16   yuwen        1
2017-11-16   shuxue       1
2017-11-16   yingyu       2
After conversion
sum('score'):

course_name  2017-11-15  2017-11-16
yuwen        2           1
shuxue       1           1
yingyu       2           2

count():

course_name  2017-11-15  2017-11-16
yuwen        2           1
shuxue       1           1
yingyu       1           1
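A self-contained sketch that reproduces the tables above; the sqlctx variable and the way the sample rows are built are assumptions:

from pyspark.sql import Row

rows = [Row(daystr='2017-11-15', course_name='yuwen',  score=1),
        Row(daystr='2017-11-15', course_name='yuwen',  score=1),
        Row(daystr='2017-11-15', course_name='shuxue', score=1),
        Row(daystr='2017-11-15', course_name='yingyu', score=2),
        Row(daystr='2017-11-16', course_name='yuwen',  score=1),
        Row(daystr='2017-11-16', course_name='shuxue', score=1),
        Row(daystr='2017-11-16', course_name='yingyu', score=2)]
df = sqlctx.createDataFrame(rows)

# Row-to-column conversion: one output column per distinct daystr value.
df.groupBy(['course_name']).pivot('daystr').sum('score').show()
df.groupBy(['course_name']).pivot('daystr').count().show()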