In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
In this issue, the editor will bring you the Sink function about how to customize Redis in Flink. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.
1. Add redis corresponding pom dependency
Org.apache.bahir flink-connector-redis_2.11 1.0
two。 Main function code:
Package com.hadoop.ljs.flink110.redis;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand Import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import scala.Tuple2;/** * @ author: Created By lujisen * @ company ChinaUnicom Software JiNan * @ date: 2020-05-02 10:30 * @ version: v1.0 * @ description: com.hadoop.ljs.flink110.redis * / public class RedisSinkMain {public static void main (String [] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment ()
DataStream source = senv.socketTextStream ("localhost", 9000); DataStream filter = source.filter (new FilterFunction () {@ Override public boolean filter (String value) throws Exception {if (null = = value | | value.split (","). Length! = 2) {return false;} return true;}}) DataStream keyValue = filter.map (new MapFunction () {@ Override public Tuple2 map (String value) throws Exception {
String [] split = value.split (",")
Return new Tuple2 (split [0], split [1]);}}); / / FlinkJedisPoolConfig is used to create a stand-alone redis for redis. Cluster redis requires FlinkJedisClusterConfig FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder (). SetHost ("worker2.hadoop.ljs"). SetPort (6379). SetPassword ("123456a?"). Build ()
KeyValue.addSink (new RedisSink (redisConf, new RedisMapper () {@ Override public RedisCommandDescription getCommandDescription () {return new RedisCommandDescription (RedisCommand.HSET, "table1");} @ Override public String getKeyFromData (Tuple2 data) {return data._1 } @ Override public String getValueFromData (Tuple2 data) {return data._2;})); / * start execution * / senv.execute ();}}
3. Function test
1). Scoket sends data on Windows
Verification of 2.redis results
The above is how to customize the Sink function of Redis in Flink shared by Xiaobian. If you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.