This article explains why eBay's advertising data platform migrated from Druid to ClickHouse and shares the practical experience gained in the process.
eBay's advertising data platform provides first-party advertisers (sellers using the Promoted Listings service) with analytics on ad traffic, user behavior, and campaign performance.
Through the marketing and performance tabs of Seller Hub and the public API, advertising sellers can track and compare real-time and historical data on campaign traffic, promoted listings, and sales, and download analytics reports via the web page or the API.
When the system first launched, it used a self-developed distributed SQL engine built on top of an object storage system. Three years ago, as advertising traffic grew, we switched the data engine to Druid.
The main challenges of this platform are as follows:
Large data volume: tens of billions of records are inserted every day, with insertion peaks close to one million records per second.
Offline data ingestion: data from the previous 1-2 days must be replaced online every day without affecting real-time ingestion.
Based on the cleansed daily data published by the upstream data team, the advertising data platform replaces real-time data every day without affecting queries; this data switch must be a globally atomic operation across nodes.
Integrity and consistency: as seller-facing financial data, offline updates must contain no omissions or duplicates, and real-time data must have an end-to-end delay of under ten seconds.
Druid vs ClickHouse
Druid, developed by Metamarkets in 2011, is a high-performance online analytical storage and query engine. It was open sourced in 2012 and became a project under the Apache Foundation in 2015.
Druid is widely used in the industry, providing sub-second query latency over hundreds of billions of records, and excels at high availability and horizontal scaling.
In addition, it provides many convenient aggregation and transformation templates for data ingestion, has built-in support for a variety of data sources, and can bring a new data table online in as little as tens of minutes, including its data definition and ingestion pipeline (Lambda architecture), which greatly improves development efficiency.
ClickHouse was developed by Yandex, Russia's largest search engine company, to support core functions of Yandex.Metrica (the world's second largest web analytics platform) such as generating user analytics reports.
ClickHouse is a full database management system (DBMS) with databases, tables, views, DDL, DML, and other familiar concepts, and provides fairly complete SQL support.
Its core features are as follows:
Efficient data storage: through data compression and columnar storage, compression ratios of up to 10x can be achieved.
Efficient data query: primary key indexing, vectorized execution, multi-core parallelism, and distributed queries squeeze the maximum capability out of the CPU, and performance is especially strong on small and medium-sized data sets.
Flexible data definition and access: support for SQL, JDBC, and the relational model reduces learning and migration costs and allows seamless integration with other existing data products.
Why migrate?
Operation and maintenance
Although Druid provides many convenient data ingestion features, its component architecture is comparatively complex, with six node types (Overlord, Coordinator, MiddleManager, Indexer, Broker, and Historical).
In addition to its own nodes, Druid relies on MySQL to store metadata, on ZooKeeper to elect the Coordinator and Overlord, and on HDFS to back up historical segment data.
ClickHouse, by contrast, adopts a peer-to-peer design: there is only one node type and no master/slave distinction. If replication is used, it depends on ZooKeeper to track the synchronization progress of the data parts held by each replica.
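As a rough illustration (the table, cluster, and macro names below are hypothetical, not eBay's actual configuration), replication in ClickHouse is enabled simply by pointing the table engine at a ZooKeeper path shared by all replicas:

```sql
-- Hypothetical replicated table; the ZooKeeper path and macros are illustrative.
CREATE TABLE ads.events ON CLUSTER ad_cluster
(
    event_dt  Date,
    seller_id UInt64,
    clicks    UInt64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ads.events', '{replica}')
PARTITION BY toYYYYMMDD(event_dt)
ORDER BY (seller_id, event_dt);
```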
At the same time, eBay's infrastructure team proposed to offer product teams a columnar database storage service based on a customized ClickHouse.
Beyond operations, maintenance, and lifecycle management, the infrastructure team modified and extended ClickHouse, further improving ingestion and storage efficiency and closing the functional gap with Druid in offline ingestion.
Delayed data insertion
Druid ingests real-time data through indexing tasks that turn it into segments and archive them as historical data. Once a time period has been turned into segments, data for that period can no longer be written.
Because of the limit on the number of concurrent real-time indexing tasks, we set a window of 3 hours (one task per hour), so data older than 3 hours cannot be written.
In some extreme cases, such as upstream data delays or lag in real-time consumption, part of the data is simply lost until the next offline replacement. ClickHouse has no such restriction: any partition can be written to at any time.
Primary key optimization
The primary key supported by ClickHouse is not a primary key in the traditional relational sense. A traditional primary key requires every record to have a unique key value, so that a record can be looked up uniquely by its key.
In ClickHouse, the primary key defines the order in which records are sorted in storage and duplicates are allowed, so it is more accurately called a sort key.
In fact, the primary key in ClickHouse is declared through ORDER BY, and it is only allowed to differ from the sort key in a few scenarios (and even then it must be a prefix of the sort key).
Because our product provides analytics for sellers, almost every query is restricted to a single seller, so placing the seller at the front of the primary key greatly improves both query efficiency and the data compression ratio, as in the sketch below.
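A minimal sketch of this idea, with hypothetical column names: leading the sort key with the seller clusters each seller's rows together on disk, so a single-seller query only reads a few index granules.

```sql
-- Hypothetical schema; seller_id first in ORDER BY clusters each seller's
-- rows together, which helps both the sparse index and compression.
CREATE TABLE ads.impressions
(
    seller_id   UInt64,
    item_id     UInt64,
    event_dt    Date,
    impressions UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_dt)
ORDER BY (seller_id, item_id, event_dt);

-- A typical single-seller query touches only that seller's granules.
SELECT item_id, sum(impressions) AS impressions
FROM ads.impressions
WHERE seller_id = 123456 AND event_dt >= '2021-06-01'
GROUP BY item_id;
```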
System architecture
Figure 1
As shown in figure 1, the system consists of four parts:
Real-time data ingestion module, connected to eBay's real-time behavior and transaction messaging platform.
Offline data replacement module, connected to eBay's internal data warehouse platform.
ClickHouse deployment and surrounding data services.
Report service, serving advertisers through the merchant backend and eBay's externally exposed API.
Hands-on experience
Schema design
ClickHouse offers rich schema configuration options. They require repeated deliberation and experimentation against the business scenario and data model, because different choices can change storage and performance by an order of magnitude, and a wrong choice leads to heavy tuning and migration costs later on.
① Table engine
The core of ClickHouse's storage engine family is the merge tree (MergeTree), from which the commonly used engines are derived:
Summing merge tree (SummingMergeTree)
Aggregating merge tree (AggregatingMergeTree)
Versioned collapsing merge tree (VersionedCollapsingMergeTree)
In addition, each of the merge tree engines above has a corresponding replicated version (Replicated*MergeTree).
Our advertising data platform uses the replicated summing merge tree (ReplicatedSummingMergeTree) for impression and click data. These two types of user behavior have very high volumes, so reducing data volume, saving storage, and improving query efficiency were the main goals of the schema design.
ClickHouse aggregates the data along the specified dimensions in the background, reducing the data volume by 60%.
The sales data uses the ordinary replicated merge tree: on the one hand, some sales metrics require aggregations other than sums; on the other hand, the data volume is small, so merging rows is not urgent.
② Primary key
In general, the primary key (PRIMARY KEY) and the sort key (ORDER BY) of a ClickHouse table are the same, but tables using the summing merge tree engine (SummingMergeTree) can specify the primary key separately.
Excluding dimension fields that do not need sorting or indexing from the primary key reduces its size (the primary key index has to be fully loaded into memory at runtime) and improves query efficiency, as in the sketch below.
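A hedged sketch of this pattern (the column names are invented): ORDER BY carries all the summing dimensions, while PRIMARY KEY keeps only the leading columns that queries actually filter on, shrinking the in-memory index.

```sql
-- Hypothetical summing table: ORDER BY lists every grouping dimension,
-- while PRIMARY KEY keeps only a prefix of it to keep the index small.
CREATE TABLE ads.clicks_daily
(
    seller_id   UInt64,
    item_id     UInt64,
    campaign_id UInt64,
    event_dt    Date,
    clicks      UInt64,
    cost        Float64
)
ENGINE = SummingMergeTree
PARTITION BY toYYYYMMDD(event_dt)
ORDER BY (seller_id, item_id, campaign_id, event_dt)
PRIMARY KEY (seller_id, item_id);
```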
③ Compression
ClickHouse supports column-level data compression, which significantly reduces the size of the raw data and is one of the big advantages of a columnar storage engine. A smaller storage footprint also reduces the amount of I/O at query time.
Choosing a suitable compression algorithm and level for each column balances compression against query performance.
By default, all ClickHouse columns use LZ4 compression. Beyond that, general data columns can use algorithms with higher compression ratios, such as LZ4HC or ZSTD.
For monotonically increasing data such as time series, specialized codecs like DoubleDelta or Gorilla can be chosen.
High-compression algorithms such as LZ4HC and ZSTD also let you choose a compression level. On our production data set, ZSTD is more effective at compressing String fields, while LZ4HC, an improved variant of LZ4 with a higher compression ratio, is better suited to non-string types.
A higher compression ratio means less storage space, and query performance improves indirectly because the query reads less I/O.
CPU cycles, however, are not free, and insertion performance pays the price. According to our internal tests, using LZ4HC(6) on our production dataset saves 30% of storage compared with LZ4, but real-time ingestion throughput drops by 60%.
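For illustration, codecs are declared per column; the choices below mirror the trade-offs described above, but the table and columns are hypothetical.

```sql
-- Hypothetical column definitions showing per-column codec choices.
CREATE TABLE ads.behavior_events
(
    event_time DateTime CODEC(DoubleDelta, LZ4), -- monotonically increasing timestamps
    seller_id  UInt64   CODEC(LZ4HC(6)),         -- higher ratio, slower inserts
    page_url   String   CODEC(ZSTD(3)),          -- ZSTD compresses strings well
    cost       Float64  CODEC(Gorilla)           -- slowly changing float series
)
ENGINE = MergeTree
ORDER BY (seller_id, event_time);
```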
④ Low cardinality
It is worth mentioning that for columns with low cardinality (that is, low diversity of column values), LowCardinality can be used to reduce the original storage (and therefore the final storage) footprint.
Applying LowCardinality to a String column that already uses a compression codec can cut the space by a further 25%.
On our test dataset, with LowCardinality, LZ4HC(6), and ZSTD(15) combined across the whole table, the overall size is about 13% of the original.
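As a small sketch (the columns are invented for illustration), LowCardinality simply wraps the column type and can be combined with a codec:

```sql
-- Hypothetical low-cardinality string columns combined with a codec.
ALTER TABLE ads.behavior_events
    ADD COLUMN site_id     LowCardinality(String) CODEC(ZSTD(1)),
    ADD COLUMN device_type LowCardinality(String) CODEC(ZSTD(1));
```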
Offline data replacement
① Challenges
Data reports for advertisers must be accurate and consistent. Real-time behavior data contains a small amount of bot traffic (which needs to be cleaned out offline), and ad attribution also needs to be recalculated in the offline phase.
Therefore, we introduced an offline data pipeline that replaces real-time data with offline data 24-72 hours after the real-time data was written.
The challenges are as follows:
The advertising system has to process close to 1 TB of offline data every day. Previously, importing this data from Hadoop into Druid alone took a long time.
In addition, the I/O, CPU, and memory consumed during the import put significant pressure on queries. Guaranteeing data consistency while keeping the migration efficient is the crux of the problem.
The data visible to users must fluctuate as little as possible during replacement, which requires the replacement operation to be atomic, or at least atomic per advertiser.
Besides the daily offline update, the system must support large-scale data correction and backfill whenever the data warehouse has deviations or omissions.
Job scheduling must ensure that the daily update finishes on time and that corrections complete as quickly as possible; various metrics of the update process also need to be monitored to handle emergencies.
Druid natively supports offline data updates; we implemented the equivalent on the ClickHouse platform in collaboration with the infrastructure team.
② Data architecture
For a big data architecture that integrates online and offline data, the usual industry practice is the Lambda architecture: the offline layer and the online layer ingest data separately, and the results are merged in the serving layer.
We broadly adopted this architecture as well, but our concrete implementation differs from the classic one.
A data partition in ClickHouse is an independent unit of storage; each partition can be detached from an existing table, attached to it, or replaced.
The partitioning condition can be customized and is usually based on time. By replacing a table's data partitions one by one, the underlying data update is transparent to the query layer and no extra merge logic is needed, as the sketch below shows.
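A minimal sketch of the swap, with hypothetical table names: the offline pipeline first loads a day's cleansed data into a staging table, then atomically replaces the corresponding partition of the serving table.

```sql
-- Hypothetical tables: load the cleansed day into a staging table first...
INSERT INTO ads.clicks_daily_staging
SELECT * FROM ads.clicks_daily_offline WHERE event_dt = '2021-06-01';

-- ...then atomically swap that day's partition into the serving table.
ALTER TABLE ads.clicks_daily
    REPLACE PARTITION 20210601 FROM ads.clicks_daily_staging;
```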
③ Spark aggregation and sharding
To reduce the load on ClickHouse when importing offline data, we introduced a Spark job that aggregates and shards the raw offline data. Each shard can then pull and import its own data files independently, saving the cost of data routing and aggregation inside ClickHouse; a rough sketch follows.
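The aggregation might look roughly like the Spark SQL below; the table, the column names, and the four-shard layout are assumptions for illustration, not eBay's actual job.

```sql
-- Hypothetical Spark SQL pre-aggregation: roll raw events up to the serving
-- granularity and assign each group to a ClickHouse shard up front.
SELECT
    seller_id,
    item_id,
    event_dt,
    SUM(impressions)         AS impressions,
    SUM(clicks)              AS clicks,
    pmod(hash(seller_id), 4) AS shard_id  -- 4 shards assumed
FROM raw_ad_events
GROUP BY seller_id, item_id, event_dt;
```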
④ Data update task management
Locking the partition topology: before processing data, the offline data update system asks the service provided by the infrastructure team to lock ClickHouse's partition topology, so that the topology does not change while the update is in flight.
The server returns the sharding logic and shard IDs according to the predefined table structure and partition information.
The offline data update system then submits Spark jobs based on this topology information. Processing multiple tables in parallel with Spark significantly speeds up the update.
Data aggregation and sharding: for each table that needs updating, a Spark job is started to aggregate and shard the data.
Following the table structure and sharding topology returned by the ClickHouse server, the data is written to Hadoop, along with the checksums and per-shard row counts used to verify consistency in the replacement phase.
The system submits jobs and polls their status through the Livy server API, retrying failed tasks to absorb failures caused by insufficient resources in the Spark cluster.
Offline data updates must handle not only the daily batch update but also re-updates of historical data, so that changes in the upstream data can be synchronized outside the daily scheduled runs.
We use the Spring Batch based task manager wrapped by the platform team to split the daily data into one subtask per date.
A continuously running job implemented with Spring Batch ensures that only one instance of a subtask runs at a time, avoiding contention between tasks.
For updates of past data, we classify Batch tasks so that, besides the routine tasks, data correction tasks over a given time range can also be triggered manually (see figure 2).
Figure 2
Data replacement: after all the Spark jobs in a subtask have completed, the offline data update system calls the data replacement interface provided by the infrastructure team to initiate a replacement request.
The server then writes the data from Hadoop directly into ClickHouse according to the defined partitions, as shown in figure 3.
Figure 3
The architecture of the offline data update system is shown in figure 4:
Figure 4
A MySQL database records the status and priority of tasks during the replacement process, so that progress can be recovered when a Spark job fails or a replacement task has to be restarted for other reasons.
⑤ Atomicity and consistency
To make data replacement atomic, the infrastructure team provides a partition replacement mechanism: while importing offline data, a temporary partition is created alongside the target partition, and once the data has been imported and verified, the target partition is swapped for the temporary one.
To make the replacement atomic across shards on different machines, the infrastructure team introduced a data version for the data on each shard.
Each data partition has a corresponding active version number, and the partition's version number is only updated once all shards of the partition being replaced have been imported successfully.
Upstream SQL therefore only ever reads a single version of a given partition; each partition replacement appears as a single switch, and there is no risk of reading old and new data at the same time.
Accordingly, the advertising platform's report generation application was modified at the SQL level: fixed WITH and PREWHERE clauses look up each data partition's current version number in a dictionary and exclude the unwanted partition versions from the query plan.
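A hedged sketch of what such a version-aware query could look like, assuming each row carries a data_version column; the table, dictionary, and column names are illustrative, not the actual eBay schema.

```sql
-- Hypothetical version-aware query: a fixed WITH alias looks up the active
-- version of each date partition in a dictionary, and PREWHERE drops rows
-- belonging to any other version before the remaining columns are read.
WITH dictGet('ads.partition_version', 'active_version',
             toUInt64(toYYYYMMDD(event_dt))) AS active_version
SELECT
    seller_id,
    sum(clicks) AS clicks
FROM ads.clicks_daily
PREWHERE data_version = active_version
WHERE seller_id = 123456
  AND event_dt BETWEEN '2021-06-01' AND '2021-06-30'
GROUP BY seller_id;
```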
To ensure the consistency of data replacement, the offline data update system computes checksums and total row counts once the Spark processing is done.
After the replacement, the ClickHouse server verifies the data on each shard against these values, making sure no data was lost or duplicated during the move.
Data query
ClickHouse supports SQL queries (though not the complete standard), offers both HTTP and TCP interfaces, and comes with a rich set of official and third-party query tools and client libraries.
Users can quickly develop and debug queries using the command line, JDBC, or visualization tools.
ClickHouse makes full use of machine resources through MPP (massively parallel processing) plus SMP (symmetric multiprocessing). By default, a single query uses up to half of the machine's CPU cores.
As a result, ClickHouse is not suited to highly concurrent workloads. At the application level, the core problems are query validation and concurrency control: a single oversized query, or too many queries at once, drives cluster resource utilization high and undermines cluster stability.
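One common way to enforce such limits is with per-query settings; the values below are purely illustrative, not eBay's production configuration.

```sql
-- Hypothetical per-query resource limits used for concurrency control.
SELECT seller_id, sum(impressions) AS impressions
FROM ads.impressions
WHERE seller_id = 123456
GROUP BY seller_id
SETTINGS
    max_threads = 8,                -- cap parallelism below the default (half the cores)
    max_memory_usage = 10000000000, -- roughly 10 GB per query
    max_execution_time = 30;        -- abort queries that run longer than 30 s
```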
Application architecture
eBay Seller Hub connects to ClickHouse queries through the Reports Service, which provides two sets of APIs: Public and Internal.
The Internal API is available to Seller Hub and other known internal applications, while the Public API is open to third-party developers in the eBay Developers Program. For more information, please see:
https://developer.ebay.com/
Figure 5
Internal API queries are submitted directly to an internal thread pool for execution, with the pool size set according to the number of machines in the ClickHouse cluster. Requests are validated before execution, filtering out any that are invalid or whose resource consumption cannot be estimated.
The Public API executes queries asynchronously via task submission: query tasks submitted by users are stored in a database, and a scheduler inside the service scans the table periodically and executes the tasks serially according to their status.
Reports generated by successful tasks are uploaded to a file server, and users download them via the returned URL. Failed tasks are either retried in the next cycle or dropped, depending on the type of error (invalid request, insufficient resources, and so on).
Testing and release
After deploying the production environment, we enabled double writes, continuously inserting real-time and offline data into ClickHouse until it held the same volume of data as Druid.
After verifying data consistency, we mirrored the production service's queries and forwarded them to ClickHouse.
By collecting and comparing the responses from Druid and ClickHouse, we could verify the data quality and query performance of the ClickHouse pipeline.
During the gray-release phase, we gradually increased the share of production traffic served by ClickHouse while keeping Druid running, so that any problem could be rolled back in time.
Query GUI
For data visualization, we needed to give developers, testers, and BI staff a tool similar to Turnilo.
ClickHouse integrates with a variety of commercial and open source products; we chose Cube.js and did some light secondary development on it.
Figure 6