Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to customize the Sink function of Redis in Flink

2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

In this issue, the editor will bring you the Sink function about how to customize Redis in Flink. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

1. Add redis corresponding pom dependency

Org.apache.bahir flink-connector-redis_2.11 1.0

two。 Main function code:

Package com.hadoop.ljs.flink110.redis;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand Import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import scala.Tuple2;/** * @ author: Created By lujisen * @ company ChinaUnicom Software JiNan * @ date: 2020-05-02 10:30 * @ version: v1.0 * @ description: com.hadoop.ljs.flink110.redis * / public class RedisSinkMain {public static void main (String [] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment ()

DataStream source = senv.socketTextStream ("localhost", 9000); DataStream filter = source.filter (new FilterFunction () {@ Override public boolean filter (String value) throws Exception {if (null = = value | | value.split (","). Length! = 2) {return false;} return true;}}) DataStream keyValue = filter.map (new MapFunction () {@ Override public Tuple2 map (String value) throws Exception {

String [] split = value.split (",")

Return new Tuple2 (split [0], split [1]);}}); / / FlinkJedisPoolConfig is used to create a stand-alone redis for redis. Cluster redis requires FlinkJedisClusterConfig FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder (). SetHost ("worker2.hadoop.ljs"). SetPort (6379). SetPassword ("123456a?"). Build ()

KeyValue.addSink (new RedisSink (redisConf, new RedisMapper () {@ Override public RedisCommandDescription getCommandDescription () {return new RedisCommandDescription (RedisCommand.HSET, "table1");} @ Override public String getKeyFromData (Tuple2 data) {return data._1 } @ Override public String getValueFromData (Tuple2 data) {return data._2;})); / * start execution * / senv.execute ();}}

3. Function test

1). Scoket sends data on Windows

Verification of 2.redis results

The above is how to customize the Sink function of Redis in Flink shared by Xiaobian. If you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report