In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the Flink source code streaming data into hive example analysis, has a certain reference value, interested friends can refer to, I hope you can learn a lot after reading this article, the following let Xiaobian with you to understand.
Flow chart
Data stream processing
This time, we mainly analyze how flink writes streaming data similar to kafka to the hive table. Let's start with a simple code:
/ / construct hive catalog String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/ Users/user/work/hive/conf"; / / a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog (name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog ("myhive", hive); tEnv.useCatalog ("myhive") TEnv.getConfig () .setSqlDialect (SqlDialect.HIVE); tEnv.useDatabase ("db1"); tEnv.createTemporaryView ("kafka_source_table", dataStream) String insertSql = "insert into hive.db1.fs_table SELECT userId, amount," + "DATE_FORMAT (ts, 'yyyy-MM-dd'), DATE_FORMAT (ts,' HH'), DATE_FORMAT (ts, 'mm') FROM kafka_source_table"; tEnv.executeSql (insertSql)
At startup, the system will first parse sql and obtain the corresponding attributes, and then load all subclasses of TableFactory, including TableSourceFactory and TableSinkFactory, through java's SPI mechanism, and then determine which factory class to use according to the property cycle parsed from sql. The specific operation is implemented in the method of the TableFactoryUtil class.
For example, for the above sql, after parsing, you find that you are going to write a hive sink with the table name hive.db1.fs_table. So after calling the TableFactoryUtil#findAndCreateTableSink (TableSinkFactory.Context context) method, the system gets the subclass HiveTableFactory of TableSinkFactory, and then calls the corresponding createTableSink method to create the corresponding sink, that is, HiveTableSink.
Let's take a brief look at the variables and structure of HiveTableSink.
/ * Table sink to write to Hive tables. * / public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {private static final Logger LOG = LoggerFactory.getLogger (HiveTableSink.class); private final boolean userMrWriter; / / is bounded to distinguish between batch processing and streaming private final boolean isBounded; private final JobConf jobConf; private final CatalogTable catalogTable; private final ObjectIdentifier identifier; private final TableSchema tableSchema; private final String hiveVersion; private final HiveShim hiveShim; private LinkedHashMap staticPartitionSpec = new LinkedHashMap (); private boolean overwrite = false; private boolean dynamicGrouping = false
We see that it implements three interfaces: AppendStreamTableSink, PartitionableTableSink and OverwritableTableSink, which determine the function of hive sink. The data can only be in append mode, the data can be partitioned, and the data can be overwritten.
Class of these variables, look at the name will probably know what it means, do not explain, talk about HiveShim, we see in the constructor hiveShim is related to the version of hive, so in fact, we can understand this class as a layer of encapsulation for different hive versions of operation.
HiveShim = HiveShimLoader.loadHiveShim (hiveVersion)
The way tablesink handles data streams is consumeDataStream. Let's focus on the analysis.
Hive basic information acquisition
First, you will connect to the hive Metabase through the configuration of hive to get the basic information of the hive table.
String [] partitionColumns = getPartitionKeys (). ToArray (new String [0]); String dbName = identifier.getDatabaseName (); String tableName = identifier.getObjectName (); try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create (new HiveConf (jobConf, HiveConf.class), hiveVersion)) {Table table = client.getTable (dbName, tableName); StorageDescriptor sd = table.getSd ()
Get the information about the table of hive, that is, the Table object.
Get some storage information about the table, the StorageDescriptor object, which contains the storage path, storage format of the hive table, and so on.
Flow and critical judgment
Next, determine whether writing to hive is batch or streaming.
If (isBounded) {. / / batch} else {. / / streaming}
Since this time we mainly analyze the stream processing of flink, we will skip batch for a while and enter else, that is, stream processing.
Here, some basic configurations are defined:
Bucket allocator TableBucketAssigner, in short, is how to determine the partition of the data, such as by time, or by the value of the field, and so on.
Scrolling strategy, how to generate the next file, by time, or the size of the file, and so on.
Construct bulkFactory. Currently, only parquet and orc column storage formats use bulkFactory
/ / barrel allocator TableBucketAssigner assigner = new TableBucketAssigner (partComputer) / / Scroll policy TableRollingPolicy rollingPolicy = new TableRollingPolicy (true, conf.get (SINK_ROLLING_POLICY_FILE_SIZE). GetBytes (), conf.get (SINK_ROLLING_POLICY_ROLLOVER_INTERVAL). ToMillis ()) / / construct bulkFactory Optional bulkFactory = createBulkWriterFactory (partitionColumns, sd)
The createBulkWriterFactory method is mainly used to construct factory classes that are written into column storage formats. Currently, only parquet and orc formats are supported. First, some parameters used to construct factory classes, such as field type, name, etc., are defined, and then different factory classes are constructed according to different types. If it is in parquet format, the final ParquetWriterFactory factory class will be constructed. If it is in orc format, OrcBulkWriterFactory or OrcNoHiveBulkWriterFactory will be constructed according to the version of hive.
Write format judgment
If you are using MR's writer or row format, enter the if logic, use HadoopPathBasedBulkFormatBuilder, if it is a column storage format, enter the else logic, and use StreamingFileSink to write data.
If (userMrWriter | |! bulkFactory.isPresent ()) {HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory (recordWriterFactory) Builder = new HadoopPathBasedBulkFormatBuilder (new Path (sd.getLocation ()), hadoopBulkFactory, jobConf, assigner). WithRollingPolicy (rollingPolicy). WithOutputFileConfig (outputFileConfig) LOG.info ("Hive streaming sink: Use MapReduce RecordWriter writer.") } else {builder = StreamingFileSink.forBulkFormat (new org.apache.flink.core.fs.Path (sd.getLocation ()), new FileSystemTableSink.ProjectionBulkFactory (bulkFactory.get ()) PartComputer) .withBucketAssigner (assigner) .withRollingPolicy (rollingPolicy) .withOutputFileConfig (outputFileConfig) LOG.info ("Hive streaming sink: Use native parquet&orc writer.");}
In big data processing, column storage has better query efficiency than row storage, so this time we focus on column storage to talk about how StreamingFileSink writes column data. We can see from the code that when constructing the buckets builder, we used the bucket assigner that we just generated, the configuration of the output, and the strategy of file scrolling.
Construct a partition submission operator
At the end of the HiveTableSink#consumeDataStream method, we enter the FileSystemTableSink#createStreamingSink method, which mainly does two things, one is to create an operator StreamingFileWriter for stream writing, and the other is to construct an operator StreamingFileCommitter for submitting partition files when there is a partition column and the partition file commit policy is configured in the configuration file, which is fixed with only one degree of concurrency.
StreamingFileWriter fileWriter = new StreamingFileWriter (rollingCheckInterval, bucketsBuilder); DataStream writerStream = inputStream.transform (StreamingFileWriter.class.getSimpleName (), TypeExtractor.createTypeInfo (CommitMessage.class), fileWriter) .setParallelism (inputStream.getParallelism ()) DataStream returnStream = writerStream; / / save committer when we don't need it. If (partitionKeys.size () > 0 & & conf.contains (SINK_PARTITION_COMMIT_POLICY_KIND)) {StreamingFileCommitter committer = new StreamingFileCommitter (path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf) ReturnStream = writerStream .transform (StreamingFileCommitter.class.getSimpleName (), Types.VOID, committer) .setParallelism (1) .setMaxParallelism (1);}
We see that in the code, inputStream converts the submitted data into CommitMessage format through the transform method, and then sends it to its downstream StreamingFileCommitter operator, which means that StreamingFileCommitter will receive the data collected in StreamingFileWriter.
Detailed explanation of StreamingFileWriter
This StreamingFileWriter can be understood as an operator-level sink for writing to a file, which wraps the StreamingFileSink and then adds some other operations, such as submitting partition information, and so on. Let's take a brief look at the structure of this class and talk briefly about the role of each method.
Public class StreamingFileWriter extends AbstractStreamOperator implements OneInputStreamOperator, BoundedOneInput {@ Override public void initializeState (StateInitializationContext context) throws Exception {. } @ Override public void snapshotState (StateSnapshotContext context) throws Exception {. } @ Override public void processWatermark (Watermark mark) throws Exception {. } @ Override public void processElement (StreamRecord element) throws Exception {. } / * Commit up to this checkpoint id, also send inactive partitions to downstream for committing. * / @ Override public void notifyCheckpointComplete (long checkpointId) throws Exception {. } @ Override public void endInput () throws Exception {. } @ Override public void dispose () throws Exception {. }}
InitializeState: the method to initialize the state, where you construct the buckets to write to the file, the StreamingFileSinkHelper to write to the file, and so on.
SnapshotState: this method is mainly called every time checkpoint is performed.
The processWatermark method can be seen by name and deals with watermarks, such as sending watermarks downstream and so on.
ProcessElement: the core method of processing elements. Every piece of data will be processed in this method.
NotifyCheckpointComplete, which is called every time the checkpoint is completed. Here, some information about the partition to be submitted is collected for the partition submission.
EndInput: no more data is coming in, which is called at the end of the input.
Dispose: called at the end of the operator's life cycle.
Brief introduction of StreamingFileSink
StreamingFileSink Let's give a brief description, as we can see by the name, this is a sink for writing streaming data to the file system, which integrates checkpoint to provide exactly once semantics.
There is a concept of bucket in StreamingFileSink, which we can understand as a directory in which data is written, and multiple files can be written under each bucket. It provides a concept of BucketAssigner for generating bucket, and each incoming data determines which bucket to write when it is written. the default implementation is DateTimeBucketAssigner, which generates a bucket every hour.
It uses StreamingFileSink#forRowFormat or StreamingFileSink#forBulkFormat for corresponding processing according to different write formats.
In addition, the sink provides a RollingPolicy scrolling strategy to determine the data, such as how big the file arrives or how long it takes to close the current file and open the next new file.
For specific data written in ORC format, you can refer to this article: flink 1.11streaming data is written to file in ORC format. Since this time we are mainly talking about the process of writing hive as a whole, this sink will not be explained in detail.
Partition information submission
The StreamingFileWriter#notifyCheckpointComplete call commitUpToCheckpoint triggers the partition commit operation when the checkpoint is complete.
Private void commitUpToCheckpoint (long checkpointId) throws Exception {helper.commitUpToCheckpoint (checkpointId); CommitMessage message = new CommitMessage (checkpointId, getRuntimeContext (). GetIndexOfThisSubtask (), getRuntimeContext (). GetNumberOfParallelSubtasks (), new ArrayList (inactivePartitions)) Output.collect (new StreamRecord (message)); inactivePartitions.clear ();}
Here, we see that the CommitMessage object is constructed using inactivePartitions, and then the submission data is collected using output.collect, that is, the data collected here mentioned above will be sent to the StreamingFileCommitter operator for processing.
When did the data in inactivePartitions be added, that is, when will the partition to be submitted be generated? We trace the code and find that a listener is added to the buckets that writes to the file, triggers the listener after the bucket becomes inactive, and then adds the corresponding bucket id to the inactivePartitions collection.
@ Override public void initializeState (StateInitializationContext context) throws Exception {.. Buckets.setBucketLifeCycleListener (new BucketLifeCycleListener () {@ Override public void bucketCreated (Bucket bucket) {} @ Override public void bucketInactive (Bucket bucket) {inactivePartitions.add (bucket.getBucketId ()) });}
What triggers a notification that bucket becomes inactive? We can see from the code comments that after all the records that the bucket has received so far have been submitted, the bucket will become inactive.
Submit Partition operator
This is a single parallelism operator that is used to submit partition information written to the file system. The specific processing steps are as follows:
Collect partition information to be submitted from the upstream
Determine whether all subtasks have received partitioned data under a certain checkpoint
Gets the partition commit trigger. (partition-time and process-time are currently supported)
Use partition commit policies to submit partition information in turn (multiple partition policies can be configured)
Here we focus on how the StreamingFileCommitter#processElement method processes each submitted data.
@ Override public void processElement (StreamRecord element) throws Exception {CommitMessage message = element.getValue (); for (String partition: message.partitions) {trigger.addPartition (partition);} if (taskTracker = = null) {taskTracker = new TaskTracker (message.numberOfTasks) } boolean needCommit = taskTracker.add (message.checkpointId, message.taskId); if (needCommit) {commitPartitions (message.checkpointId);}}
We see that we receive the CommitMessage element from the upstream, then get the partition to be submitted from it, add it to the PartitionCommitTrigger (variable trigger), and then use taskTracker to determine whether each subtask of the checkpoint has received the partition data, and finally submit the partition information through the commitPartitions method.
Go to the commitPartitions method to see how the partition is submitted.
Private void commitPartitions (long checkpointId) throws Exception {List partitions = checkpointId = = Long.MAX_VALUE? Trigger.endInput (): trigger.committablePartitions (checkpointId); if (partitions.isEmpty ()) {return;} try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore ()) {for (String partition: partitions) {LinkedHashMap partSpec = extractPartitionSpecFromPath (new Path (partition)) LOG.info ("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); Path path = new Path (locationPath, generatePartitionPath (partSpec)); PartitionCommitPolicy.Context context = new PolicyContext (new ArrayList (partSpec.values ()), path) For (PartitionCommitPolicy policy: policies) {if (policy instanceof MetastoreCommitPolicy) {((MetastoreCommitPolicy) policy) .setMetastore (metaStore);} policy.commit (context) }}
Get all the partitions to be committed under the checkpoint from trigger and put them in a List collection partitions. If the submitted partition is not empty, loop through the partition commit policy PartitionCommitPolicy to be configured, and then commit the partition.
Partition commit trigger
At present, the system provides two kinds of triggers for partition submission, PartitionTimeCommitTigger and ProcTimeCommitTigger, which are used to deal with when the partition is submitted.
ProcTimeCommitTigger mainly depends on the creation time and delay of the partition. When the processing time is greater than 'partition creation time' +' delay', the partition will be committed.
PartitionTimeCommitTigger depends on the watermark and commits this partition when the value of the watermark is greater than partition-time + delay.
Partition submission policy
At present, the system provides an interface PartitionCommitPolicy for submitting partition information. At present, the system provides the following schemes.
One is METASTORE, which is mainly used to submit hive partitions, such as creating hive partitions, etc.
The other is SUCCESS_FILE, which is to write a success file to the corresponding partition directory.
In addition, the system also provides an external custom implementation for user-defined partition submission, such as merging small files after the partition is submitted. When customizing the submission policy, you need to implement the PartitionCommitPolicy interface and set the submission policy to custom.
I have also seen some examples on the Internet that implement this interface to merge small files, but I personally think it is actually a bit imperfect, because this merge of small files may involve a lot of problems:
How to ensure transactions when merging and how to avoid dirty reading during merging?
Transaction consistency, how to roll back if the merge goes wrong
Whether the performance of merging small files can keep up, at present, flink only provides a single parallelism commit operator.
How to merge writes with multiple concurrency
So for the time being, I didn't think of a perfect solution for flink to merge small files.
Thank you for reading this article carefully. I hope the article "sample Analysis of streaming data in Flink Source Code written into hive" shared by the editor will be helpful to you. At the same time, I also hope that you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.