DataPipeline: Guazi's Kafka-based structured data flow

Updated 2025-01-16. Source: SLTechnology News&Howtos (Shulou), Internet Technology

I. Why choose Kafka?

Why Kafka? The data volume is huge, so the message queue has to be distributed: the data in the queue must be spread across many machines for storage, and the computation on it is distributed as well. The queue also has to support clients in several languages, such as Java, Go, and PHP, and it has to be highly available.

II. Kafka Cluster

The Realtime Kafka cluster synchronizes all of its data to the Analysis Kafka cluster through MirrorMaker.

The Realtime cluster mainly serves online, real-time reads and writes. The Analysis cluster handles many other tasks, such as data import and export. Sending the same data out to many destinations puts heavy pressure on a cluster's network and disks, so the two clusters are kept separate to protect the stability of online services. There are currently more than 50,000 topics, and more than 1 million records may flow in and out per second.

III. Problems users face with Kafka

1. Parameter configuration. Kafka has a large number of parameters. Common choices, such as which cluster to use, latency, and message importance, need to be encapsulated.

2. Inconvenient development and testing. Users typically ask: was my data written? Was it consumed? What does the written data look like? Structured data also needs code to parse it. Without tools and a platform to answer these questions, development efficiency drops sharply.

3. Inconvenient topic requests. Users cannot create topics themselves. We once opened topic creation in the test environment, and within a week the number of topics rose to tens of thousands, with odd parameters: some used every default, some copied from the documentation and set a time parameter to 10^9, and some set the partition count straight to 100. The ticket-based process, in turn, is unfriendly to administrators, who have to log in to a server and type commands by hand, which is inefficient and error-prone.

4. Inconvenient structured data queries. Guazi structures its data with Avro, and serialized data is hard to inspect in its original form.

5. Consumption anomalies are hard to locate, for example when data is consumed incorrectly or from the wrong position. Rolling back consumption or skipping dirty data raises further questions: from which offset should consumption restart, or to which offset should it jump? Also, after a rolling restart of a service, the consumed data may come up a batch short. Most likely a hidden consumer is still using the same consumer group, yet after asking around nobody can find which service was not shut down.

6. Unknown downstream consumers. A producer writing to a topic does not know which consumers read it, so it does not know whom to notify before changing the topic. The options are to change first and wait for downstream teams to report problems, or to hold off and collect the list of downstream consumers first. In practice it is usually the former, which often ends in chaos.

7. Complex operations and maintenance. Daily work includes adding topic partitions, helping users locate dirty data (they often do not know it exists), and helping troubleshoot configuration problems.
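The encapsulation mentioned in item 1 could look roughly like the sketch below: the user picks a cluster and a named profile, and the wrapper expands them into client settings. The profile names, host names, and values are assumptions made for illustration, not Guazi's actual settings; only the keys (`acks`, `linger.ms`, `enable.idempotence`, `compression.type`) are standard Kafka producer configuration names.

```python
# Hypothetical presets: profile names and values are illustrative assumptions.
# The keys themselves are standard Kafka producer configuration names.
PROFILES = {
    "critical": {"acks": "all", "enable.idempotence": True, "linger.ms": 5},
    "low_latency": {"acks": "1", "linger.ms": 0},
    "bulk": {"acks": "1", "linger.ms": 50, "compression.type": "lz4"},
}

# Placeholder bootstrap addresses for the two clusters described in section II.
CLUSTERS = {
    "realtime": "realtime-kafka.example:9092",
    "analysis": "analysis-kafka.example:9092",
}


def producer_config(cluster: str, profile: str) -> dict:
    """Expand a (cluster, profile) choice into a full producer config dict."""
    if cluster not in CLUSTERS:
        raise ValueError(f"unknown cluster: {cluster}")
    if profile not in PROFILES:
        raise ValueError(f"unknown profile: {profile}")
    config = {"bootstrap.servers": CLUSTERS[cluster]}
    config.update(PROFILES[profile])
    return config
```

A caller would then write `producer_config("realtime", "critical")` and pass the result to whichever client library is in use, without having to know the individual parameters.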

IV. Solution: Kafka platform

To solve the problems above, Guazi built a Kafka platform that provides user-facing tools and management functions.

The user-facing tools cover viewing data, understanding consumption status, easily adding monitoring and alerts, and quickly troubleshooting when something goes wrong.

The management functions cover permission management, request approval, and common operations such as seeking to an offset, deleting a topic, and expanding partitions.
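Request approval can catch the odd parameters described earlier before a topic is ever created. The following is a minimal sketch, assuming hypothetical limits (`MAX_PARTITIONS`, `MAX_RETENTION_MS`) and a hypothetical `TopicRequest` shape; a real platform would choose its own rules.

```python
from dataclasses import dataclass
from typing import List

# Assumed limits, chosen only for illustration.
MAX_PARTITIONS = 32
MAX_RETENTION_MS = 7 * 24 * 3600 * 1000  # 7 days


@dataclass
class TopicRequest:
    name: str
    partitions: int
    retention_ms: int
    owner: str


def validate(request: TopicRequest) -> List[str]:
    """Return the list of problems found; an empty list means the request can go to approval."""
    problems = []
    if not 1 <= request.partitions <= MAX_PARTITIONS:
        problems.append(f"partitions must be between 1 and {MAX_PARTITIONS}")
    if not 0 < request.retention_ms <= MAX_RETENTION_MS:
        problems.append(f"retention_ms must be between 1 and {MAX_RETENTION_MS}")
    if not request.owner:
        problems.append("owner is required so downstream changes can be announced")
    return problems
```

With these assumed limits, a request for 100 partitions and a retention of 10**9 ms (about 11.6 days) would be rejected on both counts.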

Data query

You can look up a message by its offset, or query the messages in a time window using the approximate time they entered Kafka. For each message you can see its partition, its offset, the time it entered Kafka, the Avro schema version, and so on.
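Querying by time comes down to finding the first offset whose timestamp is at or after the requested time, which is also the semantics of the Kafka consumer's `offsetsForTimes` lookup. The sketch below works on an in-memory list rather than a real partition and assumes timestamps never decrease within it; `Record` and both function names are hypothetical.

```python
from bisect import bisect_left
from typing import List, NamedTuple, Optional


class Record(NamedTuple):
    offset: int
    timestamp_ms: int  # time the record entered Kafka
    value: str


def first_offset_at_or_after(log: List[Record], ts_ms: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= ts_ms, or None if there is none."""
    timestamps = [record.timestamp_ms for record in log]
    index = bisect_left(timestamps, ts_ms)
    return log[index].offset if index < len(log) else None


def read_window(log: List[Record], start_ms: int, end_ms: int) -> List[Record]:
    """All records that entered Kafka in the half-open window [start_ms, end_ms)."""
    return [r for r in log if start_ms <= r.timestamp_ms < end_ms]
```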

Consumption query

For a given message, the platform shows which consumer groups have consumed it and which have not.

It also shows which IP address is currently consuming, so it is easy to find a consumer that was not shut down and the machine it runs on. This feature comes from our practical experience. You can see each consumer group's lag, accurate to the number of messages and broken down by partition. You can also see the total number of messages in each partition, which helps troubleshoot unevenly distributed messages.
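The per-partition lag described above is, in essence, the partition's log end offset minus the group's committed offset. A minimal sketch, assuming both maps have already been fetched from the cluster; the function names are hypothetical, and a partition with no committed offset is treated as starting from 0, which is a simplification.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Lag in messages per partition: log end offset minus committed offset."""
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def total_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> int:
    """Lag summed over all partitions of the topic."""
    return sum(consumer_lag(end_offsets, committed).values())
```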

Monitoring shows the inflow and outflow of each topic: how many messages are written per second, how large they are, and how much flows out per second.

Alerts are traffic alarms or lag alarms defined on a topic. Once an alert has been created, you only need to subscribe to it, which is very convenient.

V. Guazi's structured data flow

There are currently many usage scenarios, such as front-end event tracking, tracking logs, MySQL data synchronization, operation logs, message exchange between services, SQL-based streaming, APM data, and SQL-based ETL. Because the data is structured, all of it can be synchronized quickly to the big data platform for later analysis.

We do this with the suite of tools provided by Confluent. The two most important components are Schema Registry and Kafka Connect: Schema Registry stores schema information, and Kafka Connect moves the data.

At present, more than 90% of Guazi's data is structured, the exception being logs. Why Avro? Avro is fast and supported across languages, and every Avro schema (AVSC) is written in JSON, which matters because probably no one wants to learn another language just to define a schema. Avro also does not require code generation.
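To make the point about JSON concrete, here is what a small AVSC definition looks like when loaded with nothing but a JSON parser. The record, namespace, and field names are invented for illustration; the structure (`type`, `name`, `fields`, `doc`, `default`, enum `symbols`) follows the Avro schema format.

```python
import json

# A hypothetical schema; every name in it is an illustrative assumption.
AVSC = """
{
  "type": "record",
  "name": "CarListing",
  "namespace": "com.example",
  "doc": "One used-car listing event.",
  "fields": [
    {"name": "id", "type": "long", "doc": "Primary key of the listing."},
    {"name": "city", "type": "string", "doc": "City where the car is offered."},
    {"name": "status",
     "type": {"type": "enum", "name": "Status", "symbols": ["ON_SALE", "SOLD"]},
     "doc": "ON_SALE while listed, SOLD once the deal closes."},
    {"name": "price", "type": ["null", "double"], "default": null,
     "doc": "Asking price; null when not yet set."}
  ]
}
"""

schema = json.loads(AVSC)
field_names = [field["name"] for field in schema["fields"]]
```

Because the schema is ordinary JSON, any language with a JSON parser can read it, and the `doc` attributes carry human-readable comments alongside the definition.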

But Avro alone is not enough. In practice we face further requirements:

A unified schema center. This is needed for the same reason a configuration center is needed: without a single place for schemas, they spread by word of mouth and copies drift around, which is not what we want to see.

Multiple versions. Schemas will certainly need updates, rollbacks, and compatibility guarantees, so multiple versions have to be supported.

Version compatibility checks. Imagine that upstream renames a column in a schema: the downstream job that writes to Hive is left stuck, with no way of knowing how to handle the historical data or the incoming data for that column. So version compatibility has to be checked, ideally in a way that satisfies every downstream component.

Schema comments. A schema should carry comments written for people to read. Many people define an enum one day and have forgotten what it means the next; this is very common.
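The column-rename case above can be caught by a backward-compatibility check. The sketch below is a deliberately simplified version: it only looks at record fields, and it ignores Avro's type promotion, aliases, and union resolution, all of which a real checker such as the one in Schema Registry takes into account. The function name is hypothetical.

```python
from typing import List


def backward_incompatibilities(old: dict, new: dict) -> List[str]:
    """Problems that would stop a reader using `new` from reading data written with `old`.

    Simplified rules: a field that exists only in `new` needs a default,
    and a field present in both must keep the same type.
    """
    old_fields = {field["name"]: field for field in old["fields"]}
    problems = []
    for field in new["fields"]:
        previous = old_fields.get(field["name"])
        if previous is None:
            if "default" not in field:
                problems.append(f"new field '{field['name']}' has no default")
        elif previous["type"] != field["type"]:
            problems.append(f"field '{field['name']}' changed type")
    return problems
```

Under these rules a rename shows up as a new field without a default, so it is reported, while adding an optional field with a default passes.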

To solve these problems, we introduced Confluent's Schema Registry. Through a RESTful interface it provides capabilities similar to a configuration center, along with a complete UI, version compatibility checks, and support for multiple versions, which fully meets our needs. It also comes with HA and stores its configuration in Kafka to ensure consistency.

VI. Guazi's practice

Of course, this is still not enough. We ran into many problems in practice. For example, schema registration cannot be left completely open: history tells us that complete freedom means chaos. To use Avro well in practice, we went through two schemes to keep schemas under control.

Initial scheme

For unified control, all schemas were managed in git, and anyone who needed to could fork the repository. To launch, update, or add a schema, you submitted a merge request to the administrator. Once the administrator had checked it, the schema was registered automatically through gitlab-ci, so the administrator only had to confirm.

This approach had several problems. First, the launch process was too long. Launching or updating a schema required a merge request, which the administrator could only review after receiving the email. If the administrator then found an error in the schema, the whole process had to be repeated, which could take a day. Second, rollback was complex and there was no permission management. Third, many people made the same mistakes, and answering them one by one wasted a considerable amount of time.

Platform-based scheme

The platform provides a git-like page where you submit the schema directly and click Verify underneath. Once the platform has evaluated whether the new schema has problems, the request waits for approval in the back office. Features such as permission management can also be added.

VII. Why use Kafka Connect?

Kafka Connect focuses on copying data: moving data from a source into Kafka, and from Kafka to somewhere else. It supports both streaming and batch transfer, for example synchronizing once every 5 minutes.

It also supports copying between many systems. The source may be MySQL, SQL Server, or Oracle, and the sink may be HBase, Hive, and so on. It defines its own plugin interface, so you can write your own plugins for sources and sinks that are not yet supported.

Kafka Connect itself is distributed and parallel, supports HA and load balancing, and provides a convenient RESTful interface.
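For a sense of what that RESTful interface looks like, the sketch below builds (without sending) a request that would create an HDFS sink connector on a Connect cluster. The host names, connector name, and topic are placeholders; `POST /connectors` and the config keys shown are part of Kafka Connect and Confluent's HDFS sink connector, but the values are illustrative only.

```python
import json
from urllib.request import Request

CONNECT_URL = "http://connect.example:8083"  # placeholder address


def create_connector_request(name: str, config: dict) -> Request:
    """Build the POST /connectors request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return Request(
        CONNECT_URL + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Illustrative sink configuration; every value here is an assumption.
hdfs_sink = {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "orders",
    "hdfs.url": "hdfs://namenode.example:8020",
    "flush.size": "1000",
}

request = create_connector_request("orders-hdfs-sink", hdfs_sink)
```

Because the whole job is described by this one document, an administrator no longer has to track by hand which machine runs which synchronization.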

Before Kafka Connect, ETL operations were cumbersome. Take canal as an example: canal has a server and a client, both of which must be deployed manually. With 100 canal nodes and 1,000 databases, how would administrators know which machine runs which database tables, or which machine should take a new task?

In addition, when a field changes in MySQL, a programmer has to check on the machine how the table was modified, and every downstream system has to finish changing its table structure before the pipeline can run again. The response is very slow.

Kafka Connect solves these problems. It has one very important feature: if the upstream schema changes in an Avro-compatible way, Connect makes the matching compatible change downstream, automatically altering the downstream table structure and reducing the operations burden.
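The idea of propagating a compatible schema change downstream can be sketched as follows: compare the old and new Avro schemas and emit the Hive statement that adds the new columns. This is an illustration of the principle only, not the connector's actual code; the type mapping is partial and the function names are hypothetical.

```python
from typing import Optional

# Partial, illustrative mapping from Avro primitive types to Hive column types.
AVRO_TO_HIVE = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
}


def base_type(avro_type):
    """Unwrap a nullable union such as ["null", "double"] to its non-null branch."""
    if isinstance(avro_type, list):
        return [t for t in avro_type if t != "null"][0]
    return avro_type


def added_columns_ddl(table: str, old: dict, new: dict) -> Optional[str]:
    """Return an ALTER TABLE statement for fields added in `new`, or None if nothing was added."""
    known = {field["name"] for field in old["fields"]}
    added = [field for field in new["fields"] if field["name"] not in known]
    if not added:
        return None
    columns = ", ".join(
        f"`{field['name']}` {AVRO_TO_HIVE[base_type(field['type'])]}" for field in added
    )
    return f"ALTER TABLE {table} ADD COLUMNS ({columns})"
```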

Now for the architecture of Kafka Connect. Kafka Connect stores all of its state in Kafka: the config topic stores metadata, the status topic records which nodes are currently running which jobs, and the offset topic records how far each partition of a topic has been consumed.

Workers are stateless, and many tasks can run on each one. A single job may cover 5 partitions; if it is given a concurrency of only 3, its tasks are spread across three machines. If one machine fails, its tasks are redistributed to the other two machines, and the reassignment happens in real time.
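The distribution described above can be pictured with a simple round-robin assignment. This is only a model of the behavior, assuming round-robin placement; it is not Kafka Connect's actual rebalance protocol, and all names are made up.

```python
from typing import Dict, List


def assign_round_robin(items: List[str], owners: List[str]) -> Dict[str, List[str]]:
    """Deal `items` out to `owners` one at a time, in order."""
    assignment: Dict[str, List[str]] = {owner: [] for owner in owners}
    for index, item in enumerate(items):
        assignment[owners[index % len(owners)]].append(item)
    return assignment


partitions = [f"orders-{p}" for p in range(5)]   # one job covering 5 partitions
tasks = ["task-0", "task-1", "task-2"]           # concurrency of 3
workers = ["worker-a", "worker-b", "worker-c"]

partitions_per_task = assign_round_robin(partitions, tasks)
tasks_per_worker = assign_round_robin(tasks, workers)

# worker-c fails: the same tasks are redistributed over the two survivors.
after_failure = assign_round_robin(tasks, ["worker-a", "worker-b"])
```

Since workers hold no state of their own and progress lives in the offset topic, a task that moves to another worker can resume from where the previous one stopped.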

VIII. Plugins

Guazi has modified many of the Kafka Connect plugins it uses.

Maxwell

We replaced canal with Maxwell and turned Maxwell into a Kafka Connect plugin.

Native Maxwell does not support Avro. Borrowing ideas from Debezium, Guazi modified Maxwell to support the Avro format, MySQL metadata management, and MySQL database switching.

HDFS

We use Confluent's HDFS plugin, which has a number of problems of its own. For example, when writing to Hive, the columns used as partitions are also written into the main table data. This does not affect Hive itself, but it does affect Presto when it reads from Hive, so we changed the source code to remove those columns from the main table data.

When the plugin restarts, it reads all the files in HDFS to decide which offset to resume from. This has two problems: it takes too long, and offsets cannot be resumed after switching clusters. We modified this as well.

The plugin supports using the Kafka timestamp as the Hive partition, and it supports using columns in the data as partitions, but it does not support both at the same time. We modified this too.
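Combining the two partitioning modes amounts to building one partition path from both the Kafka timestamp and selected record columns. The sketch below shows the idea only; the `dt=` naming, the UTC day granularity, and the function name are assumptions, not the modified plugin's actual behavior.

```python
from datetime import datetime, timezone
from typing import List


def partition_path(kafka_timestamp_ms: int, record: dict, columns: List[str]) -> str:
    """Hive-style partition path: a day derived from the Kafka timestamp, then data columns."""
    day = datetime.fromtimestamp(kafka_timestamp_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
    parts = [f"dt={day}"] + [f"{column}={record[column]}" for column in columns]
    return "/".join(parts)
```

For a record `{"id": 1, "city": "beijing"}` that entered Kafka at midnight UTC on 2019-01-01, partitioning by `city` gives `dt=2019-01-01/city=beijing`.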

HBase

The HBase plugin supports only the most basic export, while we have special requirements such as custom rowkeys. A MySQL primary key is usually an auto-increment ID, which HBase does not recommend as a rowkey, so we need to reverse the ID; we also need rowkeys built from several columns. We changed the source code so that rowkey generation can be customized through configuration.
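Sequential IDs make poor rowkeys because consecutive writes all land in the same key range; reversing the digits spreads them out. A minimal sketch of such a rowkey builder follows, assuming a fixed padding width and an underscore separator; both choices and the function names are illustrative, not the plugin's actual configuration.

```python
from typing import List


def reversed_id(value: int, width: int = 10) -> str:
    """Zero-pad the ID to a fixed width, then reverse it so the fast-changing digits lead."""
    return str(value).zfill(width)[::-1]


def build_rowkey(record: dict, columns: List[str], sep: str = "_") -> str:
    """Join the chosen columns into one rowkey, reversing the first (ID) column."""
    parts = [str(record[column]) for column in columns]
    parts[0] = reversed_id(int(record[columns[0]]))
    return sep.join(parts)
```

With these assumptions, `build_rowkey({"id": 12345, "city": "bj"}, ["id", "city"])` yields `5432100000_bj`, and the next ID, 12346, starts with a different digit.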

The original plugin does not support Kerberos, and our production HBase has access control enabled, so we changed that too.

There are also some smaller features, such as converting all types to strings before saving, support for deletes, and support for JSON.

KUDU

We use Kudu heavily. The open-source Kudu plugin had some bugs, which we found and fixed.

Kudu's data comes from MySQL, and bulk rewrites of MySQL tables are common. When that happens the volume is very large and the Kudu sink falls far behind. We changed the plugin to add adaptive flow control: it automatically scales out to multi-threaded processing, and automatically scales back in when traffic is low.
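One simple way to picture adaptive flow control is to size the writer pool from the current backlog. The rule below is an assumption made for illustration (one thread per `per_thread_capacity` pending records, clamped to a range); the source does not describe the plugin's actual policy.

```python
def target_threads(
    backlog: int,
    per_thread_capacity: int = 10_000,
    min_threads: int = 1,
    max_threads: int = 16,
) -> int:
    """Number of writer threads for the given backlog of pending records."""
    needed = -(-backlog // per_thread_capacity)  # ceiling division
    return max(min_threads, min(max_threads, needed))
```

Re-evaluating this on a timer gives both behaviors from the text: the pool grows during a bulk rewrite and shrinks back to `min_threads` once the backlog drains.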

IX. The data pipeline of Guazi's data warehouse

Guazi's data warehouse is built entirely on structured data from Kafka and Avro. There are many data sources, and thousands of tables from multiple business lines have to be synchronized to the warehouse to serve downstream users.

The warehouse uses the Lambda architecture, with two flows: T+1 full processing and T+0.1 incremental processing.

First, the T+1 full flow. All MySQL tables are fed into Kafka through the Maxwell plugin and then written to HBase through Kafka Connect. HBase takes a snapshot every night and writes it to Hive. After several rounds of ETL (DWB --> DWD --> DW --> DM), the DM-layer data is imported into Kudu to provide BI analysis services. Offline OLAP analysis still queries Hive directly through Presto.

Next, the T+0.1 incremental flow. As in T+1, data enters Kafka through Maxwell; this part of the flow is shared. The incremental data is then written to Kudu in real time through the Kudu plugin, and ETL is done with Impala. The resulting data serves T+0.1 queries, which are answered by a separate set of Impala instances. In future we will also consider reading data directly from Kafka with Flink for real-time ETL, to reduce latency further.

Figure: overall architecture of the data warehouse.
