In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article will explain in detail how to use sql to write streaming data to hive in flink. The content of the article is of high quality, so Xiaobian shares it with you as a reference. I hope you have a certain understanding of relevant knowledge after reading this article.
Modify hive configuration
The previous article introduced using sql to write streaming data to the file system. This time we will introduce using sql to write files to hive. If you want to write to an existing hive table, you need to add at least the following two attributes. Writing to hive is still the same as writing to the file system, so refer to the previous article for other specific configurations.
alter table table_name set TBLPROPERTIES ('is_generic'='false');
alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');
//if you want to use eventtime partition
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');
case explanation
Let's show you how to use java to build a flink program to write hives.
Introducing related pom
org.apache.flink
flink-connector-hive_${scala.binary.version}
${flink.version}
org.apache.hive
hive-exec
3.1.2
construct hive catalog //construct hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
Create hive table
If the corresponding hive table does not exist in the current system, you can create the table by executing the corresponding DDL table creation statement in the program. If it already exists, omit this code and use the hive command above to modify the existing table and add the corresponding attributes.
CREATE EXTERNAL TABLE `fs_table`(
`user_id` string,
`order_amount` double)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
Insert stream data into hive, String insertSql = "insert into fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
For the complete code please refer to:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java
Detailed explanation of pit problems encountered
For the above program and sql, if eventtime is configured,'sink.partition-commit.trigger'='partition-time' is configured in this program, and finally it is found that the program cannot commit partitions.
Analysis of the source code, the problem is out of this method, org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions Put the code first:
@Override
public List committablePartitions(long checkpointId) {
if (! watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s. ",
checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
List needCommit = new ArrayList();
Iterator iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
//Extract partition time by partition value.
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
//Determine whether the watermark is greater than partition creation time + delay time
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
The system extracts the corresponding partition creation time through the partition value, and then compares it. For example, we set the pattern h:$m:00 , and at some point we are writing data to/2020-07-06/18/20/this partition. Then the program will get the pattern 2020-07-06 18:20:00 according to the partition value. This value is obtained according to the DATA_FORMAT function in sql.
This value is with the time zone, which is what I want. For example, our time zone is set to East Eight Zone, 2020-07-06 18:20:00. This time is the time of East Eight Zone. In exchange for standard UTC time, it is minus eight hours, that is, 2020-07-06 10:20:00. When the toMills function in the source code processes the time of East Eight Zone, it does not add any time zone processing. In fact, this should be the time of East Eight Zone as UTC time. The calculated value is 8 hours larger than the actual value, resulting in no submission of the partition.
If the partition we construct in the data source is UTC time, that is, the time without partition, then this logic is no problem, but this does not conform to our actual situation, for example, for partition 2020-07-06 18:20:00, I hope my partition must be the time of East Eight, rather than UTC time 2020-07-06 10:20:00, which is 8 hours smaller than East Eight.
So for the above situation, there are two solutions, one is to customize a partition extraction class, the second is to modify the source code, change the current default time partition extraction class. Personally, I think it is better to modify the default class, because the current configuration and concept of writing files and hive is a bit too much. I don't want to add too much configuration to increase the difficulty of users. I should try my best to use default values to make the program run well.
Let's take a look at the StreamingFileSink class in flink. By default, DateTimeBucketAssembler is used when constructing partition buckets. The partition path is constructed with the concept of time zone, and the local time zone is used by default.
public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
modifications
This problem, I do not know whether it is a bug, I submitted an ISSUE to the official, but the official did not adopt, but I think it does not conform to my habits, so I modified this function, so that partition.time-extractor.timestamp-pattern extracted partiiton is with time zone, default is the local time zone. If it is a non-local time zone, you can specify the time zone by using the parameter partition.time-extractor.time-zone. We can get the valid time zone by using the following code:
Set zoneIds = ZoneId.getAvailableZoneIds();
zoneIds.stream().forEach(System.out::println);
For example, we use Asia/Shanghai by default in East District 8.
I'm based on the community's flink tag release-1.11.0-rc4, and I changed the code to put it on github.
About how to use sql in flink to write streaming data to hive to share here, I hope the above content can be of some help to everyone, you can learn more knowledge. If you think the article is good, you can share it so that more people can see it.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.