Lu Chunli's work notes: a good memory is no match for a worn pen, so write it down.
Flume 1.6.0 adds full support for Kafka:
- Flume Sink and Source for Apache Kafka
- A new channel that uses Kafka
Kafka Source (http://flume.apache.org/FlumeUserGuide.html#kafka-source)
Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.
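As a sketch of that setting (the agent and source names below are illustrative, not from this note), two Kafka sources can be pointed at the same topic and share one consumer group like this:

agent.sources = kafka-source-1 kafka-source-2
agent.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source-1.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source-1.groupId = flume
agent.sources.kafka-source-1.topic = myhbase
agent.sources.kafka-source-2.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source-2.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source-2.groupId = flume
agent.sources.kafka-source-2.topic = myhbase

Because both sources use groupId = flume, Kafka assigns each of them a disjoint set of the topic's partitions.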
File Channel (http://flume.apache.org/FlumeUserGuide.html#file-channel)
HBase Sink (http://flume.apache.org/FlumeUserGuide.html#hbasesink)
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
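In an agent file that declaration looks like the single line below (the sink name hbase-sink is just an example; the configuration later in this note uses the short alias hbase, which Flume also accepts):

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink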
The Kafka topic used here is myhbase; create it first:
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --replication-factor 1 --partitions 1 --topic myhbase
Created topic "myhbase".
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
myhbase
mykafka
mytopic - marked for deletion
test - marked for deletion
[hadoop@nnode kafka0.8.2.1]$
HBase table structure
[hadoop@nnode kafka0.8.2.1]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

Table name: t_inter_log, column family: cf
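The note does not show the create statement; assuming the table does not exist yet, it can be created from the shell with the table name and column family above:

create 't_inter_log', 'cf'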
Flume configuration file
vim conf/kafka-hbase.conf

# read from kafka and write to hbase
agent.sources = kafka-source
agent.channels = mem-channel
agent.sinks = hbase-sink

# source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source.groupId = flume
agent.sources.kafka-source.topic = myhbase
agent.sources.kafka-source.kafka.consumer.timeout.ms = 100

# channel
agent.channels.mem-channel.type = memory

# sink
agent.sinks.hbase-sink.type = hbase
agent.sinks.hbase-sink.table = t_inter_log
agent.sinks.hbase-sink.columnFamily = cf
# agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# assemble
agent.sources.kafka-source.channels = mem-channel
agent.sinks.hbase-sink.channel = mem-channel
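Optionally (not part of the original note), the memory channel can be sized explicitly; capacity and transactionCapacity are standard Flume memory-channel properties:

agent.channels.mem-channel.capacity = 1000
agent.channels.mem-channel.transactionCapacity = 100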
Start Kafka
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-server-start.sh config/server.properties
Start flume-ng
[hadoop@nnode flume1.6.0]$ bin/flume-ng agent --conf conf --name agent --conf-file conf/kafka-hbase.conf -Dflume.root.logger=INFO,console
Implementing a producer through the Java API
package com.lucl.kafka.simple;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

/**
 * Copyright: Copyright (c) 2015
 * Date: 2015-11-17 21:42:50
 * Description: Java API for a Kafka producer
 *
 * @author luchunli
 * @version 1.0
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);

    private void execMsgSend() {
        Properties props = new Properties();
        // broker list; port 9092 is assumed, the original text was garbled here
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");

        ProducerConfig config = new ProducerConfig(props);
        logger.info("set config info (" + config + ") ok.");

        Producer<String, String> procuder = new Producer<String, String>(config);
        String topic = "myhbase";
        String columnFamily = "cf";
        String column = "count";
        // the loop body was truncated in the original note; the messages sent here are a reconstruction
        for (int i = 1; i <= 10; i++) {
            procuder.send(new KeyedMessage<String, String>(topic, columnFamily + ":" + column + ":" + i));
        }
        procuder.close();
        logger.info("send message over.");
    }

    public static void main(String[] args) {
        new SimpleKafkaProducer().execMsgSend();
    }
}

Checking the table from the HBase shell, the scan in the note still shows an empty table at this point:

scan 't_inter_log'
ROW                              COLUMN+CELL
0 row(s) in 0.0140 seconds
hbase(main):005:0>
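If nothing shows up in HBase, a quick way to confirm that the producer's messages are at least reaching the topic is the console consumer shipped with Kafka 0.8.x (this check is not in the original note):

bin/kafka-console-consumer.sh --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic myhbase --from-beginning

If messages appear there but not in the table, the problem is on the Flume side (channel or HBase sink) rather than in the producer.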