
Building a real-time big data processing system with Flume+Kafka+Storm+Redis: real-time website PV and UV statistics with display

2025-02-24 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--


1 Common approaches to big data processing

As mentioned in my earlier article, "Big Data Collection, Cleaning, and Processing: A Complete Case of Offline Data Analysis Using MapReduce", the following diagram still applies:

That article presented an offline data analysis case based on MapReduce: it processed the user access logs generated by a website and computed the site's PV (page views), UV (unique visitors) and other figures for a given day. In terms of the diagram above, that is the offline processing approach. This article introduces the other approach, online (real-time) processing based on Storm. In the complete case below, we will accomplish the following tasks:

1. Build our real-time processing system (Flume+Kafka+Storm+Redis)
2. Process the site's user access logs in real time and compute the site's PV and UV
3. Dynamically display the PV and UV computed in real time on our front-end page

If you already know something about the big data components mentioned above, or are interested in how to build a real-time big data processing system, read on.

Note that the core topic is how to build a real-time processing system; the case used here is real-time statistics of a website's PV and UV. In practice, working environments and business requirements differ, so the complexity of the business system differs as well. Counting PV and UV is comparatively simple, but it is enough to give us a basic and clear understanding of a real-time big data processing system, which then no longer seems so mysterious.

2 Architecture of the real-time processing system

The overall architecture of our real-time processing system is as follows:

From the architecture above, we can see that it consists of the following parts:

Flume cluster
Kafka cluster
Storm cluster

From the point of view of building a real-time processing system, our job is to move the data through the different cluster systems (as the diagram above illustrates). That means integrating the systems pairwise: Flume with Kafka, and Kafka with Storm. Whether each component runs as a cluster depends on your actual needs. In our environment, Flume, Kafka, and Storm all run as clusters.

3 Flume+Kafka integration

3.1 Integration approach

For Flume, the key is how to collect data and send it to Kafka; because we use a Flume cluster here, the configuration of that cluster is also critical. For Kafka, the key is how to receive data from Flume. Overall the logic is fairly simple: we create a topic in Kafka for our real-time processing system, and Flume sends the collected data to that topic.

3.2 Integration process: Flume cluster configuration and Kafka topic creation

3.2.1 Flume cluster configuration

In our scenario, two Flume Agents are deployed on two Web servers to collect the log data there, and the data is then sent to a third Flume Agent, so we need to configure three Flume Agents in total.

3.2.1.1 Flume Agent01

This Flume Agent is deployed on a Web server to collect the generated Web logs and send them to the Flume Consolidation Agent. Create a new configuration file, flume-sink-avro.conf, with the following contents:

    #########################################################
    ## Main function: listen for new data in the file, collect it, and output it to avro
    ## Note: running a Flume agent mainly means configuring source, channel and sink
    ## Below, a1 is the name of the agent, the source is r1, the channel is c1, the sink is k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Source configuration: listen for new data in the file (exec)
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log

    # Sink configuration: send the data out over avro
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = uplooking03
    a1.sinks.k1.port = 4444

    # Channel configuration: use files as the temporary cache, which is safer
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

    # Connect source r1 and sink k1 through channel c1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

After the configuration is completed, start Flume Agent to listen to the log file:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &

3.2.1.2 Flume Agent02

This Flume Agent is also deployed on a Web server to collect the generated Web logs and send them to the Flume Consolidation Agent. Create a new configuration file, flume-sink-avro.conf, with the following contents:

    #########################################################
    ## Main function: listen for new data in the file, collect it, and output it to avro
    ## Note: running a Flume agent mainly means configuring source, channel and sink
    ## Below, a1 is the name of the agent, the source is r1, the channel is c1, the sink is k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Source configuration: listen for new data in the file (exec)
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log

    # Sink configuration: send the data out over avro
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = uplooking03
    a1.sinks.k1.port = 4444

    # Channel configuration: use files as the temporary cache, which is safer
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

    # Connect source r1 and sink k1 through channel c1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

After the configuration is completed, start Flume Agent to listen to the log file:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &

3.2.1.3 Flume Consolidation Agent

This Flume Agent receives the data sent by the other two Agents and forwards it to Kafka. Create a new configuration file, flume-source_avro-sink_kafka.conf, with the following contents:

    #########################################################
    ## Main function: receive the data sent over avro and output it to Kafka
    ## Note: running a Flume agent mainly means configuring source, channel and sink
    ## Below, a1 is the name of the agent, the source is r1, the channel is c1, the sink is k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Source configuration: listen for avro
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    # The port must match the port of the avro sinks in Agent01 and Agent02
    a1.sources.r1.port = 4444

    # Sink configuration: write the data to Kafka
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = f-k-s
    a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 2

    # Channel configuration: use a memory buffer as the temporary cache
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 10

    # Connect source r1 and sink k1 through channel c1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

After the configuration is completed, start the Flume Agent to listen for avro data:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1 &

3.2.2 Kafka configuration

In Kafka, first create a topic to receive the data collected by Flume:

    $ kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3

3.3 Integration verification

Start Kafka's console consumer script:

    $ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

If new log data appears on the Web server, it is picked up by our Flume agents and eventually delivered to the f-k-s topic in Kafka. Since what we started above is a Kafka console consumer, we will see the data printed in the terminal:

    $ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164
    1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165
    1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664
    1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166
    1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666

This shows that the Flume+Kafka integration works. Of course, the data in Kafka is meant to be consumed by Storm; the console consumer is used here only to test the integration. Next we integrate Kafka and Storm.

4 Kafka+Storm integration

The integration of Kafka and Storm is documented in detail on Storm's website: http://storm.apache.org/releases/1.0.6/storm-kafka.html. Readers who want to know more can refer to it.

4.1 Integration approach

In a real-time big data processing system, Kafka acts as a message queue (message middleware), and the messages it holds need to be consumed by consumers. The key to integrating Kafka and Storm is therefore how Storm consumes the messages in the Kafka topic (those messages were collected by Flume, and now we need to consume them in Storm).

In Storm, the topology is a key concept.

Compare it with MapReduce: there, the unit we submit is called a job, and a job contains several Mappers and Reducers, which hold our data processing logic.

In Storm, the unit we submit is called a topology, which contains spouts and bolts, and the data processing logic lives in those spouts and bolts.

The spout is the source of our data. Because it works in real time, it can be compared to a "faucet" that continuously produces data.
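The relationship between a spout and the bolts that follow it can be mimicked in a few lines of plain Java. This is only an analogy under stated assumptions: `MiniTopology` and `MiniBolt` are hypothetical names and not Storm's API, and a finite iterator stands in for the spout, whereas a real spout keeps producing data indefinitely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative analogy only; these are NOT Storm's interfaces. */
final class MiniTopology {

    /** A "bolt" receives one tuple and may pass results downstream. */
    interface MiniBolt {
        void execute(String tuple, Consumer<String> downstream);
    }

    /** Pulls every tuple from the "spout" and pushes it through the bolts in order. */
    static List<String> run(Iterator<String> spout, List<MiniBolt> bolts) {
        List<String> results = new ArrayList<>();
        while (spout.hasNext()) {
            emit(spout.next(), bolts, 0, results);
        }
        return results;
    }

    private static void emit(String tuple, List<MiniBolt> bolts, int index, List<String> results) {
        if (index == bolts.size()) {
            results.add(tuple); // the tuple reached the end of the chain
            return;
        }
        bolts.get(index).execute(tuple, out -> emit(out, bolts, index + 1, results));
    }

    public static void main(String[] args) {
        MiniBolt dropEmpty = (tuple, downstream) -> {
            if (!tuple.isEmpty()) {
                downstream.accept(tuple); // a bolt may also drop a tuple by not emitting it
            }
        };
        MiniBolt upperCase = (tuple, downstream) -> downstream.accept(tuple.toUpperCase());

        Iterator<String> spout = Arrays.asList("a", "", "b").iterator();
        System.out.println(run(spout, Arrays.asList(dropEmpty, upperCase)));
    }
}
```

The point of the analogy is that each stage only knows how to handle one tuple and hand results to the next stage; the wiring between stages is declared separately, which is what a topology does.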

So the crux of the question is: how does the spout get its data from Kafka?

Fortunately, the storm-kafka integration library provides an API for exactly this.

4.2 Integration process: using KafkaSpout

In the code, you only need to create a KafkaSpout object provided by the storm-kafka API:

    SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
    return new KafkaSpout(spoutConf);

The complete integration code is given below:

    package cn.xpleaf.bigdata.storm.statics;

    import kafka.api.OffsetRequest;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;

    /**
     * Integration of Kafka and Storm, used to count PV and UV in real time
     */
    public class KafkaStormTopology {

        // static class MyKafkaBolt extends BaseRichBolt {
        static class MyKafkaBolt extends BaseBasicBolt {

            /**
             * The field emitted by KafkaSpout is named "bytes"
             */
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                byte[] binary = input.getBinary(0); // data is transferred across JVMs, so it arrives as bytes
                // byte[] bytes = input.getBinaryByField("bytes"); // this also works
                String line = new String(binary);
                System.out.println(line);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            /*
             * Set up the DAG (directed acyclic graph) of spout and bolt
             */
            KafkaSpout kafkaSpout = createKafkaSpout();
            builder.setSpout("id_kafka_spout", kafkaSpout);
            builder.setBolt("id_kafka_bolt", new MyKafkaBolt())
                    .shuffleGrouping("id_kafka_spout"); // the grouping names the upstream component
            // Build the topology with the builder
            StormTopology topology = builder.createTopology();
            String topologyName = KafkaStormTopology.class.getSimpleName(); // name of the topology
            Config config = new Config(); // Config extends HashMap and wraps some basic settings

            // Start the topology: LocalCluster for a local run, StormSubmitter for a cluster run
            if (args == null || args.length < 1) { // no arguments: local mode; with arguments: cluster mode
                LocalCluster localCluster = new LocalCluster(); // local development mode
                localCluster.submitTopology(topologyName, config, topology);
            } else {
                StormSubmitter.submitTopology(topologyName, config, topology);
            }
        }

        /**
         * BrokerHosts hosts  the Kafka cluster list
         * String topic       the topic to consume
         * String zkRoot      Kafka's directory in zk (the offsets of consumed messages are recorded under this node)
         * String id          identifier of the current consumer
         */
        private static KafkaSpout createKafkaSpout() {
            String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
            BrokerHosts hosts = new ZkHosts(brokerZkStr); // Kafka's addresses are found through /brokers in ZooKeeper
            String topic = "f-k-s";
            String zkRoot = "/" + topic;
            String id = "consumer-id";
            SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
            // With the two settings below, the /f-k-s node is also created in zk in a local environment;
            // in a cluster environment the /f-k-s node is created in zk without them
            // spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"});
            // spoutConf.zkPort = 2181;
            // Start from the latest offset instead of re-reading messages produced before startup
            spoutConf.startOffsetTime = OffsetRequest.LatestTime();
            return new KafkaSpout(spoutConf);
        }
    }

The logic of the code is very simple: we only create a KafkaSpout object provided by storm-kafka and a MyKafkaBolt object containing our processing logic. MyKafkaBolt's logic is also simple: it prints the Kafka messages to the console.

Note that the later work of analyzing the site's PV and UV is built on top of this simple code, so it is an important foundation.

4.3 Integration verification

The integration code above can be run in a local environment, or packaged into a jar, uploaded to our Storm cluster, and submitted as a job. If the Web server is producing logs and the earlier Flume+Kafka integration works, you will see the following results.

If the code is run in a local environment, the log data is printed to the console:

    ......
    45016548 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=f-k-s, partitionMap={0=uplooking02:9092, 1=uplooking03:9092, 2=uplooking01:9092}}
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=uplooking02:9092, topic=f-k-s, partition=0}, Partition{host=uplooking03:9092, topic=f-k-s, partition=1}, Partition{host=uplooking01:9092, topic=f-k-s, partition=2}]
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: []
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: []
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
    1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164
    1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
    1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165
    1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664
    1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166
    ......
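If the job is submitted to and run on the Storm cluster, the log data generated by the Web server can likewise be seen in Storm's logs:

This completes the Kafka+Storm integration.

5 Storm+Redis integration

5.1 Integration approach

The so-called integration of Storm and Redis is really about where the data in our real-time processing system ends up. Storm contains our data processing logic, but where should the results be saved once processing is done? There are many options, such as relational databases, NoSQL, HDFS, or HBase, and the choice should depend on the specific business and data volume. Here we use Redis to store the final analysis results.

So this integration step is where we actually start writing business processing code. The earlier Flume-Kafka-Storm integration has already opened up the whole data path; what remains is how to process the data in Storm and save it to Redis.

In Storm, we no longer need to write the spout (the storm-kafka API provides the KafkaSpout object), so the question becomes how to write bolts that analyze and process the data according to the business.

5.2 Integration process: writing the Storm business processing bolts

5.2.1 Log analysis

The logs we receive in real time have the following format:

    1002 202.103.24.68 1976dc2e-f03a-44f0-892f-086d85105f7e 14549 1 GET /top HTTP/1.1 200 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806916373
    1000 221.8.9.6 80 542ccf0a-9b14-49a0-93cd-891d87ddabf3 12472 1 GET /index HTTP/1.1 500 /top Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT) 1523806916874
    1003 211.167.248.22 0e4c1875-116c-400e-a4f8-47a46ad04a42 12536 0 GET /tologin HTTP/1.1 200 /stat Mozilla/5.0 (Windows; U; Windows NT 5.2) AppleWebKit/525.13 (KHTML,like Gecko) Chrome/0.2.149.27 Safari/525.13 1523806917375
    1000 219.147.198.230 07eebc1a-740b-4dac-b53f-bb242a45c901 11847 1 GET /userList HTTP/1.1 200 /top Mozilla/4.0 (compatible; MSIE 6.0; Windows NT5.1) 1523806917876
    1001 222.172.200.68 4fb35ced-5b30-483b-9874-1d5917286675 13550 1 GET /getDataById HTTP/1.0 504 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806918377

The second and third fields deserve explanation, because they are what we need to compute pv and uv. They are the ip field and the mid field:

ip: the user's IP address
mid: a unique id. It is planted in the browser's cookie on the first visit and is not set again if it already exists, so it uniquely identifies the browser. On mobile phones or tablets the device code is used directly.

From the IP address we can look up the province it belongs to and keep a counter for that province to record its pv. Each time a log record from that province arrives, the province's pv is incremented by 1, and this gives us the pv statistics.

For the mid, we can keep a set per province. Each time a log record from that province arrives, its mid is added to the set. Because a set holds no duplicates, repeated mids are filtered out automatically, and the size of the set is the uv.

In our Storm business processing code, we need to write two bolts:

The first bolt preprocesses the data: it extracts the ip and mid we need and looks up the province from the IP;
The second bolt counts pv and uv and periodically writes the pv and uv data to Redis;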
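The counting idea described above can be sketched in plain Java, without Storm or Redis. This is only an illustration under stated assumptions: the class `PvUvSketch`, its in-memory IP-to-province map (a stand-in for a real lookup table), the helper `record`, and the sample mapping of an IP to "guangdong" are all hypothetical. The field positions (tab-separated, IP second, mid third) follow the description above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only: per-province PV/UV counting in memory. */
final class PvUvSketch {
    // Hypothetical stand-in for the IP-to-province lookup.
    private final Map<String, String> ipToProvince = new HashMap<>();
    private final Map<String, Integer> pvByProvince = new HashMap<>();
    private final Map<String, Set<String>> midsByProvince = new HashMap<>();

    void registerIp(String ip, String province) {
        ipToProvince.put(ip, province);
    }

    /** Parses one tab-separated log record and updates the counters. */
    void accept(String line) {
        String[] fields = line.split("\t");
        if (fields.length < 10) {
            return; // malformed record, skip it
        }
        String ip = fields[1];  // second field
        String mid = fields[2]; // third field
        String province = ipToProvince.get(ip);
        if (province == null) {
            return; // unknown IP: nothing to attribute the hit to
        }
        pvByProvince.merge(province, 1, Integer::sum); // every record is one page view
        // Assumption: the literal string "null" marks a record without a mid.
        if (!"null".equals(mid)) {
            midsByProvince.computeIfAbsent(province, k -> new HashSet<>()).add(mid);
        }
    }

    int pv(String province) {
        return pvByProvince.getOrDefault(province, 0);
    }

    /** UV is simply the number of distinct mids seen for the province. */
    int uv(String province) {
        Set<String> mids = midsByProvince.get(province);
        return mids == null ? 0 : mids.size();
    }

    /** Builds a 10-field sample record; only ip and mid matter for this sketch. */
    static String record(String ip, String mid) {
        return String.join("\t", "1002", ip, mid, "14549", "1",
                "GET /top HTTP/1.1", "200", "/tologin", "Mozilla/5.0", "1523806916373");
    }

    public static void main(String[] args) {
        PvUvSketch sketch = new PvUvSketch();
        sketch.registerIp("202.103.24.68", "guangdong"); // hypothetical mapping
        sketch.accept(record("202.103.24.68", "mid-a"));
        sketch.accept(record("202.103.24.68", "mid-a")); // same visitor again
        sketch.accept(record("202.103.24.68", "mid-b"));
        System.out.println("pv=" + sketch.pv("guangdong") + ", uv=" + sketch.uv("guangdong"));
    }
}
```

Running `main` prints `pv=3, uv=2`: three records count as three page views, while the repeated mid is absorbed by the set.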
Of course, the above only describes the overall idea. There are many details and techniques that need attention, and they are all reflected in the code, which I have commented in detail.

5.2.2 Writing the first bolt: ConvertIPBolt

Based on the analysis above, the bolt for data preprocessing is written as follows:

    package cn.xpleaf.bigdata.storm.statistic;

    import cn.xpleaf.bigdata.storm.utils.JedisUtil;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import redis.clients.jedis.Jedis;

    /**
     * Log preprocessing bolt. It:
     * 1. extracts the information the business needs: the IP address and the client's unique id (mid)
     * 2. looks up the location of the IP address and sends it to the next bolt
     */
    public class ConvertIPBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            byte[] binary = input.getBinary(0);
            String line = new String(binary);
            String[] fields = line.split("\t");
            if (fields == null || fields.length < 10) {
                return;
            }
            // Get ip and mid
            String ip = fields[1];
            String mid = fields[2];
            // Look up the province the ip belongs to
            String province = null;
            if (ip != null) {
                Jedis jedis = JedisUtil.getJedis();
                province = jedis.hget("ip_info_en", ip);
                // The jedis resource must be released, otherwise: can not get resource from the pool
                JedisUtil.returnJedis(jedis);
            }
            // Send only province and mid, which is all the business logic needs
            collector.emit(new Values(province, mid));
        }

        /**
         * The data sent to the next bolt contains two fields: province and mid
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("province", "mid"));
        }
    }

5.2.3 Writing the second bolt: StatisticBolt

This bolt contains the logic for counting the site's pv and uv, so it is very important. Its code is as follows:

    package cn.xpleaf.bigdata.storm.statistic;

    import cn.xpleaf.bigdata.storm.utils.JedisUtil;
    import org.apache.storm.Config;
    import org.apache.storm.Constants;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    import redis.clients.jedis.Jedis;

    import java.text.SimpleDateFormat;
    import java.util.*;

    /**
     * Log statistics bolt. It:
     * 1. counts the PV and UV of each province
     * 2. writes each province's PV and UV information to Redis, per day
     */
    public class StatisticBolt extends BaseBasicBolt {

        Map<String, Integer> pvMap = new HashMap<>();
        Map<String, HashSet<String>> midsMap = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {
                // Non-system tuple: accumulate the statistics in local variables
                String province = input.getStringByField("province");
                String mid = input.getStringByField("mid");
                if (province == null || !pvMap.containsKey(province)) {
                    return; // unknown province: skip it to avoid a NullPointerException below
                }
                pvMap.put(province, pvMap.get(province) + 1); // pv + 1
                if (mid != null) {
                    midsMap.get(province).add(mid); // add the mid to the set of this province
                }
            } else {
                // System tuple: write the data to Redis and free JVM heap memory
                /*
                 * Taking Guangdong as an example, the data is stored in Redis as follows:
                 * guangdong_pv (Redis hash)
                 *     --20180415
                 *         --pv count
                 *     --20180416
                 *         --pv count
                 * guangdong_mids_20180415 (Redis set)
                 *     --mid
                 *     --mid
                 *     --mid
                 *     ......
                 * guangdong_mids_20180416 (Redis set)
                 *     --mid
                 *     --mid
                 *     --mid
                 *     ......
                 */
                Jedis jedis = JedisUtil.getJedis();
                String dateStr = sdf.format(new Date());
                // Write pvMap to Redis
                String pvKey = null;
                for (String province : pvMap.keySet()) {
                    int currentPv = pvMap.get(province);
                    if (currentPv > 0) { // only update when the pv in the map is greater than 0
                        pvKey = province + "_pv";
                        String oldPvStr = jedis.hget(pvKey, dateStr);
                        if (oldPvStr == null) {
                            oldPvStr = "0";
                        }
                        Long oldPv = Long.valueOf(oldPvStr);
                        jedis.hset(pvKey, dateStr, oldPv + currentPv + "");
                        pvMap.replace(province, 0); // reset the pv of the province to 0
                    }
                }
                // Write midsMap to Redis
                String midsKey = null;
                HashSet<String> midsSet = null;
                for (String province : midsMap.keySet()) {
                    midsSet = midsMap.get(province);
                    if (midsSet.size() > 0) { // only update when the set of the province is not empty
                        midsKey = province + "_mids_" + dateStr;
                        jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()]));
                        midsSet.clear();
                    }
                }
                // Release the jedis resource
                JedisUtil.returnJedis(jedis);
                System.out.println(System.currentTimeMillis() + "->write data to Redis");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /**
         * Set up a scheduled task that applies only to this bolt:
         * the system periodically sends a system-level tuple to StatisticBolt
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> config = new HashMap<>();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return config;
        }

        /**
         * Initialize the pv and mids of each province (temporary storage for computing pv and uv)
         */
        public StatisticBolt() {
            pvMap = new HashMap<>();
            midsMap = new HashMap<>();
            String[] provinceArray = {"shanxi", "jilin", "hunan", "hainan", "xinjiang", "hubei", "zhejiang", "tianjin", "shanghai",
                    "anhui", "guizhou", "fujian", "jiangsu", "heilongjiang", "aomen", "beijing", "shaanxi", "chongqing",
                    "jiangxi", "guangxi", "gansu", "guangdong", "yunnan", "sicuan", "qinghai", "xianggang", "taiwan",
                    "neimenggu", "henan", "shandong", "shanghai", "hebei", "liaoning", "xizang"};
            for (String province : provinceArray) {
                pvMap.put(province, 0);
                midsMap.put(province, new HashSet<>());
            }
        }
    }

5.2.4 Writing the topology

We need to write a topology to organize the Bolt we wrote earlier, as follows:

    package cn.xpleaf.bigdata.storm.statistic;

    import kafka.api.OffsetRequest;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.TopologyBuilder;

    /**
     * Build the topology
     */
    public class StatisticTopology {

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            /*
             * Set up the DAG (directed acyclic graph) of spout and bolts
             */
            KafkaSpout kafkaSpout = createKafkaSpout();
            builder.setSpout("id_kafka_spout", kafkaSpout);
            builder.setBolt("id_convertIp_bolt", new ConvertIPBolt())
                    .shuffleGrouping("id_kafka_spout"); // the grouping names the upstream component
            builder.setBolt("id_statistic_bolt", new StatisticBolt())
                    .shuffleGrouping("id_convertIp_bolt"); // the grouping names the upstream component
            // Build the topology with the builder
            StormTopology topology = builder.createTopology();
            String topologyName = StatisticTopology.class.getSimpleName(); // name of the topology
            Config config = new Config(); // Config extends HashMap and wraps some basic settings

            // Start the topology: LocalCluster for a local run, StormSubmitter for a cluster run
            if (args == null || args.length < 1) { // no arguments: local mode; with arguments: cluster mode
                LocalCluster localCluster = new LocalCluster(); // local development mode
                localCluster.submitTopology(topologyName, config, topology);
            } else {
                StormSubmitter.submitTopology(topologyName, config, topology);
            }
        }

        /**
         * BrokerHosts hosts  the Kafka cluster list
         * String topic       the topic to consume
         * String zkRoot      Kafka's directory in zk (the offsets of consumed messages are recorded under this node)
         * String id          identifier of the current consumer
         */
        private static KafkaSpout createKafkaSpout() {
            String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
            BrokerHosts hosts = new ZkHosts(brokerZkStr); // Kafka's addresses are found through /brokers in ZooKeeper
            String topic = "f-k-s";
            String zkRoot = "/" + topic;
            String id = "consumer-id";
            SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
            // With the two settings below, the /f-k-s node is also created in zk in a local environment;
            // in a cluster environment the /f-k-s node is created in zk without them
            // spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"});
            // spoutConf.zkPort = 2181;
            // Start from the latest offset instead of re-reading messages produced before startup
            spoutConf.startOffsetTime = OffsetRequest.LatestTime();
            return new KafkaSpout(spoutConf);
        }
    }

After the program above is packaged into a jar, uploaded to our cluster and submitted, and provided the earlier integration works and the Web service is producing logs, we can after a while see the final processing results in the Redis database, that is, the uv and pv information of each province:

Note that the mid information is stored as a set; the UV value is simply the size of that set.
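How repeated periodic writes add up to a daily PV total and a daily UV set can be sketched without a Redis server. The sketch below uses ordinary Java maps as stand-ins for a Redis hash and a Redis set; the class `FlushSketch`, its method names, and the sample numbers are hypothetical, while the key shapes (`<province>_pv` and `<province>_mids_<date>`) follow the ones built in StatisticBolt.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only: merging periodic flushes into daily totals. */
final class FlushSketch {
    // Stand-in for Redis hashes: key -> (field -> value)
    private final Map<String, Map<String, Long>> hashes = new HashMap<>();
    // Stand-in for Redis sets: key -> members
    private final Map<String, Set<String>> sets = new HashMap<>();

    static String pvKey(String province) {
        return province + "_pv";
    }

    static String midsKey(String province, String date) {
        return province + "_mids_" + date;
    }

    /** Adds the PV counted since the last flush to the stored daily total. */
    void flushPv(String province, String date, int currentPv) {
        if (currentPv <= 0) {
            return; // nothing new to write
        }
        hashes.computeIfAbsent(pvKey(province), k -> new HashMap<>())
              .merge(date, (long) currentPv, Long::sum);
    }

    /** Adds the mids seen since the last flush to the day's set; duplicates are ignored. */
    void flushMids(String province, String date, Set<String> mids) {
        if (mids.isEmpty()) {
            return;
        }
        sets.computeIfAbsent(midsKey(province, date), k -> new HashSet<>()).addAll(mids);
    }

    long pv(String province, String date) {
        return hashes.getOrDefault(pvKey(province), new HashMap<>()).getOrDefault(date, 0L);
    }

    /** UV for a day is the size of that day's mid set. */
    int uv(String province, String date) {
        return sets.getOrDefault(midsKey(province, date), new HashSet<>()).size();
    }

    public static void main(String[] args) {
        FlushSketch store = new FlushSketch();
        store.flushPv("guangdong", "20180415", 100);
        store.flushPv("guangdong", "20180415", 70);
        store.flushMids("guangdong", "20180415", new HashSet<>(Arrays.asList("a", "b")));
        store.flushMids("guangdong", "20180415", new HashSet<>(Arrays.asList("b", "c")));
        System.out.println(store.pv("guangdong", "20180415") + " " + store.uv("guangdong", "20180415"));
    }
}
```

Because the PV is added to the stored value while the mids are unioned into a set, a visitor who shows up in two different flush windows on the same day still counts once toward UV.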

At this point, strictly speaking, our real-time big data processing system for pv and uv statistics is complete. How the results are used depends on the business needs, but a website's pv and uv data is well suited to visualization, that is, displaying the data dynamically on a web page. Our next step is to build a simple Web application that displays the pv and uv data dynamically.

6 Data visualization

Data visualization requires two pieces of work:

1. Develop a Web project that can query the data in Redis and serve the pages to be visited
2. Develop or find a front-end UI that meets our needs and displays the data queried by the Web project

The choice of language and technology for the Web project depends on your own technology stack; anything works as long as it achieves the goal of data visualization. What this project displays is just pv and uv data, which is not difficult, so one could choose Java Web (Servlet, SpringMVC, etc.) or Python Web (Flask, Django, etc.). I personally like Flask very much because development is fast, but since everything so far has been in Java, I chose SpringMVC here.

As for the UI, my front-end skills are average: ordinary development is no problem, but building a map-style UI like the one above for displaying data is beyond me. Fortunately, there are many third-party chart UI frameworks, such as highcharts and echarts. echarts is open-sourced by Baidu and has rich Chinese documentation, so I chose echarts for the UI, and it happens to include map components that meet our needs.

Because it is not difficult, the development process is not covered here. Interested readers can refer directly to the source code I provide; here we will just look at the result.

This part of the project contains very little code. With SpringMVC, once the JavaEE three-tier architecture is set up and the dependencies are added, the rest of the development is not difficult; and if you know Flask or Django, the project setup and the code itself will be even simpler.

After starting our Web project, enter the address to access the data display page:

As you can see, the echarts UI looks quite good and meets our needs. The two differently colored dots on each province represent the two kinds of data we display, pv and uv, which are also shown in the legend in the upper left corner, and the depth of the color reflects how large the pv or uv value is.

On this page, clicking uv in the upper left corner hides the uv data, so only pv is shown:

Of course, you can also view only uv:

When you hover over a province, you can see its exact pv or uv value. For example, hovering over "Guangdong" shows that its pv value is 170. The same is true for other provinces:

The data can now be viewed, but how do we make it dynamic?

There are two ways to refresh the page data dynamically: one is to refresh the page periodically, and the other is to request data asynchronously from the back end.

At present I use the first: the page is refreshed periodically. Interested readers can try the second method, which only requires developing a back-end API that returns JSON data.
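For the second method, the back end only has to return the per-province values as JSON so that the page can poll for them. A minimal sketch of building such a payload in plain Java follows. The class `PvUvJson`, the field names `name`, `pv` and `uv`, and the sample UV value are assumptions for illustration, not the format used by the dynamic-show project; a real SpringMVC controller would normally leave serialization to a JSON library rather than concatenate strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative sketch only: renders per-province pv/uv values as a JSON array. */
final class PvUvJson {

    /** Produces [{"name":"...","pv":N,"uv":M},...]; uv defaults to 0 when missing. */
    static String toJson(Map<String, Long> pv, Map<String, Long> uv) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Long> entry : pv.entrySet()) {
            long uvValue = uv.getOrDefault(entry.getKey(), 0L);
            array.add("{\"name\":\"" + escape(entry.getKey()) + "\""
                    + ",\"pv\":" + entry.getValue()
                    + ",\"uv\":" + uvValue + "}");
        }
        return array.toString();
    }

    /** Escapes backslashes and double quotes, which is enough for plain province names. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, Long> pv = new LinkedHashMap<>();
        Map<String, Long> uv = new LinkedHashMap<>();
        pv.put("guangdong", 170L);
        uv.put("guangdong", 42L); // hypothetical value
        System.out.println(toJson(pv, uv));
    }
}
```

The page would then request this payload on a timer and feed the values to the chart, so only the data is reloaded rather than the whole page.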

7 Summary

We have now completed everything from building the real-time big data processing system to the final data visualization. The whole process touches on many areas of knowledge, but I believe that as long as we firmly grasp the core principles, setting up the environment and developing the business logic can be handled well in most cases.

I wrote this article partly as a summary of my own practice and partly to share a good project case. I hope it is helpful to you.

The code for this project case has been uploaded to GitHub in two parts: the Storm project code and the data visualization code, as follows:

Storm-statistic: https://github.com/xpleaf/storm-statistic

Dynamic-show: https://github.com/xpleaf/dynamic-show
