Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of Spark Session with structured processing

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

小编给大家分享一下结构化处理之Spark Session的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

创建DataFrame,有三种模式,一种是sql()主要是访问Hive表;一种是从RDD生成DataFrame,主要从ExistingRDD开始创建;还有一种是read/format格式,从json/txt/csv等数据源格式创建。

先看看第三种方式的创建流程。

1、read/format

def read: DataFrameReader = new DataFrameReader(self)

SparkSession.read()方法直接创建DataFrameReader,然后再DataFrameReader的load()方法来导入外部数据源。load()方法主要逻辑如下:

def load(paths: String*): DataFrame = { sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, paths = paths, userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap).resolveRelation()) }

创建对应数据源类型的DataSource,DataSource解析成BaseRelation,然后通过SparkSession的baseRelationToDataFrame方法从BaseRelation映射生成DataFrame。从BaseRelation创建LogicalRelation,然后调用Dataset.ofRows方法从LogicalRelation创建DataFrame。DataFrame实际就是Dataset。

type DataFrame = Dataset[Row]

baseRelationToDataFrame的定义:

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { Dataset.ofRows(self, LogicalRelation(baseRelation)) }

Dataset.ofRows方法主要是将逻辑计划转换成物理计划,然后生成新的Dataset。

2、执行

SparkSession的执行关键是如何从LogicalPlan生成物理计划。我们试试跟踪这部分逻辑。

def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>

plan.executeCollect().head.getLong(0)

}

Dataset的count()动作触发物理计划的执行,调用物理计划plan的executeCollect方法,该方法实际上会调用doExecute()方法生成Array[InternalRow]格式。executeCollect方法在SparkPlan中定义。

3、HadoopFsRelation

需要跟踪下如何从HadoopFsRelation生成物理计划(也就是SparkPlan)

通过FileSourceStrategy来解析。它在FileSourceScanExec上叠加Filter和Projection等操作,看看FileSourceScanExec的定义:

case class FileSourceScanExec( @transient relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], overrideval metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan {。。。}

它的主要执行代码doExecute()的功能逻辑如下:

protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { // in the case of fallback, this batched scan should never fail because of: // 1) only primitive types are supported // 2) the number of columns should be smaller than spark.sql.codegen.maxFields WholeStageCodegenExec(this).execute() } else { val unsafeRows = { val scan = inputRDD if (needsUnsafeRowConversion) { scan.mapPartitionsWithIndexInternal { (index, iter) => val proj = UnsafeProjection.create(schema) proj.initialize(index) iter.map(proj) } } else { scan } } val numOutputRows = longMetric("numOutputRows") unsafeRows.map { r => numOutputRows += 1 r } } }

inputRDD有两种方式创建,一是createBucketedReadRDD,二是createNonBucketedReadRDD。两者没有本质的区别,仅仅是文件分区规则的不同。

private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = requiredSchema, filters = pushedDownFilters, options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) relation.bucketSpec match { case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) case _ => createNonBucketedReadRDD(readFile, selectedPartitions, relation) } }createNonBucketedReadRDD调用FileScanRDD :new FileScanRDD(fsRelation.sparkSession, readFile, partitions)以上是"结构化处理之Spark Session的示例分析"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report