2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how Netflix processes more than 2 million events per second and keeps that data available for fast, real-time queries.
Ensuring a consistently good Netflix experience while continuously rolling out technology updates is not easy.
How can we be sure that updates do not harm users? And how do we make sure that our improvements are actually measurable? Netflix uses real-time logs from playback devices as an event source to derive metrics that quantify how smoothly browsing and playback work on users' devices.
Once we have these metrics, we feed them into a database. Each metric is tagged with anonymized details about the kind of device in use, such as whether it is a smart TV, an iPad, or an Android phone. This lets us classify devices and view the data from different angles. It also lets us isolate issues that affect only a specific group, such as a particular application version, device type, or country.
This aggregated data is available for querying immediately, either through dashboards or ad hoc queries. The metrics are also continuously checked for alarm signals, such as whether a new version is affecting playback or browsing for some users or devices. These checks notify the responsible team so that they can deal with the problem as soon as possible.
During a software update, we enable the new version for a subset of users and use these real-time metrics to compare its performance against the previous version. If anything in the metrics looks wrong, we can abort the update and roll back the users who received the new version.
Because this data arrives at more than 2 million events per second, getting it into a database that can be queried quickly is very difficult. We need enough dimensions for the data to be useful in isolating problems, and as a result we generate more than 115 billion rows per day. At Netflix, we use Apache Druid to help us meet this challenge at our scale.
1. Druid
Apache Druid is a high-performance real-time analytics database. It is designed for workflows where fast queries and ingestion really matter. Druid excels at instant data visibility, ad hoc queries, operational analytics, and handling high concurrency. -- druid.io
Druid is therefore well suited to our use case: a high ingestion rate of event data, high cardinality, and a need for fast queries.
Druid is not a relational database, but some concepts carry over. Instead of tables, we have data sources. As in a relational database, a data source is a logical grouping of data represented as columns. Unlike a relational database, there is no concept of joins. We therefore need to make sure that every column we want to filter or group by is included in each data source.
There are three main types of columns in a data source: time, dimensions, and metrics.
Everything in Druid is time-stamped. Each data source has a timestamp column, which is the main partitioning mechanism. Dimensions are values that can be used for filtering, querying, or grouping. Metrics are values that can be aggregated and are almost always numeric.
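The three column types can be pictured with a small Python sketch. This is only an illustration of the data model, not Druid's storage format; the column names (`device_type`, `country`, `app_version`, `play_delay_ms`) are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    """One row: a timestamp, dimensions to filter or group by, and numeric metrics."""
    timestamp: datetime
    dimensions: tuple  # sorted (name, value) pairs, so rows are hashable and comparable
    metrics: tuple     # sorted (name, numeric value) pairs


def make_event(ts, dimensions, metrics):
    """Build an Event from plain dicts (hypothetical helper for this sketch)."""
    return Event(ts, tuple(sorted(dimensions.items())), tuple(sorted(metrics.items())))


event = make_event(
    datetime(2020, 1, 1, 12, 0, 30, tzinfo=timezone.utc),
    {"device_type": "smart_tv", "country": "US", "app_version": "1.2.3"},
    {"play_delay_ms": 850},
)
```

Because there are no joins, every dimension needed for filtering or grouping has to travel with the row itself, as in the `dimensions` field here.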
By giving up the ability to perform joins and assuming that data is keyed by timestamp, Druid can make optimizations in how it stores, distributes, and queries data. This lets us scale a data source to trillions of rows and still achieve query response times in the tens of milliseconds.
To achieve this level of scalability, Druid divides the stored data into time chunks. The duration of a time chunk is configurable, so you can choose an appropriate duration for your data and use case. For our data and use case, we use one-hour time chunks. The data within a time chunk is stored in one or more segments. Each segment holds rows that all fall within that time chunk, as determined by their timestamp column. Segment size can be limited either by a maximum number of rows or by the total size of the segment file.
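As a rough illustration of time-based partitioning, the sketch below assigns row timestamps to one-hour buckets. It only demonstrates the bucketing idea; the function names are hypothetical and Druid's own segment handling is considerably more involved.

```python
from collections import defaultdict
from datetime import datetime, timezone


def time_chunk_start(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its one-hour bucket."""
    return ts.replace(minute=0, second=0, microsecond=0)


def assign_to_chunks(timestamps):
    """Group row timestamps by the hourly bucket they fall into."""
    chunks = defaultdict(list)
    for ts in timestamps:
        chunks[time_chunk_start(ts)].append(ts)
    return dict(chunks)


rows = [
    datetime(2020, 1, 1, 12, 5, tzinfo=timezone.utc),
    datetime(2020, 1, 1, 12, 59, 59, tzinfo=timezone.utc),
    datetime(2020, 1, 1, 13, 0, tzinfo=timezone.utc),
]
chunks = assign_to_chunks(rows)
```

A query over 12:00 to 13:00 then only needs to touch the first bucket, which is what makes the timestamp such an effective primary partition key.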
When data is queried, Druid sends the query to every node in the cluster that holds segments for the time chunks within the range of the query. Each node processes the query in parallel against the data it holds, then sends its intermediate results back to the query Broker node. The Broker performs the final merge and aggregation before sending the result set back to the client.
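The scatter/gather pattern described above can be sketched in a few lines of Python. This is a simplified model, assuming a single summed metric named `errors` and a hypothetical `device` dimension; it runs sequentially, whereas the real nodes work in parallel.

```python
from collections import Counter


def node_partial(rows, group_by):
    """Runs on each data node: aggregate only the rows that node holds."""
    partial = Counter()
    for row in rows:
        partial[row[group_by]] += row["errors"]
    return partial


def merge_partials(partials):
    """Runs on the query node: merge per-node partial results into one result set."""
    merged = Counter()
    for partial in partials:
        merged.update(partial)  # Counter.update adds values for matching keys
    return dict(merged)


node_a = [{"device": "tv", "errors": 2}, {"device": "phone", "errors": 1}]
node_b = [{"device": "tv", "errors": 3}]
result = merge_partials(node_partial(rows, "device") for rows in (node_a, node_b))
```

The merge step only sees one small partial result per node rather than the raw rows, which is why sums and counts are cheap to combine this way.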
2. Ingestion
Data is inserted into this database in real time. Rather than individual records being inserted into a data source, events (our metrics, in this case) are read from Kafka streams. We use one topic per data source. Within Druid, we use Kafka indexing tasks, which create multiple indexing workers distributed across the real-time nodes (Middle Managers).
Each indexer subscribes to the topic and reads its share of events from the stream. The indexer extracts values from the event messages according to an ingestion spec and accumulates the resulting rows in memory. As soon as a row is created, it can be queried. Queries for a time chunk that is still being filled by indexers are served by the indexers themselves. Because indexing tasks do two jobs at once, ingesting data and serving queries, it is important to hand data off to the Historical nodes promptly so that query work is offloaded to nodes that are better optimized for it.
Druid can roll up data as it is ingested to minimize the amount of raw data that needs to be stored. Rollup is a form of summarization, or pre-aggregation. In some cases, rolling up data can dramatically reduce the size of the data that needs to be stored by reducing the number of rows. This storage reduction comes at a cost, however: we lose the ability to query individual events and can only query at the predefined query granularity. For our use case, we chose a query granularity of 1 minute.
During ingestion, if any rows have identical dimension values and their timestamps fall within the same minute (our query granularity), those rows are rolled up. The rows are combined by summing each of their metric values and incrementing a counter, so we know how many events contributed to the row's values. This form of rollup can significantly reduce the number of rows in the database and thereby speed up queries.
Once the number of accumulated rows reaches a certain threshold, or the segment has been open for too long, the rows are written to a segment file and offloaded to deep storage. The indexer then informs the Coordinator that the segment is ready, so that the Coordinator can tell one or more Historical nodes to load it. Once the segment has been successfully loaded into a Historical node, it is unloaded from the indexer, and any queries for that data are served by the Historical nodes from then on.
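The rollup step can be sketched as follows. This is a minimal illustration of the idea rather than Druid's implementation, and the dimension and metric names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone


def rollup(events):
    """Combine events that share the same minute and the same dimension values.

    Each event is (timestamp, dimensions dict, metrics dict). Metric values are
    summed, and a `count` column records how many events were folded into a row.
    """
    rows = defaultdict(lambda: defaultdict(int))
    for ts, dims, metrics in events:
        minute = ts.replace(second=0, microsecond=0)  # 1-minute query granularity
        key = (minute, tuple(sorted(dims.items())))
        for name, value in metrics.items():
            rows[key][name] += value
        rows[key]["count"] += 1
    return {key: dict(values) for key, values in rows.items()}


tv = {"device_type": "smart_tv", "country": "US"}
events = [
    (datetime(2020, 1, 1, 12, 0, 5, tzinfo=timezone.utc), tv, {"errors": 1}),
    (datetime(2020, 1, 1, 12, 0, 40, tzinfo=timezone.utc), tv, {"errors": 2}),
    (datetime(2020, 1, 1, 12, 1, 10, tzinfo=timezone.utc), tv, {"errors": 4}),
]
rolled = rollup(events)
```

The first two events share a minute and a set of dimension values, so three input events become two stored rows. The individual events can no longer be recovered, which is the trade-off described above.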
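The settings mentioned in this section (one Kafka topic per data source, hourly segments, 1-minute query granularity, rollup, and a row threshold per segment) would typically be expressed in a Kafka supervisor spec. The sketch below builds such a spec as a Python dict. The field names follow Druid's documented spec layout as best I recall and should be checked against the documentation for the Druid version in use; every value (data source, topic, broker address, column names, task count, row limit) is a hypothetical placeholder and not Netflix's configuration.

```python
import json

# Hypothetical supervisor spec; all names and values are illustrative assumptions.
supervisor_spec = {
    "type": "kafka",
    "spec": {
        "dataSchema": {
            "dataSource": "playback_metrics",
            "timestampSpec": {"column": "timestamp", "format": "iso"},
            "dimensionsSpec": {"dimensions": ["device_type", "country", "app_version"]},
            "metricsSpec": [
                {"type": "count", "name": "count"},
                {"type": "longSum", "name": "errors", "fieldName": "errors"},
            ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "HOUR",   # one-hour time buckets
                "queryGranularity": "MINUTE",   # rows are rolled up per minute
                "rollup": True,
            },
        },
        "ioConfig": {
            "topic": "playback-metrics",
            "inputFormat": {"type": "json"},
            "consumerProperties": {"bootstrap.servers": "kafka:9092"},
            "taskCount": 4,
        },
        "tuningConfig": {"type": "kafka", "maxRowsPerSegment": 5000000},
    },
}

spec_json = json.dumps(supervisor_spec, indent=2)
```

Serializing the dict to JSON gives a document in the shape that is submitted to Druid to start ingestion; the sketch stops short of submitting anything.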
3. Data management
As you might imagine, as the cardinality of dimensions increases, the likelihood of identical events occurring within the same minute decreases. Managing cardinality, and therefore rollup, is a powerful lever for achieving good query performance.
To achieve the ingestion rate we need, we run many indexer instances. Even though the indexing tasks use rollup to combine identical rows, the chance that identical rows land in the same instance of an indexing task is very low. To address this and achieve the best possible rollup, we schedule a task to run after all segments of a given time chunk have been handed off to the Historical nodes.
This scheduled compaction task fetches all segments for the time chunk from deep storage and runs a map/reduce job to recreate the segments and achieve perfect rollup. The new segments are then loaded and published by the Historical nodes, replacing and superseding the original, less rolled-up segments. In our case, this additional compaction task cuts the number of rows roughly in half.
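The effect of this scheduled task can be sketched as a second aggregation pass over segments that were each only partially aggregated. This is an in-memory illustration under simplified assumptions (rows keyed by minute and device, hypothetical metric names), not the map/reduce job itself.

```python
from collections import defaultdict


def merge_segments(segments):
    """Merge partially aggregated segments of one time bucket into fully aggregated rows.

    Each segment maps a row key (minute, dimension values) to its metric values.
    Rows with the same key in different segments are summed, including `count`.
    """
    merged = defaultdict(lambda: defaultdict(int))
    for segment in segments:
        for key, values in segment.items():
            for name, value in values.items():
                merged[key][name] += value
    return {key: dict(values) for key, values in merged.items()}


key_tv = ("2020-01-01T12:00", "smart_tv")
key_phone = ("2020-01-01T12:00", "phone")
# Two indexer instances each saw some of the events for the same minute and device.
segment_a = {key_tv: {"errors": 2, "count": 3}}
segment_b = {key_tv: {"errors": 1, "count": 2}, key_phone: {"errors": 4, "count": 1}}

merged_rows = merge_segments([segment_a, segment_b])
rows_before = len(segment_a) + len(segment_b)
rows_after = len(merged_rows)
```

Note that the `count` column is summed rather than recomputed, so it still reflects the number of original events behind each row.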
Knowing when all events for a given time chunk have been received is not trivial. Data can arrive late on the Kafka topics, or the indexers can take some time to hand segments off to the Historical nodes. To work around this, we enforce some limits and perform checks before running compaction.
First, we discard any data that arrives very late. We consider such data too old to be useful in our real-time system. This puts a bound on how late data can be. Second, the compaction task is scheduled with a delay, which gives segments plenty of time to be offloaded to the Historical nodes in the normal flow. Finally, when the scheduled compaction task for a given time chunk starts, it queries the segment metadata to check whether any relevant segments are still being written or handed off. If there are, it waits a few minutes and tries again. This ensures that all data is processed by the compaction job.
Without these measures, we found that we would sometimes lose data. Segments that were still being written when compaction started would be overwritten by the newly compacted segments, which have a later version and therefore take precedence. This effectively deleted the data contained in segments that had not yet finished being handed off.
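The three safeguards can be sketched as two small checks. The thresholds and function names below are hypothetical; the article does not state the actual lateness bound or delay, and a real implementation would read the pending-segment count from Druid's segment metadata.

```python
from datetime import datetime, timedelta, timezone

MAX_LATENESS = timedelta(hours=1)   # hypothetical bound on how late an event may arrive
TASK_DELAY = timedelta(hours=2)     # hypothetical delay before the task may start


def accept_event(event_time, now):
    """Safeguard 1: drop events that arrive later than the lateness bound."""
    return now - event_time <= MAX_LATENESS


def may_start(chunk_end, now, pending_segments):
    """Safeguards 2 and 3: wait out the delay, then require that no segment of the
    time bucket is still being written or handed off. The caller retries after a
    few minutes whenever this returns False."""
    if now < chunk_end + TASK_DELAY:
        return False
    return pending_segments == 0


now = datetime(2020, 1, 1, 15, 0, tzinfo=timezone.utc)
chunk_end = datetime(2020, 1, 1, 13, 0, tzinfo=timezone.utc)
```

With these example values, `may_start(chunk_end, now, pending_segments=1)` returns False, and the scheduler would simply check again later.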
4. Query
Druid supports two query languages: Druid SQL and native queries. Under the hood, Druid SQL queries are translated into native queries. Native queries are submitted as JSON to a REST endpoint, and this is the main mechanism we use.
Most queries to our cluster are generated by custom internal tools such as dashboards and alerting systems. These systems were originally designed to work with Atlas, our internally developed, open-source time-series database. As a result, these tools speak the Atlas Stack query language.
To speed up the adoption of Druid for querying and to reuse existing tools, we added a translation layer that accepts Atlas queries, rewrites them as Druid queries, issues them, and reformats the results as Atlas results. This abstraction layer lets existing tools be used as they are, and users do not need to learn anything new to access the data in our Druid data store.
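To make the JSON form concrete, here is a sketch that builds a native `timeseries` query in Python. The data source, dimension, and metric names are hypothetical, and the sketch only constructs the request body without sending it. In a standard Druid deployment such a body is POSTed to the `/druid/v2` path on the query node with a JSON content type.

```python
import json


def build_timeseries_query(datasource, interval, metric, device_type):
    """Build a native timeseries query that sums one metric per minute,
    filtered to a single device type (all names are illustrative)."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": "minute",
        "intervals": [interval],  # ISO-8601 "start/end"
        "filter": {"type": "selector", "dimension": "device_type", "value": device_type},
        "aggregations": [{"type": "longSum", "name": metric, "fieldName": metric}],
    }


query = build_timeseries_query(
    "playback_metrics",
    "2020-01-01T12:00:00Z/2020-01-01T13:00:00Z",
    "errors",
    "smart_tv",
)
body = json.dumps(query)
```

Generating queries as plain data structures like this is also what makes it practical for tools to produce them programmatically instead of asking users to write them by hand.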
5. Tuning
When tuning the configuration of the cluster nodes, we run a series of repeatable and predictable queries at a high rate to obtain a benchmark of response time and query throughput for each configuration. The queries are designed to isolate parts of the cluster so that we can check for improvements or regressions in query performance.
For example, we run targeted queries over only the most recent data so that only the Middle Managers are queried. Likewise, for longer durations over only older data, we query just the Historical nodes to test the caching configuration. And with queries that group by very high-cardinality dimensions, we check how result merging is affected. We keep adjusting and rerunning these benchmarks until we are satisfied with query performance.
In these tests, we found that tuning buffer sizes, the number of threads, query queue lengths, and the memory allocated to the query cache had a real effect on query performance. However, introducing the compaction job, which takes the poorly rolled-up segments and recompacts them with perfect rollup, had an even greater impact on query performance.
We also found that enabling caching on the Historical nodes was very beneficial, whereas enabling caching on the Broker nodes had much less effect. So little, in fact, that we do not use caching on the Brokers. This may be due to our use case, but nearly every query we make misses the cache on the Brokers, probably because queries usually include the most recent data, which cannot be in any cache because it is always arriving.
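A benchmark loop of this kind can be sketched as below. It is a generic harness under stated assumptions: `run_query` stands in for whatever client issues the query, the query labels are hypothetical, and `fake_query` is a placeholder workload so that the sketch runs without a cluster.

```python
import statistics
import time


def benchmark(run_query, queries, repeats=5):
    """Run a fixed set of queries repeatedly and report latency and throughput."""
    latencies = []
    start = time.perf_counter()
    for _ in range(repeats):
        for query in queries:
            t0 = time.perf_counter()
            run_query(query)
            latencies.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - start
    return {
        "queries": len(latencies),
        "median_s": statistics.median(latencies),
        "max_s": max(latencies),
        "throughput_qps": len(latencies) / elapsed if elapsed > 0 else float("inf"),
    }


def fake_query(query):
    """Placeholder workload; a real harness would send the query to the cluster."""
    return sum(range(1000))


report = benchmark(fake_query, ["recent_only", "older_only", "high_cardinality_group_by"])
```

Keeping the query set fixed between runs is what makes the numbers comparable from one configuration to the next.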