2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to understand LinkedIn's log-centric approach to big data management.
The following is the original text:
I joined LinkedIn about six years ago, at a particularly interesting time. We were just beginning to run up against the limits of a single, centralized database and needed to start the transition to a portfolio of specialized distributed systems. This has been an interesting experience: we built, deployed, and still run today a distributed graph database, a distributed search backend, a Hadoop installation, and a first and second generation key-value store.
One of the most useful things we learned from all this is that many of the things we were building had a very simple concept at their heart: the log. Sometimes called a write-ahead log, commit log, or transaction log, the log has been around almost as long as computers and is at the heart of many distributed data systems and real-time application architectures.
You can't fully understand databases, NoSQL stores, key-value stores, replication, Paxos, Hadoop, version control, or almost any software system without understanding logs; yet most software engineers are not familiar with them. I'd like to change that. In this post, I'll walk through everything you need to know about logs, including what a log is and how to use logs for data integration, real-time processing, and system building.
The first part: what is the log?
A log is perhaps the simplest possible storage abstraction. It is an append-only, totally ordered sequence of records ordered by time. The log looks like this:
Records are appended to the end of the log, and reads proceed from left to right. Each entry is assigned a unique, sequential log entry number.
The ordering of records defines a notion of "time", since entries to the left are by definition older than entries to the right. The log entry number can be thought of as the "timestamp" of the entry. Describing this ordering as a notion of time seems a bit odd at first, but it has the convenient property of being decoupled from any particular physical clock. This property turns out to be essential once we get to distributed systems.
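The description above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `Log` class and its method names are hypothetical, and an in-memory list stands in for real storage.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class Log:
    """Append-only sequence of records; the list index is the entry number."""
    _records: List[Any] = field(default_factory=list)

    def append(self, record: Any) -> int:
        """Add a record at the end and return its sequential entry number."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, start: int = 0) -> List[Tuple[int, Any]]:
        """Return (entry number, record) pairs from `start` onward, oldest first."""
        return list(enumerate(self._records[start:], start=start))


log = Log()
first = log.append("user 7 signed up")
second = log.append("user 7 changed email")
```

Note that there is no method for updating or removing a record in place: the entry number alone orders the records, with no reference to a wall clock.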
For the purposes of this discussion, the contents and format of the records are not important. Also, we can't keep adding records to the log forever, because we will eventually run out of space. I'll come back to this later.
A log is not all that different from a file or a table. A file is an array of bytes, a table is an array of records, and a log is really just a kind of table or file in which the records are sorted by time.
At this point you might wonder why it is worth talking about something so simple. How is an append-only sequence of records related to data systems at all? The answer is that logs have a specific purpose: they record what happened and when. For distributed data systems this is, in many ways, the very heart of the problem.
Before we go any further, however, let me clarify something that is a bit confusing. Every programmer is familiar with another kind of logging: the unstructured error messages or trace information that an application writes to a local file using syslog or log4j. To keep the two apart, I will call this "application logging". The application log is a degenerate form of the log concept I am describing. The biggest difference is that text logs are meant primarily for humans to read, whereas the "journal" or "data log" I am describing is built for programmatic access.
(Actually, if you think about it, the idea of humans reading through logs on individual machines is something of an anachronism. This approach quickly becomes unmanageable when many services and servers are involved, and the purpose of logs quickly becomes to serve as input to queries and graphs that make sense of behavior across many machines. For that purpose, English text in files is not nearly as appropriate as the kind of structured log described here.)
Database log
I don't know where the log concept originated; probably it is one of those things, like binary search, that seemed too simple to its inventor to count as an invention. It is present as early as IBM's System R. Its use in databases has to do with keeping the various data structures and indexes in sync in the presence of crashes. To make an operation atomic and durable, a database writes information about the records it is about to modify to the log before applying the changes to all the data structures it maintains. The log is the record of what happened, and each table or index is a projection of this history into some useful data structure. Since the log is persisted immediately, it serves as the authoritative source for restoring all other persistent structures after a crash.
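The write-ahead behaviour described above can be sketched as follows. This is a minimal illustration, not how any particular database implements it: `TinyDB`, the in-memory list standing in for a durable log file, and the secondary index are all hypothetical.

```python
class TinyDB:
    """Toy store: every change is logged before derived structures are touched."""

    def __init__(self, wal=None):
        self.wal = wal if wal is not None else []  # stand-in for a durable log
        self.table = {}                            # key -> value
        self.index_by_value = {}                   # value -> set of keys

    def put(self, key, value):
        self.wal.append((key, value))  # 1. record the change in the log first
        self._apply(key, value)        # 2. then update the table and the index

    def _apply(self, key, value):
        old = self.table.get(key)
        if old is not None:
            self.index_by_value[old].discard(key)
        self.table[key] = value
        self.index_by_value.setdefault(value, set()).add(key)

    @classmethod
    def recover(cls, wal):
        """Rebuild the table and the index purely by replaying the log in order."""
        db = cls(wal=list(wal))
        for key, value in db.wal:
            db._apply(key, value)
        return db


db = TinyDB()
db.put("a", 1)
db.put("b", 1)
db.put("a", 2)
recovered = TinyDB.recover(db.wal)
```

After a simulated crash, only the log is needed: `recovered` ends up with the same table and index as `db`, because both are projections of the same history.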
Over time, the use of the log grew from an implementation detail of ACID into a method for replicating data between databases. It turns out that the sequence of changes that happened on the database is exactly what is needed to keep a remote replica database in sync.
Oracle, MySQL, and PostgreSQL all include log shipping protocols for transmitting portions of the log to standby replica databases. Oracle has also productized the log as a general data subscription mechanism for non-Oracle subscribers through XStreams and GoldenGate, and similar facilities in MySQL and PostgreSQL are key components of many data architectures.
Because of this origin, the concept of a machine-readable log has largely been confined to database internals. The use of logs as a mechanism for data subscription seems to have arisen almost by chance, yet this very abstraction is ideal for supporting all kinds of messaging, data flow, and real-time data processing.
Distributed system log
The two problems a log solves, ordering changes and distributing data, are even more important in distributed data systems. Agreeing on an ordering for updates (or agreeing to disagree and coping with the side effects) is among the core design problems for these systems.
The log-centric approach to distributed systems arises from a simple observation that I will call the State Machine Replication Principle: if two identical, deterministic processes begin in the same state and get the same inputs in the same order, they will produce the same output and end in the same state.
This may seem a little obscure, so let's dig in and see what it means.
Deterministic means that the processing is not timing dependent and does not let any other "out of band" input influence its results. For example, a program whose output is influenced by the particular order in which threads execute, by a call to gettimeofday, or by some other non-repeatable event is generally best considered non-deterministic.
The state of the process is whatever data remains on the machine, either in memory or on disk, at the end of the processing.
The part about getting the same input in the same order should ring a bell: that is where the log comes in. The notion is very intuitive: if you feed two deterministic pieces of code the same input log, they will produce the same output.
The application to distributed computing is fairly obvious. You can reduce the problem of making multiple machines all do the same thing to the problem of implementing a distributed, consistent log that feeds these processes their input. The purpose of the log here is to squeeze all the non-determinism out of the input stream, so that each replica processing this input stays in sync.
Once you understand it, there is nothing complicated or deep about the principle: it more or less amounts to saying "deterministic processing is deterministic". Nonetheless, I think it is one of the more general tools for distributed systems design.
One of the beautiful things about this approach is that the timestamps that index the log now act as the clock for the state of the replicas: you can describe each replica by a single number, the timestamp of the latest log entry it has processed. This timestamp, combined with the log, uniquely captures the entire state of the replica.
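As a rough sketch of the principle, the following Python models two replicas fed from the same log. The command format, `apply_command`, and the `Replica` class are assumptions made for illustration only.

```python
def apply_command(state: dict, command: tuple) -> None:
    """Deterministic transition: no clocks, randomness, or thread timing involved."""
    op, key, value = command
    if op == "set":
        state[key] = value
    elif op == "delete":
        state.pop(key, None)
    else:
        raise ValueError(f"unknown op: {op}")


class Replica:
    def __init__(self):
        self.state = {}
        self.position = 0  # number of log entries applied so far

    def catch_up(self, log, upto=None):
        """Apply log entries in order, from the current position up to `upto`."""
        end = len(log) if upto is None else min(upto, len(log))
        for command in log[self.position:end]:
            apply_command(self.state, command)
        self.position = max(self.position, end)


log = [("set", "x", 1), ("set", "y", 2), ("delete", "x", None), ("set", "y", 3)]
a, b = Replica(), Replica()
a.catch_up(log)          # fully caught up
b.catch_up(log, upto=2)  # lagging: has applied only the first two entries
```

Here `b.position` is the single number that describes how far replica `b` has got; calling `b.catch_up(log)` later brings it to exactly the state of `a`.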
There are many ways of applying this principle in systems, depending on what is put in the log. For example, we can log the incoming requests to a service, the state changes the service undergoes in response to a request, or the transformation commands it executes. In theory, we could even log a series of machine instructions for each replica to execute, or the method names and arguments to invoke on each replica. As long as two processes handle these inputs in the same way, they will remain consistent across replicas.
Different groups of people describe the uses of logs differently. Database people generally distinguish between physical and logical logging. Physical logging records the contents of each row that is changed. Logical logging records not the changed rows but the SQL statements (insert, update, and delete) that cause the rows to change.
The distributed systems literature commonly distinguishes two broad approaches to processing and replication. The "state machine model" usually refers to an active-active model in which we keep a log of the incoming requests and every replica processes each request. A slight modification of this, called the "primary-backup model", is to elect one replica as the leader, let it process requests in the order they arrive, and have it log the changes to its state that result from processing them. The other replicas apply the leader's state changes in order, so that they stay in sync and are ready to take over should the leader fail.
To understand the difference between these two approaches, let's look at a toy problem. Consider a replicated "arithmetic service" that maintains a single number as its state (initialized to zero) and applies additions and multiplications to this value. The active-active approach might log the transformations to apply, say "+1", "*2", and so on. Each replica applies these transformations and therefore goes through the same sequence of values. The active-passive approach would have a single master execute the transformations and log the results, say "1", "3", "6", and so on. This example also makes clear why ordering is key to consistency between replicas: reordering an addition and a multiplication yields a different result.
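The toy arithmetic service can be written out directly. The sketch below is illustrative only: the function names and the particular command sequence are assumptions, not taken from the original.

```python
import operator

OPS = {"+": operator.add, "*": operator.mul}


def active_active(commands, initial=0):
    """Each replica replays the logged commands itself."""
    value = initial
    for op, operand in commands:
        value = OPS[op](value, operand)
    return value


def primary_backup_log(commands, initial=0):
    """The leader executes each command and logs the resulting value."""
    value, results = initial, []
    for op, operand in commands:
        value = OPS[op](value, operand)
        results.append(value)
    return results


def backup_apply(results, initial=0):
    """A backup adopts the leader's logged values in order."""
    value = initial
    for result in results:
        value = result
    return value


commands = [("+", 1), ("*", 2), ("+", 4)]
reordered = [("*", 2), ("+", 1), ("+", 4)]

replica_value = active_active(commands)
leader_log = primary_backup_log(commands)
backup_value = backup_apply(leader_log)
reordered_value = active_active(reordered)
```

Both styles reach the same final value when the order is preserved, while swapping the first addition and the multiplication changes the result.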
The distributed log can be seen as the data structure that models the problem of consensus. A log, after all, represents a series of decisions on the "next" value to append. You have to squint a little to see a log in the Paxos family of algorithms, though log building is their most common practical application. With Paxos, this is usually done using an extension of the protocol called "multi-paxos", which models the log as a series of consensus problems, one for each slot in the log. The log is much more prominent in other protocols such as ZAB and Raft, which directly model the problem of maintaining a distributed, consistent log.
My suspicion is that our view of this is a little biased by the path of history, perhaps because over the past few decades the theory of distributed computing has outpaced its practical application. In reality, the consensus problem is a bit too simple. Computer systems rarely need to decide a single value; they almost always handle a sequence of requests. So a log, rather than a simple single-value register, is the more natural abstraction.
Furthermore, the focus on algorithms obscures the underlying log abstraction that systems need. I suspect we will end up focusing more on the log as a commoditized building block irrespective of its implementation, in the same way that we often talk about a hash table without bothering with the details of whether we mean linear probing or some other variant. The log will become something of a commoditized interface, with many algorithms and implementations competing to provide the best guarantees and performance.
Changelog 101: the duality of tables and events
Let's come back to databases for a moment. There is a fascinating duality between a log of changes and a table. The log is like the list of all the credits and debits a bank processes; a table is the current account balances. If you have a log of changes, you can apply these changes in order to create the table that captures the current state. This table records the latest state for each key (as of a particular point in log time). There is a sense in which the log is the more fundamental data structure: in addition to creating the original table, you can transform it to create all kinds of derived tables. (And yes, "table" can also mean a keyed data store for the non-relational folks.)
The process works in reverse too: if you have a table taking updates, you can record these changes and publish a "changelog" of all the updates to the state of the table. This changelog is exactly what you need to support near-real-time replicas. In this sense you can see tables and events as dual: tables hold data at rest and logs capture change. The magic of the log is that if it is a complete log of changes, it holds not only the contents of the final version of the table but also allows every other version that ever existed to be recreated. It is, effectively, a backup of every previous state of the table.
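Both directions of this duality can be sketched in Python. The names `table_at` and `LoggedTable`, and the use of `None` as a delete marker, are assumptions chosen for the illustration.

```python
def table_at(changelog, upto=None):
    """Replay the first `upto` changes (all of them if None) to build the table."""
    table = {}
    for key, value in changelog[:upto]:
        if value is None:       # None is used here as a delete marker
            table.pop(key, None)
        else:
            table[key] = value
    return table


class LoggedTable:
    """A table that publishes a changelog entry for every update it accepts."""

    def __init__(self):
        self.rows = {}
        self.changelog = []

    def put(self, key, value):
        self.rows[key] = value
        self.changelog.append((key, value))

    def delete(self, key):
        self.rows.pop(key, None)
        self.changelog.append((key, None))


accounts = LoggedTable()
accounts.put("alice", 100)
accounts.put("bob", 50)
accounts.put("alice", 70)
accounts.delete("bob")

current = table_at(accounts.changelog)           # the latest version
earlier = table_at(accounts.changelog, upto=2)   # a version that existed earlier
```

Replaying the whole changelog reproduces the current table, while replaying a prefix recovers an older version, which is the sense in which the log acts as a backup of every previous state.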
This might remind you of source code version control. There is a close relationship between source control and databases. Version control solves a problem very similar to the one distributed data systems have to solve: managing distributed, concurrent changes in state. A version control system usually models the sequence of patches, which is in effect a log. You interact directly with a checked-out "snapshot" of the current code, which is analogous to the table. You will notice that in version control systems, as in other distributed stateful systems, replication happens via the log: when you update, you pull down just the patches and apply them to your current snapshot.
Some people have recently encountered these ideas through Datomic, a company that sells a log-centric database, and gained a broad understanding of how to apply them in a system. The ideas are not unique to that system, of course: they have been part of the distributed systems and database literature for well over a decade.
This may all seem a little theoretical. Do not despair! We'll get to the practical side soon.
Let me first explain what I mean by "data integration" and why I think it is important, and then we'll see how it relates back to logs.
Data integration is making all the data an organization has available in all its services and systems. The phrase "data integration" is not all that common, but I don't know a better one. The more recognizable term ETL (extract, transform, load) usually covers only a limited part of data integration: populating a relational data warehouse. Much of what I am describing can be thought of as ETL generalized to cover real-time systems and processing flows.
You don't hear much about data integration amid all the breathless interest and hype around big data, but I believe this mundane problem of "making the data available" is one of the more valuable things an organization can focus on.
Effective use of data follows a kind of Maslow's hierarchy of needs. The base of the pyramid involves capturing all the relevant data and being able to put it together in an applicable processing environment (be that a fancy real-time query system or just text files and Python scripts). This data needs to be modeled in a uniform way so that it is easy to read and process. Once these basic needs are met, it is reasonable to work on infrastructure for processing the data in various ways: MapReduce, real-time query systems, and so on.
It's worth noting the obvious: without a reliable and complete data flow, a Hadoop cluster is little more than a very expensive and difficult-to-assemble space heater. Once data and processing are available, attention can move to the more refined problems of good data models and consistent, well-understood semantics. Finally, it can shift to more sophisticated processing: better visualization, reporting, and algorithmic processing and prediction.
In my experience, most organizations have huge holes in the base of this pyramid (they lack reliable, complete data flow) yet want to jump directly to advanced data modeling techniques. This is completely backwards. So the question is: how can we build reliable data flow throughout all the data systems in an organization?
Data integration: two complications
Two trends make data integration more difficult.
Event data pipeline
The first trend is the rise of event data. Event data records things that happen rather than things that are. In web systems, this means user activity logging, but also the machine-level events and statistics required to reliably operate and monitor a data center's worth of machines. People tend to call this "log data" because it is often written to application logs, but that confuses form with function. This data is at the heart of the modern web: Google's fortune, after all, is generated by a relevance pipeline built on clicks and impressions, that is, events.
This is not confined to web companies; it is just that web companies are already fully digital, so they are easier to instrument. Financial data has long been event-centric. RFID (radio-frequency identification) adds this kind of tracking to physical objects. I think this trend will continue with the digitization of traditional businesses and activities.
This type of event data records what happened and tends to be several orders of magnitude larger than traditional database uses. This presents significant challenges for processing.
The explosion of specialized data systems
The second trend comes from the explosion of specialized data systems that have become popular, and often freely available, in the last five years. Specialized systems exist for OLAP, search, simple online storage, batch processing, graph analysis, and so on.
The combination of more data of more varieties and a desire to get this data into more systems leads to a huge data integration problem.
Log-structured data flow
The log is the natural data structure for handling data flow between systems. The recipe is very simple:
Take all the organization's data and put it into a central log for real-time subscription.
Each logical data source can be modeled as its own log. A data source could be an application that logs events (say, clicks or page views) or a database table that accepts modifications. Each subscribing system reads from this log as quickly as it can, applies each new record to its own store, and advances its position in the log. Subscribers can be any kind of data system: a cache, Hadoop, another database in another site, a search system, and so on.
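The subscriber loop described here might look like the following sketch. The `Subscriber` class, the `poll` method, and the event strings are hypothetical; a plain list stands in for the central log.

```python
class Subscriber:
    def __init__(self, name):
        self.name = name
        self.position = 0  # offset of the next log record to read
        self.store = []    # stands in for the subscriber's own storage

    def poll(self, log, max_records=None):
        """Read new records, apply them to the local store, advance the position."""
        if max_records is None:
            end = len(log)
        else:
            end = min(len(log), self.position + max_records)
        batch = log[self.position:end]
        self.store.extend(batch)
        self.position = end
        return len(batch)


log = ["click:1", "view:2", "click:3"]
search = Subscriber("search")
hadoop = Subscriber("hadoop")

search.poll(log)                 # reads everything available
hadoop.poll(log, max_records=1)  # reads only one record for now
log.append("view:4")             # the source keeps producing
search.poll(log)
```

Each subscriber tracks only its own position, so the log itself needs no knowledge of who is reading or how far along they are.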
For example, the log concept gives a logical clock for each change against which all subscribers can be measured. This makes reasoning about the state of the different subscriber systems relative to one another far simpler, because each has a "point in time" up to which it has read.
To make this more concrete, consider a simple case with a database and a collection of caching servers. The log provides a way to synchronize the updates to all these systems and to reason about the point in time each of them has reached. Let's say we write a record with log entry X and then need to do a read from the cache. If we want to guarantee that we don't see stale data, we just need to make sure we don't read from any cache that has not yet replicated up to X.
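The cache example can be sketched as a freshness check against a log position. Everything named here (`CacheNode`, `read_fresh`, the key and values) is an assumption for illustration; a real system would route reads differently.

```python
class CacheNode:
    def __init__(self):
        self.data = {}
        self.applied = 0  # number of log entries this cache has replicated

    def replicate(self, log, upto):
        """Apply log entries in order, up to position `upto`."""
        end = min(upto, len(log))
        for key, value in log[self.applied:end]:
            self.data[key] = value
        self.applied = max(self.applied, end)


def read_fresh(caches, key, min_position):
    """Read only from a cache that has replicated at least `min_position` entries."""
    for cache in caches:
        if cache.applied >= min_position:
            return cache.data.get(key)
    raise LookupError("no cache has caught up to the required log position")


log = [("profile:7", "v1")]
fast, slow = CacheNode(), CacheNode()
fast.replicate(log, 1)
slow.replicate(log, 1)

log.append(("profile:7", "v2"))  # the write "X"
x_position = len(log)            # log position just after X
fast.replicate(log, x_position)  # only the fast cache has replicated X

value = read_fresh([slow, fast], "profile:7", x_position)
```

The reader skips `slow`, which still holds the stale value, and is served by `fast`, purely by comparing positions in the log.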
The log also acts as a buffer that makes data production asynchronous from data consumption. This is important for many reasons, but particularly when there are multiple subscribers that consume at different rates. It means a subscribing system can crash or go down for maintenance and catch up when it comes back: the subscriber consumes at a pace it controls. A batch system such as Hadoop or a data warehouse may consume only hourly or daily, whereas a real-time query system may need to be up to the second. Neither the originating data source nor the log has any knowledge of the various destination systems, so consumer systems can be added and removed with no change to the pipeline.
Of particular importance: the destination system only knows about the log and not any details of the system of origin. The consumer system need not concern itself with whether the data came from an RDBMS (relational database management system), a new-fangled key-value store, or was generated without a real-time query system of any kind. This seems like a minor point, but it is in fact critical.
I use the term "log" here instead of "messaging system" or "publish-subscribe" because it is much more specific about semantics and a much closer description of what a practical implementation needs in order to support data replication. I have found that "publish-subscribe" doesn't imply much more than indirect addressing of messages: if you compare any two messaging systems that promise publish-subscribe, you find that they guarantee very different things, and most models are not useful in this domain. You can think of the log as a kind of messaging system with durability guarantees and strong ordering semantics. In distributed systems, this model of communication sometimes goes by the (somewhat terrible) name of atomic broadcast.
It is worth emphasizing that the log is still just infrastructure. It is not the end of the story of mastering data flow: the rest of the story is about metadata, schemas, compatibility, and all the details of handling data structure and evolution. But until there is a reliable, general way of handling the mechanics of data flow, the semantic details are secondary.
At LinkedIn
I got to watch this data integration problem emerge in fast-forward as LinkedIn moved from a centralized relational database to a collection of distributed systems.
Today's major data systems include:
Search
Social graph
Voldemort (key-value store)
Espresso (document store)
Recommendation engine
OLAP query engine
Hadoop
Teradata
Ingraphs (monitoring graphs and metrics service)
Each of these is a specialized distributed system that provides advanced functionality in its area of specialty.
The idea of using logs for data flow had been floating around LinkedIn since before I got here. One of the earliest pieces of infrastructure we developed was a service called databus, which provided a log caching abstraction on top of our early Oracle tables to scale subscription to database changes, so that we could feed our social graph and search indexes.
I'll give a little of the history to provide context. My own involvement started around 2008, after we had shipped our key-value store. My next project was to get a working Hadoop setup going and move some of our recommendation processes there. Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms. So began a long slog.
We originally planned to just scrape the data out of our existing Oracle data warehouse. The first discovery was that getting data out of Oracle quickly is something of a dark art. Worse, the data warehouse processing was not appropriate for the production batch processing we planned for Hadoop: much of it was non-reversible and specific to the reports being generated. We ended up avoiding the data warehouse and going directly to the source databases and log files. Finally, we implemented another pipeline to load data into our key-value store for serving results.
This mundane data copying ended up being one of the dominant items in the original development. Worse, any time there was a problem in any of the pipelines, the Hadoop system was largely useless: running fancy algorithms on bad data just produces more bad data.
Although we had built things in a fairly generic way, each new data source required custom configuration to set up. It also proved to be the source of a huge number of errors and failures. The site features we had implemented on Hadoop became popular, and we found ourselves with a long list of interested engineers. Each user had a list of systems they wanted integrated and a long list of new data feeds they wanted.
A few things slowly became clear to me.
First, the pipelines we had built, though a bit of a mess, were actually extremely valuable. Just the process of making data available in a new processing system such as Hadoop unlocked a lot of possibilities. Computation that would have been hard to do before became possible. Many new products and analyses came simply from putting together pieces of data that had previously been locked up in specialized systems.
Second, it was clear that reliable data loads would require deep support from the data pipeline. If we captured all the structure we needed, we could make Hadoop data loads fully automatic, so that no manual effort was spent adding new data sources or handling schema changes: data would simply appear in HDFS, and Hive tables with the appropriate columns would be generated automatically for new data sources.
Third, our data coverage was still very low. If you looked at the overall percentage of LinkedIn's data that was available in Hadoop, it was still very incomplete. And getting to completion was not going to be easy, given the amount of effort required to get each new data source up and running.
The way we had been proceeding, building custom data loads for each data source and destination, was clearly infeasible. We had a large number of data systems and data repositories. Connecting all of them would have meant building custom piping between each pair of systems, as shown below.
Note that data often flows in both directions: many systems, such as databases and Hadoop, are both sources and destinations for data transfer. This meant we would end up building two pipelines per system: one to get data in and one to get data out.
This would clearly take an army of people to build and would never be operable. As we approached full connectivity, we would end up with something like O(N^2) pipelines.
Instead, we needed something generic like this:
As much as possible, we needed to isolate each consumer from the source of the data. Ideally, each should integrate with just a single data repository that gives it access to everything.
The idea is that adding a new data system, be it a data source or a data destination, should create integration work only to connect it to a single pipeline, rather than to each consumer of data.
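The scaling argument can be checked with simple arithmetic. The two functions below are a back-of-the-envelope sketch under stated assumptions: every system is treated as both a source and a destination, and a connection to the central pipeline is counted separately for each direction.

```python
def point_to_point_pipelines(n: int) -> int:
    """One custom pipeline for every ordered (source, destination) pair."""
    return n * (n - 1)


def central_pipeline_connections(n: int) -> int:
    """Each system connects to the central pipeline once in each direction."""
    return 2 * n


for n in (5, 10, 50):
    print(n, point_to_point_pipelines(n), central_pipeline_connections(n))
```

The first count grows quadratically while the second grows linearly, which is why adding one more system to the central pipeline costs a fixed amount of integration work.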
This experience led me to focus on building Kafka, to combine what we had seen in messaging systems with the log concept popular in databases and distributed system internals. We wanted something to act as a central pipeline, first for all activity data and eventually for many other uses, including data deployment out of Hadoop, monitoring data, and so on.
For a long time Kafka was somewhat unique as an infrastructure product: neither a database, nor a log file collection system, nor a traditional messaging system. But recently Amazon has offered a service very similar to Kafka, called Kinesis. The similarity extends to the way partitioning is handled, the way data is retained, and even the fairly odd split in the Kafka API between high-level and low-level consumers. I was pretty happy about this: a sign that you have created a good infrastructure abstraction is that AWS offers it as a service. Their vision seems to match what I am describing: it is the piping that connects all their distributed systems (DynamoDB, RedShift, S3, and so on) as well as the basis for distributed stream processing using EC2.
Relationship to ETL and the data warehouse
Let's talk about the data warehouse for a bit. The data warehouse is meant to be a repository of clean, integrated data structured to support analysis. This is a great idea. For those not familiar with the concept, the data warehousing methodology involves periodically extracting data from source databases, transforming it into some understandable form, and loading it into a central data warehouse. Having this central location that contains a clean copy of all your data is a hugely valuable asset for data-intensive analysis and processing. At a high level, this methodology doesn't change much whether you use a traditional data warehouse such as Oracle or Teradata, or Hadoop, though you might adjust the order of loading and transformation.
A data warehouse containing clean, integrated data is an extremely important asset, but the mechanics of getting there are a bit out of date.
The key problem for a data-centric organization is that the clean, integrated data is coupled to the data warehouse. A data warehouse is a piece of batch query infrastructure, well suited to many kinds of reporting and ad hoc analysis, particularly when the queries involve simple counting, aggregation, and filtering. But having a batch system as the only repository of clean, complete data means the data is unavailable to systems that require a real-time feed: real-time processing, search indexing, monitoring systems, and so on.
In my view, ETL is really two things. First, it is an extraction and data cleanup process: liberating data locked up in a variety of systems across the organization and removing system-specific nonsense. Second, the data is restructured for data warehousing queries: made to fit the type system of a relational database, forced into a star or snowflake schema, perhaps broken up into a high-performance columnar format. Conflating these two things is a problem. The clean, integrated repository of data should also be available in real time for low-latency processing and for indexing in other storage systems.
I think this separation has an added benefit: it makes data warehouse ETL much more organizationally scalable. The classic problem of the data warehouse team is that it is responsible for collecting and cleaning all the data generated by every other team in the organization. The incentives are not aligned: data producers are often unaware of how the data is used in the warehouse, and end up creating data that is hard to extract or that requires heavy transformation to get into usable form. Of course, the central team never quite manages to scale to match the pace of the rest of the organization, so data coverage is spotty, data flow is fragile, and changes are slow.
A better approach is to have a central pipeline, the log, with a well-defined API for adding data. The responsibility for integrating with this pipeline and providing a clean, well-structured data feed lies with the producer of the feed. This means that, as part of designing and implementing their systems, producers must consider how to get data out and into a well-structured form for delivery to the central pipeline. The addition of new storage systems is of no consequence to the data warehouse team, because there is a central point of integration. The data warehouse team handles only the simpler problem of loading structured feeds of data from the central log and carrying out transformations specific to their system.
As the figure shows, this point about organizational scalability becomes particularly important when one considers adopting additional data systems beyond a traditional data warehouse. Say, for example, that you want to provide search over the organization's complete data set, or sub-second monitoring of data streams with real-time trend graphs and alerting. In either case, the infrastructure of a traditional data warehouse, or even a Hadoop cluster, is going to be inappropriate. Worse, the ETL pipeline built to support database loads is likely of no use for feeding these other systems, which makes bootstrapping them as large an undertaking as adopting a data warehouse. This helps explain why most organizations cannot easily use all of their data in this way. By contrast, if the organization has built feeds of uniform, well-structured data, giving any new system full access to all data requires only a single piece of integration plumbing attached to the pipeline.
This architecture also raises a set of options for where a particular cleanup or transformation can take place:
It can be done by the data producer before adding the data to the company-wide log.
It can be done as a real-time transformation on the log, which in turn produces a new, transformed log.
It can be done as part of the load process into a destination system.
The ideal pattern is for the data producer to clean up the data before publishing it to the log. This means ensuring the data is in a canonical form and does not retain any holdovers from the particular code that produced it or the storage system in which it was maintained. These details are best handled by the team that produces the data, because they know their own data best. Any logic applied at this stage should be lossless and reversible.
Any value-added transformation that can be done in real time should be done as post-processing on the raw log. This includes things like sessionization of event data or the addition of derived fields of general interest. The original log is still available, but this real-time processing produces a derived log containing the augmented data.
Finally, only aggregation that is specific to the destination system should be performed as part of the loading process. This might include transforming data into a particular star or snowflake schema for analysis and reporting in a data warehouse. Because this stage, which maps most naturally onto the traditional ETL process, now operates on a far cleaner and more uniform set of streams, it should be much simpler.
Log files and events
Let's talk about a side benefit of this architecture: it enables decoupled, event-driven systems.
The typical approach to activity data in the web industry is to log it to text files, which are then scraped into a data warehouse or into Hadoop for aggregation and querying. The problem with this is the same as with all batch ETL: it couples the data flow to the data warehouse's capabilities and processing schedule.
At LinkedIn, we have built our event data handling in a log-centric fashion. We use Kafka as the central, multi-subscriber event log. We have defined several hundred event types, each capturing the unique attributes of a particular type of action. This covers everything from page views, ad impressions, and searches to service calls and application exceptions.
To understand the advantage, imagine a simple event: showing a job posting on the job page. The job page should contain only the logic needed to display the job. However, on a fairly dynamic site, the job page easily picks up a lot of logic that has nothing to do with displaying the job. For example, suppose we need to integrate the following systems:
We need to send the data to Hadoop and the data warehouse for offline processing.
We need to count the view to ensure that the viewer is not attempting some kind of content scraping.
We need to aggregate the view for display on the job poster's analytics page.
We need to record the view so that we properly cap impressions of job recommendations for that user (we don't want to show the same thing over and over).
The recommendation system may need to record the view to correctly track the popularity of the job.
And so on.
Soon, the simple act of displaying a job has become quite complex. As we add other places where jobs are displayed, such as mobile applications, this logic must be carried over and the complexity keeps increasing. Worse, the systems we need to interface with are now intertwined: the engineer working on displaying jobs needs to know about many other systems and their features to make sure they are integrated properly. This is only a toy version of the problem; a real application would be more complex, not less.
The "event-driven" style provides a way to simplify this. The job display page now just shows a job and records the fact that a job was shown, along with the relevant attributes of the job, the viewer, and any other useful facts about the display. Each of the other interested systems (the recommendation system, the security system, the job poster analytics system, and the data warehouse) simply subscribes to the feed and does its own processing. The display code does not need to be aware of these other systems, and it does not need to change when a new data consumer is added.
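As a rough illustration of this decoupling, here is a minimal in-memory sketch in Python. The `EventLog` class, the subscriber names, and the event fields are all hypothetical and are not Kafka's API; the only point is that the job display page appends events without knowing who reads them, and each subscriber keeps its own position in the log.

```python
from collections import defaultdict


class EventLog:
    """Append-only in-memory log; each subscriber tracks its own read offset."""

    def __init__(self):
        self.records = []
        self.offsets = defaultdict(int)  # subscriber name -> next offset to read

    def append(self, event):
        self.records.append(event)
        return len(self.records) - 1  # offset of the record just written

    def poll(self, subscriber):
        """Return every record this subscriber has not seen yet."""
        start = self.offsets[subscriber]
        batch = self.records[start:]
        self.offsets[subscriber] = len(self.records)
        return batch


log = EventLog()

# The job display page only publishes; it knows nothing about its consumers.
log.append({"type": "job_view", "job_id": 42, "viewer_id": 7})
log.append({"type": "job_view", "job_id": 42, "viewer_id": 9})

views_for_analytics = log.poll("analytics")
views_for_recommender = log.poll("recommender")  # unaffected by the analytics read

log.append({"type": "job_view", "job_id": 43, "viewer_id": 7})
new_for_analytics = log.poll("analytics")  # only the record added since last poll
```

Adding another consumer in this sketch is just another call to `poll` with a new name; nothing in the publishing code changes.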
Build scalable logs
Of course, separating publishers from subscribers is nothing new. But if you want a commit log that acts as a multi-subscriber, real-time journal of everything happening on a large website, scalability will be your primary challenge. If we cannot build a log that is fast, cheap, and scalable enough to be practical at scale, using a log as a universal integration mechanism will never be more than an elegant fantasy.
Distributed logs are generally thought of as a slow, heavyweight abstraction (and are usually associated only with the kind of "metadata" uses for which ZooKeeper is appropriate). But with a thoughtful implementation focused on journaling large data streams, this need not be true. At LinkedIn, we currently run more than 60 billion unique message writes through Kafka every day (several hundred billion if you count the writes from mirroring between data centers).
We used a few tricks in Kafka to support this kind of scale:
Partitioning the log
Optimizing throughput by batching reads and writes
Avoiding needless data copies
To allow horizontal scaling, we split the log into partitions:
Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps a wall-clock time that you might include in your messages). The assignment of messages to a particular partition is controlled by the writer, and most users choose to partition by some kind of key, such as user ID. Partitioning allows log appends to happen without coordination between partitions, and it allows the throughput of the system to scale linearly with the size of the Kafka cluster.
Each partition is replicated across a configurable number of replicas, each of which holds an identical copy of the partition's log. At any time, one of them acts as the leader; if the leader fails, one of the other replicas takes over as leader.
The lack of a global order across partitions is a limitation, but we have not found it to be a major one. In practice, interaction with the log comes from hundreds or thousands of distinct processes, so it is not meaningful to talk about a total order over their behavior. Instead, the guarantee we provide is that each partition preserves order: Kafka guarantees that appends to a particular partition from a single sender are delivered in the order in which they were sent.
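To make key-based partitioning concrete, here is a small Python sketch. It assumes a fixed number of partitions and uses `zlib.crc32` as a stand-in hash; this is not the partitioner Kafka itself uses, and `partition_for` and `send` are hypothetical names. What it shows is the ordering guarantee described above: all records for one key land in the same partition (slice) and stay in send order, while nothing is promised across partitions.

```python
import zlib

NUM_PARTITIONS = 4  # assumed cluster layout for the sketch
partitions = [[] for _ in range(NUM_PARTITIONS)]


def partition_for(key: str) -> int:
    # crc32 is stable across runs, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def send(key: str, value: str) -> int:
    """Append a record to the partition chosen by its key; return that partition."""
    p = partition_for(key)
    partitions[p].append((key, value))
    return p


for i, user in enumerate(["alice", "bob", "alice", "carol", "alice"]):
    send(user, f"event-{i}")

# Every record for "alice" sits in one partition, in the order it was sent.
alice_events = [v for k, v in partitions[partition_for("alice")] if k == "alice"]
```

Because each append touches only one partition, appends for different keys need no coordination with each other, which is what lets throughput grow with the number of partitions.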
A log, like a file system, is easy to optimize for linear read and write patterns. The log can group small reads and writes together into larger, high-throughput operations. Kafka pursues this optimization aggressively. Batching happens at many points: when the client sends data to the server, in writes to disk, in replication between servers, in data transfer to consumers, and in acknowledging committed data.
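The batching idea can be sketched in a few lines of Python. `BatchingWriter` and its `max_batch` parameter are hypothetical, and the `sink` list merely stands in for a disk or a network connection; the sketch only shows how many small writes collapse into a few large ones.

```python
class BatchingWriter:
    """Groups small writes into larger ones before handing them to a sink."""

    def __init__(self, sink, max_batch=3):
        self.sink = sink            # a list standing in for disk or network
        self.max_batch = max_batch  # flush once this many records are pending
        self.pending = []

    def write(self, record):
        self.pending.append(record)
        if len(self.pending) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.pending:
            self.sink.append(list(self.pending))  # one large write, not many small ones
            self.pending.clear()


flushed = []
writer = BatchingWriter(flushed, max_batch=3)
for i in range(7):
    writer.write(i)
writer.flush()  # push out the final partial batch
```

Seven individual writes reach the sink as three operations here. The trade-off in any such scheme is latency: a record may wait in `pending` until its batch fills or is flushed.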
Finally, Kafka uses a simple binary format that is kept the same across the in-memory log, the on-disk log, and network data transfers. This lets us use numerous optimizations, including zero-copy data transfer.
The cumulative effect of these optimizations is that you can usually write and read data at the rate the disk or network supports, even while maintaining data sets that far exceed memory.
So far, I have only described what amounts to a fancy way of copying data from place to place. But moving bytes between storage systems is not the whole story. It turns out that "log" is another word for "stream", and logs are at the heart of stream processing.
But, wait, what is streaming?
If you are a fan of the database literature or the data infrastructure products of the late 1990s and early 2000s, you probably associate stream processing with efforts to build a SQL engine or a "boxes and arrows" interface for event-driven processing.
If you follow the explosion of open source data systems, you probably associate stream processing with some of the systems in that space, such as Storm, Akka, S4, and Samza. But most people see these as a kind of asynchronous message processing system, not very different from a cluster-aware remote procedure call layer (and in fact some systems in this space are exactly that).
Both of these views are a little limited. Stream processing has nothing to do with SQL. Nor is it limited to real-time processing: there is no inherent reason you cannot process the stream of data from yesterday or from a month ago, or express the computation in a variety of different languages.
I see stream processing as something much broader: infrastructure for continuous data processing. I think the computational model can be as general as MapReduce or other distributed processing frameworks, but with the ability to produce low-latency results.
The real driver of the processing model is the method of data collection. Data that is collected in batches is naturally processed in batches. Data that is collected continuously is naturally processed continuously.
The United States census is a good example of batch data collection. The census is carried out periodically, using brute-force discovery and enumeration of US citizens by people going door to door. This made a lot of sense in 1790, when the census began. Data collection at that time was inherently batch-oriented: it involved riding around on horseback, writing records down on paper, and then transporting each batch of records to a central location where people added up the counts. These days, when you describe the census process, people immediately wonder why we don't keep a journal of births and deaths and produce population counts either continuously or at whatever granularity is needed.
This is an extreme example, but many data transfer processes still rely on periodic dumps and on bulk transfer and integration. The only natural way to process a bulk dump is with a batch process. But as these batches are replaced by continuous feeds, people naturally move toward continuous processing to smooth out the resources required and to reduce latency.
LinkedIn, for example, has almost no batch data collection. Most of our data is either activity data or database changes, both of which occur continuously. In fact, for any business you can think of, the underlying mechanics are almost always a continuous process: events happen in real time, as Jack Bauer would tell us. When data is collected in batches, it is almost always because of some manual step, a lack of digitization, or a historical relic left over from automating a non-digital process. Transmitting and reacting to data was very slow when the mechanism was mail and the processing was done by people. A first pass at automation always retains the form of the original process, and that form often lingers for a long time.
Production "batch" jobs that run daily are often effectively mimicking a continuous computation with a window size of one day. The underlying data, of course, is always changing. These jobs were so common at LinkedIn, and the mechanics of making them work in Hadoop so tricky, that we implemented a whole framework for managing incremental Hadoop workflows.
Seen in this light, it is easy to take a different view of stream processing. It is simply processing that includes a notion of time in the underlying data and does not require a static snapshot of the data, so it can produce output at a user-controlled frequency instead of waiting for the "end" of the data set. In this sense, stream processing is a generalization of batch processing, and, given the prevalence of real-time data, a very important one.
So why has stream processing traditionally been seen as a niche application? Personally, I think the biggest reason is that the lack of real-time data collection made continuous processing something of an academic concern.
I think the lack of real-time data collection is probably what doomed the commercial stream processing systems. Their customers were still doing file-oriented, daily batch processing for ETL and data integration. The companies building stream processing systems focused on providing processing engines to attach to real-time data streams, but it turned out that very few people actually had real-time data streams. In fact, very early in my time at LinkedIn, a company tried to sell us a very cool stream processing system, but because all our data was collected in hourly files at the time, the best application we could come up with was to pipe those files into the stream system at the end of each hour. They noted that this was a fairly common problem. The exception proves the rule here: finance, the one domain where stream processing met with some success, was exactly the area where real-time data streams were already the norm and processing had become the bottleneck.
Even in the presence of a healthy batch processing ecosystem, I think the practical applicability of stream processing as an infrastructure style is quite broad. It covers the gap between real-time request/response services and offline batch processing. For modern Internet companies, I think about 25% of their code falls into this category.
It turns out that the log solves some of the most critical technical problems in stream processing. In my opinion, the biggest problem it solves is simply making data available as real-time, multi-subscriber feeds. For those interested in the technical details, we have open sourced Samza, a stream processing system built explicitly on these ideas. Many of these applications are described in more detail in its documentation.
Data flow graph
The most interesting aspect of stream processing has nothing to do with the internals of a stream processing system. Instead, it has to do with how it extends the idea of a data feed from the earlier data integration discussion. There we mainly discussed feeds, or logs, of primary data: the events and rows of data produced in the execution of various applications. But stream processing also lets us include feeds computed from other feeds. To consumers, these derived feeds look no different from the primary feeds they are computed from, and they can encapsulate arbitrary complexity.
Let's go one step further. For our purposes, a stream processing job is anything that reads from logs and writes its output to logs or other systems. The logs used for input and output join these jobs into a graph of processing stages. In fact, with a centralized log used in this way, you can view all of the organization's data capture, transformation, and flow as just a series of logs and the processes that write to them.
A stream processor does not need a fancy framework at all: it can be any process or set of processes that reads and writes logs. Additional infrastructure and support can, however, help with managing the processing code.
The purpose of the log in this integration is twofold:
First, it makes each data set multi-subscriber and ordered. Recall the principle of state replication to see why order matters. To make this concrete, consider a stream of updates from a database: if we reorder two updates to the same record during processing, we may produce the wrong final output. This ordering is more durable than what something like TCP provides, because it is not limited to a single point-to-point link and it survives process failures and reconnections.
Second, the log provides buffering for the processes. This is very fundamental. If processing proceeds asynchronously, an upstream job may well produce data faster than a downstream job can consume it. When that happens, processing must block, buffer the data, or drop it. Dropping data is usually not an option, and blocking may bring the entire processing graph to a halt. The log acts as a very, very large buffer that allows a process to be restarted, or to fail, without slowing down the rest of the processing graph. This isolation is particularly important when the data flow is extended across a larger organization, where the processing jobs are written by many different teams. We cannot allow one faulty job to create back-pressure that stops the whole processing flow.
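The buffering role can be illustrated with a small Python sketch. The `LaggingConsumer` class is hypothetical and the log is just a list; the point is that the upstream job keeps appending while the downstream job falls behind, and the downstream job later catches up from its own offset without anything being blocked or dropped.

```python
class LaggingConsumer:
    """Reads from a shared list-backed log at its own pace."""

    def __init__(self, log):
        self.log = log
        self.offset = 0  # position of the next record to read

    def consume(self, max_records):
        batch = self.log[self.offset:self.offset + max_records]
        self.offset += len(batch)
        return batch

    def lag(self):
        """Number of records written but not yet consumed."""
        return len(self.log) - self.offset


log = []
slow = LaggingConsumer(log)

log.extend(range(10))        # the upstream job appends without waiting
first = slow.consume(3)      # the downstream job only manages 3 records
log.extend(range(10, 15))    # upstream keeps going; nothing blocks or is dropped
lag_before = slow.lag()      # 12 records are buffered in the log
rest = slow.consume(100)     # later, the slow job catches up from its own offset
```

In a real system the buffer is bounded by the log's retention policy rather than by memory, so how far a consumer may fall behind is a configuration decision.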
Both Storm and Samza are built in this asynchronous fashion and can use Kafka or other similar systems as their log.
Stateful real-time stream processing
Some real-time stream processing is just stateless, record-at-a-time transformation. But many uses involve more sophisticated counts, aggregations, or joins over windows in the stream. For example, you might want to enrich an event stream (say, a stream of clicks) with information about the user doing the clicking, in effect joining the click stream to the user account database. Invariably, this kind of processing requires the processor to maintain some state: when computing a count, for example, you have to maintain the count so far. How can this state be maintained correctly if the processors themselves can fail?
The simplest alternative is to keep the state in memory. But if the process crashes, it loses its intermediate state. If state is only maintained over a window, the process can fall back to the point in the log where the window began. However, if you are computing a count over an hour, this may not be feasible.
Another alternative is to store all state in a remote storage system and join against that store over the network. The problem with this approach is that there is no data locality and there are a lot of network round trips.
How can we support something like a table that is partitioned along with our processing?
Recall the discussion of the duality of tables and logs. It gives us exactly the tool we need to convert streams into tables co-located with our processing, as well as a mechanism for making those tables fault tolerant.
A stream processor can keep its state in a local table or index: bdb, leveldb, or even something more unusual such as a Lucene or fastbit index. The contents of this store are fed from its input streams (perhaps after applying an arbitrary transformation). The processor can journal a change log for this local index, which lets it restore its state after a crash and restart. This gives a generic mechanism for keeping co-partitioned state, in arbitrary index types, local to the incoming stream data.
When the process fails, it restores its index from the change log. In effect, the log turns the local state into a kind of incremental, record-at-a-time backup.
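A minimal Python sketch of this recovery scheme follows. `CountingProcessor` is a hypothetical name, a dictionary stands in for the local index, and a plain list stands in for the durable change log; this is not Samza's API. Each update is applied to local state and journaled, and a fresh processor rebuilds the state by replaying the journal.

```python
class CountingProcessor:
    """Keeps counts in local state and journals every change to a change log."""

    def __init__(self, changelog):
        self.changelog = changelog  # list of (key, new_count); assumed durable
        self.counts = {}            # local state; lost when the process dies
        for key, count in changelog:
            self.counts[key] = count  # restore by replaying the change log

    def process(self, key):
        new_count = self.counts.get(key, 0) + 1
        self.counts[key] = new_count
        self.changelog.append((key, new_count))  # journal the change


changelog = []
processor = CountingProcessor(changelog)
for page in ["home", "jobs", "home"]:
    processor.process(page)

del processor  # simulate a crash: the in-memory counts are gone

restored = CountingProcessor(changelog)  # a new process replays the change log
restored.process("home")
```

After the restart the count for `home` continues from 2 to 3 rather than starting over, because the replay put the processor back where its predecessor left off.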
One elegant property of this approach to state management is that the state of the processors is also maintained as a log. We can think of this log just as we would the change log of a database table. In fact, the processors carry something very much like a co-partitioned table along with them. Because this state is itself a log, other processors can subscribe to it. This is particularly useful when the goal of the processing is to update a final state and that state is the natural output of the processing.
For data integration, the power of the duality between logs and tables becomes clear when this is combined with the logs coming out of databases. A change log can be extracted from a database and indexed in different forms by different stream processors in order to join it against event streams.
Samza gives more detail on this style of managing stateful stream processing, along with a large number of practical examples.
Log compaction
Of course, we cannot expect to keep a complete log of all state changes forever. Unless you want to use unlimited space, the log must somehow be cleaned up. To make this concrete, let me describe how Kafka implements it. In Kafka, there are two options for cleanup, depending on whether the data consists of keyed updates or event data. For event data, Kafka supports retaining just a window of data. The window is usually configured to cover a few days, and it can be defined in terms of either time or space. For keyed data, though, a nice property of the complete log is that you can replay it to recreate the state of the source system, possibly in another system.
Over time, however, keeping the complete log uses more and more space, and replaying it takes longer and longer. So, in Kafka, we support a different type of retention. Instead of simply discarding the old log, we remove obsolete records, that is, records whose primary key has a more recent update. This still guarantees that the log contains a complete backup of the source system, but we can no longer recreate every previous state of the source system, only the most recent one. We call this feature log compaction.
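The cleanup rule described above, keeping only the most recent record for each key, can be sketched in a few lines of Python. The `compact` function and the sample records are hypothetical, and Kafka's real cleaner works incrementally on log segments rather than on a whole list; this sketch only shows the end result.

```python
def compact(log):
    """Keep only the latest record for each key, preserving log order."""
    # Later entries overwrite earlier ones, so this maps key -> last offset.
    last_offset = {key: offset for offset, (key, _) in enumerate(log)}
    return [rec for offset, rec in enumerate(log) if last_offset[rec[0]] == offset]


log = [
    ("user1", "v1"),
    ("user2", "v1"),
    ("user1", "v2"),  # supersedes the first record for user1
    ("user3", "v1"),
    ("user2", "v2"),  # supersedes the earlier record for user2
]
compacted = compact(log)
```

Replaying `compacted` yields the same final table as replaying the full `log`, but the intermediate states (for example, `user1` at `v1`) can no longer be reconstructed.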
The last thing I want to discuss is the role of the log in the design of online data systems.
There is an analogy between the role the log plays in data flow inside a distributed database and the role it plays in data integration in a larger organization. In both cases, the log is responsible for data flow, consistency, and recovery. What, after all, is an organization, if not a very complicated distributed data system?
If you squint a bit, you can see the whole of your organization's systems and data flows as a single distributed database. You can view all the individual query-oriented systems (Redis, SOLR, Hive tables, and so on) as just particular indexes on your data. You can view stream processing systems like Storm or Samza as a very well-developed trigger and view materialization mechanism. I have noticed that classical database people like this view very much, because it finally explains to them what all these different data systems are for: they are just different index types.
There is undeniably an explosion of types of data systems now, but in reality this complexity has always existed. Even in the heyday of the relational database, organizations had lots and lots of relational databases. So perhaps real integration has not existed since the mainframe era, when all the data really was in one place. There are many motivations for splitting data across multiple systems; scale, geography, security, and performance isolation are the most common. But these issues can be addressed by a good system: for example, an organization can have a single Hadoop cluster that contains all its data and serves a large and diverse set of users.
So the move to distributed systems has already made one simplification possible: coalescing lots of small instances of each system into a few large clusters. Many systems are not yet good enough to allow this: they lack security, cannot guarantee performance isolation, or simply do not scale well enough. But each of these problems is solvable.
In my opinion, the explosion of different systems is caused by the difficulty of building distributed data systems. By cutting back to a single query type or use case, each system brings its scope down to something that is feasible to build. But running all of these systems creates too much complexity.
I see three possible directions this could take in the future:
The first possibility is a continuation of the status quo: the separation of systems remains more or less as it is for a good while longer. This could happen either because the difficulty of building distributed systems is too hard to overcome, or because this specialization gives each system new levels of convenience and power. As long as this remains true, data integration will remain one of the most important problems in using data successfully. In this case, an external log that integrates data will be very important.
The second possibility is re-consolidation: a single system with enough generality gradually merges the different functions back into one super-system. This super-system might superficially resemble a relational database, but its use in an organization would be very different, because you would need only one big system instead of countless small ones. In this world, there is no real data integration problem other than the one solved inside the system. I think the practical difficulties of building such a system make this unlikely.
There is another possible outcome, though, which I find appealing as an engineer. One interesting feature of the new generation of data systems is that they are virtually all open source. Open source allows a third possibility: data infrastructure could be unbundled into a collection of services and application-facing system APIs. You can already see this happening to some extent in the Java stack.
Broken down by category:
ZooKeeper handles much of the coordination between systems, perhaps with some help from higher-level abstractions such as Helix or Curator.
Mesos and YARN handle process virtualization and resource management.
Embedded libraries such as Lucene and LevelDB handle indexing.
Netty, Jetty, and higher-level wrappers such as Finagle and rest.li handle remote communication.
Avro, Protocol Buffers, Thrift, and umpteen zillion other libraries handle serialization.
Kafka and BookKeeper provide a backing log.
If you stack these things in a pile and squint a bit, it starts to look like a Lego version of distributed data system engineering. You can piece these ingredients together to create a vast array of possible systems. This is clearly not a story for end users, who presumably care more about the API than about how it is implemented, but it may be a path toward getting the simplicity of a single system in a world that keeps becoming more diverse and modular. If reliable, flexible building blocks shorten the time to implement a distributed system from years to weeks, the pressure to coalesce into a single monolithic system disappears.
The place of the log in system architecture
A system that assumes an external log is present lets the individual systems give up much of their own complexity and rely on the shared log. Here are the things I think a log can do:
Handle data consistency (whether immediate or eventual) by sequencing concurrent updates to nodes
Provide data replication between nodes
Provide "commit" semantics to the writer (acknowledging a write only when it is guaranteed not to be lost)
Provide the external data subscription feed from the system
Provide the ability to restore failed replicas that lost their data and to bootstrap new replicas
Handle rebalancing of data between nodes
This is actually a substantial portion of what a distributed data system does. Most of what remains relates to the client-facing query API and the indexing strategy, and this is exactly the part that should vary from system to system. For example, a full-text search query may need to query all partitions, whereas a query by primary key only needs to query the single node responsible for that key's data.
Let's take a look at how this works. The system is divided into two logical pieces: the log and the serving layer. The log captures state changes in sequential order. The serving nodes store whatever index is required to serve queries (a key-value store might use something like a B-tree or an SSTable, while a search system would use an inverted index). Writes may go directly to the log, or they may be proxied through the serving layer. Writing to the log yields a logical timestamp (say, the index in the log). If the system is partitioned, the log and the serving nodes have the same number of partitions, though they may run on very different numbers of machines.
The serving nodes subscribe to the log and apply writes to their local index as quickly as possible, in the order in which the log has stored them.
A client can get read-your-writes semantics from any node by providing the timestamp of a write as part of its query. A serving node that receives such a query compares the requested timestamp with its own index point. If necessary, it delays the request until it has indexed up to at least that timestamp, so that it never serves stale data.
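Here is a minimal Python sketch of this read path. `ServingNode`, `write`, and the `min_offset` parameter are hypothetical names, the log is a plain list, and the logical timestamp is simply the record's position in that list. Where a real node would delay the request until its subscription caught up, the sketch catches up synchronously.

```python
class ServingNode:
    """Applies log entries to a local index, in log order."""

    def __init__(self, log):
        self.log = log
        self.applied = 0  # number of log entries already applied to the index
        self.index = {}

    def catch_up(self, upto):
        while self.applied < upto:
            key, value = self.log[self.applied]
            self.index[key] = value
            self.applied += 1

    def read(self, key, min_offset):
        # Assumption for the sketch: instead of delaying the request until the
        # node's subscription reaches min_offset, apply the missing entries now.
        if self.applied <= min_offset:
            self.catch_up(min_offset + 1)
        return self.index.get(key)


log = []


def write(key, value):
    """Append to the log and return the logical timestamp (the log offset)."""
    log.append((key, value))
    return len(log) - 1


node = ServingNode(log)
ts1 = write("profile:7", "v1")
ts2 = write("profile:7", "v2")

value = node.read("profile:7", min_offset=ts2)  # never older than the write at ts2
```

A query that passes `min_offset=ts1` to a node that has applied nothing would be allowed to return `v1`; the timestamp sets a lower bound on freshness, not an exact version.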
The service node may not need to know the concept of "control" or "bid choice" (leader election) at all. For many simple operations, the service node can provide services completely without leadership, and the log is the source of information.
One of the more complex tasks that the distribution system needs to do is to repair the failed nodes and remove the isolation between several points. It is a typical practice to retain the repaired data and combine the snapshots of the data in each region, which is almost equivalent to keeping a complete backup of the data and recycling the logs from the dustbin. This makes the service layer much simpler and the logging system more targeted.
With this logging system, you can subscribe to API, which provides data content that provides ETL to other systems. In fact, many systems can share the same log while providing different indexes, as follows:
How does such a log-centric system achieve both the provider of data flow and the loading of data from other systems? Because the stream processor can consume multiple input data streams, and then index the data through other systems to provide services for them.
The view of this system can be clearly decomposed into logs and query API, because it allows you to decompose the characteristics of queries in terms of system availability and consistency. This can help us break down systems and understand systems that are not designed and implemented in this way.
Although both Kafka and Bookeeper are consistent logs, this is not necessary and does not make sense. You can easily decompose data structures such as Dynamo into consistent AP logs and key-value pair service layers. Such a log is flexible to use because it retransmits old messages, and like Dynamo, such processing depends on the subscriber of the message.
In the eyes of many people, it is a waste to keep a complete copy of the data in the log. In fact, although there are many factors that make it not difficult. First, logging can be an effective storage mechanism. We stored 5 TB of data on the server in the Kafka production environment. At the same time, there are many service systems that need more memory to provide effective data services, such as text search, which is usually in memory. The service system also needs to optimize the sample hard disk. For example, our real-time data system either provides services out of memory or uses solid state drives. In contrast, the logging system only needs to read and write linearly, so it is happy to use TB-sized hard drives. Finally, as shown in the figure above, the cost of logs for data provided by multiple systems is spread over multiple indexes, and this aggregation minimizes the cost of external logs.
This is exactly the pattern LinkedIn has used to build many of its real-time query systems. These systems feed off a database (using Databus as a log abstraction, or a dedicated log from Kafka) and provide a particular partitioning, indexing, and query capability on top of that data stream. This is how we implemented our search, social graph, and OLAP query systems. In fact, it is quite common for a single data feed (whether live or derived) to be replicated into multiple serving systems for live serving. This has proved to be an enormous simplification. None of these systems needs an externally accessible write API at all: Kafka and the databases act as the system of record, and changes flow to the appropriate query systems through the log. Writes are handled locally by the nodes hosting a particular partition. These nodes blindly transcribe the feed provided by the log into their own store. A failed node can be restored by replaying the upstream log.
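The "blindly transcribe, then restore by replay" behavior can be sketched as follows. This is a minimal illustration under stated assumptions: a plain Python list stands in for the upstream log, and `ServingNode`, `partition_for`, and `NUM_PARTITIONS` are hypothetical names, not part of Kafka or Databus.

```python
import zlib

NUM_PARTITIONS = 2  # assumed partition count for the example


def partition_for(key):
    # crc32 is stable across processes, unlike the built-in hash() for str.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


class ServingNode:
    """Hosts one partition and copies matching log records into its store."""

    def __init__(self, partition):
        self.partition = partition
        self.store = {}
        self.applied_offset = -1  # nothing applied yet

    def replay(self, log):
        """Apply every record after applied_offset that belongs to this partition."""
        for offset in range(self.applied_offset + 1, len(log)):
            key, value = log[offset]
            if partition_for(key) == self.partition:
                self.store[key] = value
            self.applied_offset = offset


# The upstream log is the system of record; nodes have no write API of their own.
log = [("alice", "v1"), ("bob", "v1"), ("alice", "v2"), ("carol", "v1")]

nodes = [ServingNode(p) for p in range(NUM_PARTITIONS)]
for node in nodes:
    node.replay(log)

# Simulate losing node 0: a fresh replacement is rebuilt purely from the log.
replacement = ServingNode(0)
replacement.replay(log)
```

Since a node's state is a deterministic function of the log, the replacement ends up with the same store as the node it replaces.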
The degree to which these systems rely on the log varies. A fully reliant system could use the log for data partitioning, node restore, rebalancing, and all aspects of consistency and data propagation. In this setup, the serving tier is really nothing more than a kind of cache, structured to enable a particular type of processing, with writes going directly to the log.