In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-03 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
The main content of this article is to explain "what is the method of redis streaming data push to multi-users", interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn "what is the method of redising streaming data to multi-users"?
1 how to push when there are hundreds and thousands of users? Banning thread pools using single-thread asynchronous synchronous push.
2 the current logic:
Every time the project restarts: initialize the channel, disconnect the server-reconnect. Timer connection when a service cannot be connected.
This creates a problem: too many tasks for asynchronous connections cause the service to crash.
/ * when closing the connection * / @ Overridepublic void channelInactive (ChannelHandlerContext ctx) throws Exception {InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel (). RemoteAddress (); int port = ipSocket.getPort (); String host = ipSocket.getHostString (); String serverUrl = host + ":" + port; String prev = redisTemplate.opsForValue (). Get (SOCKET_CONNECT_PREFIX + serverUrl); Integer cur = Math.toIntExact (System.currentTimeMillis () / 1000) If (null = = prev | | cur-Integer.parseInt (prev) > 20 * 60) {redisTemplate.opsForValue () .set (SOCKET_CONNECT_PREFIX + serverUrl, cur + "); log.info (" server disconnect = "+ host + port +" reconnect after 20 minutes "); final EventLoop eventLoop = ctx.channel () .eventLoop (); Bootstrap bootstrap = defaultProcessHandler.getBootstrap (eventLoop) EventLoop.schedule (()-> defaultProcessHandler.doConnect (bootstrap, host + ":" + port, new AtomicInteger (0)), 20, TimeUnit.MINUTES);} else {log.warn (serverUrl + "less than 20 minutes since the last disconnection = =" + (cur-Integer.parseInt (prev)) + "s");} super.channelInactive (ctx);}
Initialize the connection 10 times at system startup:
3 how to ensure the integrity of the sent data TCP sticky packet problem
The server adds a decoder separated by a newline character
ServerBootstrap.childHandler (new ChannelInitializer () {@ Override protected void initChannel (Channel channel) {/ / this method is called every time the client connects and initializes the channel.
/ / get the pipeline chain (execution chain, handler chain) in the channel channel ChannelPipeline pipeline = channel.pipeline (); pipeline.addLast (new LineBasedFrameDecoder (Short.MAX_VALUE * 10)); pipeline.addLast (new StringDecoder ()); pipeline.addLast (new StringHandler ())
Log.info ("success to initHandler!");}})
4 reconnect after the push service is disconnected, and then continue to push at the location where it was sent last time. ChannelFuture future = channels.get (callbackUrl.getUrl ()); if (null! = future) {try {boolean result = sendMsg (future, pushDataStr); this.redisTemplate.opsForValue (). Set (callbackUrl.getUrl (), offset.toString ()); if (! result) {log.error ("this channel push failed {}", callbackUrl.getUrl (); returnVal = false;}} catch (Exception e) {log.error ("push exception", e) ReturnVal = false;}} return returnVal
When the push is successful, the user's data cursor data will be recorded to redis.
When the push service hangs up, the task is initialized and the location last read by each customer is read from redis. Offest is submitted to the task thread pool.
This problem also solves the problem that after the service is restarted, it can still be read from the location where the last read ended. The start cursor location of the read task: the cursor since the last successful processing by the service.
5 when the server that receives the data is disconnected again, how to ensure the location of the last read?
The BaseReceiver interface handler method returns the current cursor value each time the data is received. When the business processing is successful, return true. Subsequent data reading will be carried out automatically. When the receiving data server of the customer side dies, the automatic reconnection operation will be carried out first. At this time, the thread reading the datahub data is still returning the data but cannot be pushed successfully, so the cursor value will not move backward.
At this point, I believe you have a deeper understanding of "what is the method of redisstreaming data push multi-users". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.