How Flink Writes Streaming Data into Redis
This article mainly explains how Flink writes streaming data into redis. The content is simple and clear, and easy to learn and understand.
Background
As a high-throughput storage system, redis is widely used in production. Today we mainly talk about how to write streaming data into redis and how to solve some of the problems encountered along the way. There is no official Flink connector for writing to redis, so we use the connector provided by bahir-flink [1], another Apache project.
Let's explain with an example. First, introduce the pom dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
Construct a data source

Here we mainly simulate a piece of user information:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// user, subject, province
Tuple3<String, String, String> tuple = Tuple3.of("tom", "math", "beijing");
DataStream<Tuple3<String, String, String>> dataStream = bsEnv.fromElements(tuple);

Construct the redis configuration

Stand-alone configuration:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost("10.160.85.185")
        .setPassword("1234") // optional
        .setPort(6379)
        .build();
Cluster configuration:

InetSocketAddress host0 = new InetSocketAddress("host1", 6379);
InetSocketAddress host1 = new InetSocketAddress("host2", 6379);
InetSocketAddress host2 = new InetSocketAddress("host3", 6379);

HashSet<InetSocketAddress> set = new HashSet<>();
set.add(host0);
set.add(host1);
set.add(host2);

FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder()
        .setNodes(set)
        .build();

Implement RedisMapper
We need to implement the RedisMapper interface, whose main function is to map our own input data to the corresponding redis type.

Let's take a look at the RedisMapper interface. There are three methods in it:

- getCommandDescription: mainly describes what type of data we write, such as list, hash, and so on.
- getKeyFromData: mainly extracts the key from our input data.
- getValueFromData: mainly extracts the value from our input data.

public interface RedisMapper<T> extends Function, Serializable {
    /**
     * Returns descriptor which defines data type.
     *
     * @return data type descriptor
     */
    RedisCommandDescription getCommandDescription();

    /**
     * Extracts key from data.
     *
     * @param data source data
     * @return key
     */
    String getKeyFromData(T data);

    /**
     * Extracts value from data.
     *
     * @param data source data
     * @return value
     */
    String getValueFromData(T data);
}
The getCommandDescription method returns a RedisCommandDescription object. Let's take a look at the constructors of RedisCommandDescription:
public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
    ...
}

public RedisCommandDescription(RedisCommand redisCommand) {
    this(redisCommand, null);
}
Taking writing data into a hash structure as an example, we construct a RedisCommandDescription whose additionalKey is HASH_NAME:

new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
The difference between the two constructors is whether there is a second parameter, additionalKey. It exists mainly for the SORTED_SET and HASH structures, because these two structures need three variables, while the other structures need only two.
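For instance, a sorted-set write is described the same way; the following line is only an illustration (RedisCommand.ZADD with a set name as the additionalKey):

new RedisCommandDescription(RedisCommand.ZADD, "SORTED_SET_NAME");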
In the hash structure, additionalKey corresponds to the key of the hash, the data returned by getKeyFromData corresponds to the field of the hash, and the data returned by getValueFromData corresponds to the value of the hash.
Finally, we add the corresponding redis sink, and our data is written into redis.
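To make this concrete, here is a minimal sketch of a complete mapper and the sink wiring, assuming the Tuple3 from the data source above; the class name MyRedisMapper and the choice of which fields become field and value are illustrative, not part of the connector:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class MyRedisMapper implements RedisMapper<Tuple3<String, String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // write every record into the hash whose key is HASH_NAME
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }

    @Override
    public String getKeyFromData(Tuple3<String, String, String> data) {
        // user -> field of the hash (province, f2, is unused in this sketch)
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple3<String, String, String> data) {
        // subject -> value of the hash
        return data.f1;
    }
}

// wire the sink with the stand-alone configuration built above
dataStream.addSink(new RedisSink<>(conf, new MyRedisMapper()));

For the sample record above, this issues HSET HASH_NAME tom math, so HGETALL HASH_NAME would return tom mapped to math.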
Dynamically generate key
We can see that when we constructed the redis hash structure above, the key was hard-coded; that is, we can only ever write to a single key. What if the key needs to be generated dynamically?

For example, I had a similar need: the streaming data is student score information, and I want the key to be the student's name, the field to be the subject, and the value to be the score for that subject.

The connector does not currently provide such a feature, but that is fine; there is nothing that cannot be solved by changing the source code.
Let's take a look at the invoke method in RedisSink:

public void invoke(IN input) throws Exception {
    String key = redisSinkMapper.getKeyFromData(input);
    String value = redisSinkMapper.getValueFromData(input);

    switch (redisCommand) {
        ...
        case HSET:
            this.redisCommandsContainer.hset(this.additionalKey, key, value);
            break;
        default:
            throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
    }
}
We can see that for the hash structure, key and value are obtained from our RedisMapper implementation class, but additionalKey is not generated dynamically. We just need to change this spot so that additionalKey is also obtained dynamically.
public interface RedisMapper<T> extends Function, Serializable {

    RedisCommandDescription getCommandDescription();

    String getKeyFromData(T data);

    String getValueFromData(T data);

    String getAdditionalKey(T data);
}
We add a getAdditionalKey method to the RedisMapper interface and then implement it in our implementation class (a sketch of such an implementation follows the modified method below).

Then we obtain the additionalKey dynamically in the invoke method of RedisSink. After modifying the source code, the method looks like this:
@Override
public void invoke(IN input) throws Exception {
    String key = redisSinkMapper.getKeyFromData(input);
    String value = redisSinkMapper.getValueFromData(input);
    String additionalKey = redisSinkMapper.getAdditionalKey(input);

    switch (redisCommand) {
        ...
        case HSET:
            this.redisCommandsContainer.hset(additionalKey, key, value);
            break;
        default:
            throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
    }
}
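Since this depends on the source change above, the following is only a minimal sketch of an implementation for the student-score scenario, assuming input tuples of (name, subject, score); the class name ScoreRedisMapper and the field layout are illustrative:

// imports as in the earlier sketch
public class ScoreRedisMapper implements RedisMapper<Tuple3<String, String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // placeholder key: the modified invoke ignores it and uses getAdditionalKey,
        // but the unmodified RedisCommandDescription constructor still requires a
        // non-null additionalKey for HASH
        return new RedisCommandDescription(RedisCommand.HSET, "UNUSED");
    }

    @Override
    public String getAdditionalKey(Tuple3<String, String, String> data) {
        // student name -> key of the hash
        return data.f0;
    }

    @Override
    public String getKeyFromData(Tuple3<String, String, String> data) {
        // subject -> field of the hash
        return data.f1;
    }

    @Override
    public String getValueFromData(Tuple3<String, String, String> data) {
        // score -> value of the hash
        return data.f2;
    }
}

// wiring is unchanged
dataStream.addSink(new RedisSink<>(conf, new ScoreRedisMapper()));

With this change, each student's scores land in a hash named after the student, so HGETALL tom would return each subject mapped to its score. Whether the modification fits your workload still needs to be verified in practice.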