2025-04-02 Update From: SLTechnology News&Howtos (Servers)
Shulou (Shulou.com), 05/31 report

This article explains how to write Kafka log messages to local files in a kafka-Storm pipeline. The walkthrough is fairly detailed; I hope it is useful to anyone interested in the topic.
Prerequisites:
1: Some familiarity with the logback logging framework.
2: A basic understanding of Kafka.
3: Review the system's business diagram carefully before reading the code.
Kafka ships with its own Hadoop integration, so if you need to move data from Kafka straight into HDFS, see the companion post by this author:
Business system - kafka - Storm [log localization] - 2: passing logs directly to HDFS through Kafka
1: System design diagram of the production environment:
Under two identical topics in the Kafka cluster, two consumers (kafka-storm and kafka-hadoop) split the same data into two pipelines:
One: a real-time channel
Two: an offline channel
In the early stage of log localization, log cleaning and filtering are done in the Storm cluster, so the logs retained locally are the data already cleaned in Storm.
That is, as shown in the following figure:
In Kafka, this is typically handled with code like the following. Here we care about two kinds of logs, each processed by its own Consumer:
```java
package com.mixbox.kafka.consumer;

public class logSave {
    public static void main(String[] args) throws Exception {
        // One consumer thread per topic: visit logs and order logs
        Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
        visitlog.start();
        Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
        orderlog.start();
    }
}
```
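The main class above references a `KafkaProperties` holder that the article never shows. A minimal sketch of what it might contain, assuming topic names "visit" and "order" and a local ZooKeeper (all of these values are assumptions, not from the article):

```java
// Hypothetical constants holder referenced by logSave and Consumer_Thread.
// Topic names and the ZooKeeper address are assumptions, not from the article.
public class KafkaProperties {
    public static final String zkConnect = "127.0.0.1:2181"; // ZooKeeper address (assumed)
    public static final String logSave = "logSave";          // consumer group id
    public static final String visit = "visit";              // visit-log topic (assumed name)
    public static final String order = "order";              // order-log topic (assumed name)
}
```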
Here, we save different data to different files according to fields in the original message:
```java
package com.mixbox.kafka.consumer;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @author Yin Shuai
 */
public class Consumer_Thread extends Thread {

    // In effect we create one logger per passed-in topic name, instead of:
    // private Logger _log_order = LoggerFactory.getLogger("order");
    // private Logger _log_visit = LoggerFactory.getLogger("visit");
    private Logger _log = null;
    private final ConsumerConnector _consumer;
    private final String _topic;

    public Consumer_Thread(String topic) {
        _consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this._topic = topic;
        _log = LoggerFactory.getLogger(_topic);
        System.err.println("topic name: " + _topic);
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        // Here our consumer group id is logSave
        props.put("group.id", KafkaProperties.logSave);
        props.put("zookeeper.session.timeout.ms", "100000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(_topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                _consumer.createMessageStreams(topicCountMap);
        for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = iterator.next();
                try {
                    // Route this message to the per-topic log file
                    logFile(next);
                    System.out.println("message: " + new String(next.message(), "utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void logFile(MessageAndMetadata<byte[], byte[]> next) throws UnsupportedEncodingException {
        _log.info(new String(next.message(), "utf-8"));
    }
}
```
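The consumer loop above needs a running Kafka broker and ZooKeeper to try out. As a broker-free sketch of the same per-topic routing idea that `logFile` implements (each message appended to a file named after its topic), assuming messages are plain UTF-8 strings and the file layout is our own invention:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Broker-free sketch of the per-topic file routing done by logFile():
// each message is appended to a log file named after its topic.
// File locations and message format are assumptions, not from the article.
public class TopicFileRouter {
    private final Path baseDir;

    public TopicFileRouter(Path baseDir) throws IOException {
        this.baseDir = Files.createDirectories(baseDir);
    }

    // Append one message to <baseDir>/<topic>.log, mirroring _log.info(...)
    public void route(String topic, byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8) + System.lineSeparator();
        Files.write(baseDir.resolve(topic + ".log"),
                line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Read back all lines routed to a topic's file
    public List<String> read(String topic) throws IOException {
        return Files.readAllLines(baseDir.resolve(topic + ".log"), StandardCharsets.UTF_8);
    }
}
```

In the real pipeline this routing is delegated to logback: `LoggerFactory.getLogger(topic)` plus a per-logger appender achieves the same effect with rollover and filtering for free.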
A quick tip on logback.xml: the configuration shown here is deliberately shallow; fill in the details yourself as needed.
[The logback.xml lost its XML markup when this article was published. The recoverable fragments are: appender file paths F:/opt/log/test.%d{yyyy-MM-dd}.log, e:/logs/error/error.log, e:/logs/yuanshi-%d{yyyy-MM-dd}.log, E:\logs\file\file.log, e:/logs/venality-%d{yyyy-MM-dd}.log, E:\logs\visitlog\visit.log, and E:\logs\orderlog\order.log; level filters (ERROR and INFO with ACCEPT/DENY); maxHistory values of 10; rollover patterns E:\logs\visit.log.%d{yyyy-MM-dd} and E:\logs\order.log.%d{yyyy-MM-dd}; and encoder patterns %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n and %msg%n.]

That is all on how to print log files to local files in kafka-Storm. I hope the above content is helpful and that you learned something new. If you think the article is good, please share it so more people can see it.
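Since the configuration fragments above lost their XML tags, here is a hedged reconstruction of the visit/order part. The file paths, rollover patterns, filter levels, and encoder patterns are taken from the surviving fragments; the surrounding tag structure is standard logback idiom, not recovered from the article:

```xml
<configuration>
  <!-- One rolling appender per topic; Consumer_Thread looks the logger up by topic name. -->
  <appender name="VISIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>E:/logs/visitlog/visit.log</file>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>INFO</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>E:/logs/visit.log.%d{yyyy-MM-dd}</fileNamePattern>
    </rollingPolicy>
    <encoder><pattern>%msg%n</pattern></encoder>
  </appender>

  <appender name="ORDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>E:/logs/orderlog/order.log</file>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>INFO</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>E:/logs/order.log.%d{yyyy-MM-dd}</fileNamePattern>
    </rollingPolicy>
    <encoder><pattern>%msg%n</pattern></encoder>
  </appender>

  <!-- Logger names match the topic names passed to LoggerFactory.getLogger(_topic). -->
  <logger name="visit" level="INFO" additivity="false"><appender-ref ref="VISIT"/></logger>
  <logger name="order" level="INFO" additivity="false"><appender-ref ref="ORDER"/></logger>
</configuration>
```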
© 2024 shulou.com SLNews company. All rights reserved.