In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-20 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
How to use Kafka to save the New York Times and push, I believe that many inexperienced people do not know what to do, so this article summarizes the causes of the problem and solutions, through this article I hope you can solve this problem.
The New York Times has a lot of content generation systems, and we use third-party data to write stories. In addition, we have 161 years of experience in the news industry and 21 years of online content publishing experience, so a large amount of online content needs to be searched and provided for different services and applications.
On the other hand, there are many services and applications that need to access this content-search engines, personalized customization services, news seed generators, and various other front-end applications, such as websites and mobile applications. Once new content is released, it is necessary to make these services accessible in a very short period of time, and there can be no data loss-after all, it is valuable news.
We will describe in detail how we solved the above problems based on Apache Kafka. We call this system the Publishing Pipeline. Focusing on back-end systems, we will show you how to use Kafka to save New York Times articles and how to use Kafka and Steams API to push published content to various applications in real time. The following is the overall architecture diagram, the details of which will be detailed later.
Shortcomings of API-based solution
There are different requirements for accessing the back-end systems of published content.
We have a service that provides real-time content for websites and mobile applications, so it needs to access the content immediately after it is published.
We also have some services to provide content lists. Some lists are edited manually, while others are obtained through queries. For the list obtained through the query, once the content that meets the query criteria is published, it needs to be included in the list. If the published content is modified and no longer meets the query criteria, it will be removed from the list. We also support changes to the query condition itself, such as creating a new list, which requires access to previously published content.
Our Elasticsearch cluster provides search services for websites. We don't have high requirements for latency, such as not finding new content within a minute or two after the content is released is not a big problem. However, search engines still have to access previously published content, because once the schema definition of Elasticsearch is changed, or the search intake pipeline is modified, all content needs to be re-indexed.
We also have a personalized customization system that is only interested in the latest content. After the personalized customization algorithm has changed, these contents need to be reprocessed.
In the beginning, we provided API for these applications to access published content directly, or to subscribe to feeds, and they would be notified as soon as new content was released.
There are many problems with this typical API-based solution.
Different API is developed by different teams in different ways at different times. There are differences in endpoints, semantics, and even parameters. Although we can try to solve these problems, it takes time and effort to coordinate the various teams.
These systems all define their own schema, and the same field is called differently in different systems, while the field with the same name has different meanings in different systems.
Another problem is that it is difficult to access previously published content. Most systems do not provide content streaming because the databases they use do not support this feature. Although the content is stored in the database, a large number of API calls are time-consuming and can place an unpredictable load on the API service.
Log-based architecture
We are going to introduce a log-based architecture. Martin Kleppmann mentions this architectural scheme in "Turning the database inside-out with Apache Samza" and later describes it in more detail in "Designing Data-Intensive Applications". "Log: What every software engineer should know about real-time data's unifying abstraction" mentions the use of logs as a general data structure. For us, our log is Kafka, and all published content is added to the Kafka topic in chronological order, and other services access it through consumption logs.
Traditional applications use databases to store data. Although databases also have many advantages, managing databases will become a burden in the long run. First of all, changing the database schema is tricky. It is not difficult to add or remove fields, but these changes need to be at the expense of suspending the service. We are also not free to change the database. Most databases do not support streaming changes, and although we can get database snapshots, they can quickly become obsolete. In other words, it is difficult to create derivative stores, such as indexes used by search engines, because the index must contain all the article content, and the index must be rebuilt as soon as new content is published. Although we can allow clients to send content to multiple storage systems at the same time, this still does not solve the consistency problem because some writes fail.
In the long run, the database will eventually become a complex unit.
A log-based architecture can solve these problems. Generally speaking, the database saves the result or state of the event, while the log saves the event itself. We can create any data store we want based on the log, which is the materialized view of the log, and they contain derived content, not the original content. If you want to change the schema of the datastore, just create a new datastore, then consume all the logs from beginning to end, and then throw away the old datastore.
Once the log is used as the source of the facts, it is no longer necessary to use the central database. Each system can create its own data store, or materialized view, which contains only the data necessary for the system and provides a specific format for the system. This simplifies the role of the database in the architecture and better meets the needs of each application.
In addition, the log-based architecture simplifies the way to access content streams. For traditional databases, accessing entire data dumps (such as snapshots) and accessing "real-time" data (such as seeds) are two different operations. For logs, there is no difference. You can start consuming the log at any offset, from the beginning, from the middle, or even from the end. In other words, if you want to recreate the data store, just re-consume the log as needed.
Systems based on log architecture also have many advantages in deployment. Invariant mode deployment of stateless services in virtual machines has become a common way. Redeploying the entire instance can avoid many problems. Because of the logs, we can now deploy stateful systems in invariant mode. Because we can recreate the data store from the log, we can get a new data store every time we deploy changes.
Why can't Google's PubSub or AWS SNS/SQS/Kinesis solve these problems?
There are generally two application scenarios for Kafka.
Kafka is often used as a message broker for data analysis and data integration. Kafka has many advantages in this respect, but Google PubSub and AWS SNS/SQS/Kinesis can also solve these problems. These services support multiple consumers and multiple producers, and can track the consumption status of consumers, and consumers will not lose data when they are down. In these scenarios, logging is just a concrete implementation of a message broker.
But in a log-based architecture, the situation is different. At this time, the log is not just a simple implementation detail, but becomes a core function. We have the following two requirements:
We need to keep all events permanently through the log, otherwise we won't be able to create the data store we need at will.
We need to consume the logs in a certain order, because disordered processing of related events will get the wrong results.
At present, only Kafka can meet these two needs.
Monolog
Monolog is our new content distribution source, and other systems write the created content to Monolog in an appended way. The created content will pass through a gateway before entering the Monolog, and the gateway will check whether the content flows through conforms to our defined schema.
Monolog contains all the content released since 1851, sorted by release time. In other words, consumers can start consuming this content at any point in time. If you need to consume all the content, start from scratch (that is, from 1851), or consume only the updated parts as needed.
For example, we have a service that provides a list of content, such as content published by an author, content related to a scientific topic, and so on. The service consumes the Monolog from the starting point and then builds the list of contents. We have another service that only provides a list of the latest releases, so it doesn't need a permanent data store, it just needs log data from the past few hours. It consumes logs for the last few hours at startup and maintains a list of the latest content in memory.
We send the content to the Monolog in canonical form, and each part of the content is written to the Kafka as a separate message. For example, pictures and articles are sent separately, because multiple different articles may contain the same picture.
This is very similar to the normalized model in relational databases, where there is a many-to-many relationship between pictures and articles.
In the previous example, we have two articles that quote other content. For example, the title line is published separately and then referenced by two other articles. All content is identified by URI in the format nyt://article/577d0341-9a0a-46df-b454-ea0718026d30. We have a native browser that can view these URI, just click on the URI to see their JSON representation, and the content itself is saved on the Monolog in protobuf format.
Monolog is actually a topic on Kafka that contains only one partition because we want to maintain the global order of messages. This ensures the internal consistency of the top-level content-if we add a picture to an article, along with some text that references the picture, then we need to make sure that the position of the picture should precede the new text.
In fact, the content is sorted by topology, as shown in the following figure.
Because the topic contains only one partition, everything is saved on the same disk (such is the storage mechanism of Kafka). But this is not a problem for us, because all our content is text, and so far the total amount has not exceeded 100GB.
Normalize logs and Kafka Streams API
Monolog meets the needs of some applications that require a normalized view of data, but this is not the case for others. For example, in order to index data to Elasticsearch, you need non-normalized data because Elasticsearch does not support many-to-many relational mapping. If you want to search for articles by image captions, the descriptive text of those images must be included in the article object.
In order to support this data view, we have also prepared a set of non-standardized logs. In these logs, the top-level content and all its dependencies are packaged and released. For example, when Article 1 was released, the log messages included not only this article, but also related images and tags.
The Kafka consumer client consumes messages from the log and adds them to the Elasticsearch index. When Ariticle 2 is released, all the relevant content of this article will also be packaged together, even though some images may have already appeared in Ariticle 1.
If the dependencies of the article change, the entire content will be republished. For example, if Image 2 is updated, Article 1 will be added to the log again.
We use a component called Denormalizer to create non-normalized logs.
Denormalizer is a Java application that uses Kafka Streams API. It consumes Monolog messages and keeps an up-to-date version of each article locally, including references to the article. As the content is constantly published and updated, the local storage is constantly updated. Once top-level content is published, Denormalizer collects all dependencies from the local store and packages them into a de-normalized log. If the dependencies of some top-level content change, Denormalizer republishes the entire package.
Non-normalized logs do not need to be guaranteed in global order, we just need to make sure that different versions of the same article are written to the log in a certain order. So we can use partitions to allow multiple consumers to consume these partitions at the same time.
Elasticsearch example
The following figure shows the back-end search service we built, using Elasticsearch.
The whole data flow goes like this.
CMS publishes or updates content.
The content is sent to the gateway in protobuf binary.
The gateway validates the content and writes it to the Monolog.
Denormalizer consumes logs from Monolog, and if it is top-level content, collects all dependencies from local storage and packages them to the non-normalized log. If it is referenced content, then all the top-level content associated with it will also be written to the non-normalized log.
The Kafka divider is partitioned according to the URI of the top-level content.
All search nodes access the non-normalized log by calling Kafka Streams API. Each node reads a partition, wraps the message into a JSON object, adds it to the Elasticsearch index, and finally writes to the specified Elasticsearch node. When we rebuild the index, we turn off the replication function, which can speed up the index and turn on the replication function after the index has been built.
Realize
Our release pipeline is deployed on Google Cloud Platform. I'm not going to describe the details here, but the following figure shows its overall architecture. We run Kafka and ZooKeeper on GCP Compute, and the other components-- the gateway, the Kafka replica node, and the Denormalizer-- run in the container. We use API based on gRPC and Cloud Endpoint, and use SSL authentication and authorization to secure Kafka.
We spent nearly a year on our new architecture, which is now running in a production environment. But this is just the beginning, and we have a lot of other systems that need to be migrated to this architecture. The new architecture has many advantages, but it is also a major shift in thinking for developers, who need to move from the traditional database and publish-subscribe model to a new data flow model. In order for these advantages to carry forward, we need to change the way we develop, and spend a lot of energy building tools and infrastructure to make development easier.
After reading the above, have you mastered how to use Kafka to save and push the New York Times? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.