Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Design and implementation of distributed temporal Database QTSDB

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)06/01 Report--

The existing open source time series database influxdb only supports stand-alone operation. When faced with a large number of data writes, there will be slow query, high machine load and single machine capacity limitations.

In order to solve this problem, the infrastructure team developed a cluster version-QTSDB on the basis of stand-alone influxdb.

Brief introduction of QTSDB

QTSDB is a distributed time series database, which is used to handle massive data writing and query. In implementation, it is a distributed version based on open source stand-alone time series database influxdb 1.7. it not only has the characteristics of influxdb itself, but also has cluster functions such as capacity expansion, replica fault tolerance and so on.

The main features are as follows:

High-performance data storage specially written for time series data, taking into account both write performance and disk space footprint

Sql-like query statements that support a variety of statistical aggregate functions

Automatically clean up expired data

Built-in continuous queries to automatically complete user-preset aggregation operations

Golang is written without other dependencies, and it is easy to deploy and operate.

Dynamic horizontal expansion of nodes to support massive data storage

Replica redundancy design, automatic failover, support for high availability

Optimize data writing to support high throughput

System architecture

Logical storage hierarchy

The highest level of influxdb architecture is that the database,database is divided into different retension policy according to the data retention duration, forming multiple storage containers under the database. Because the time series database is associated with the time dimension, the contents with the same retention period are stored together to facilitate expiration and deletion. In addition, under retension policy, the retention duration of retension policy is further subdivided, and the data of each time period is stored in a shard group, so that when a segment of shard group expires, it will be deleted completely to avoid pulling out part of the data from within the storage engine. For example, data under database may be retained for 30 days or 7 days, and they will be stored under different retension policy. Assuming that the 7-day data continues to be divided into 1 day, they are stored in 7 shard group separately. When the data on the 8th day is generated, a new shard group write is created and the entire shard group on the 1st day is deleted.

So far, the current timing data sent under the same retension policy will only fall in the current time period, that is, only the latest shard group can be written. In order to improve concurrency, a shard group is divided into multiple shard. These shard are globally unique and distributed on all physical nodes. Each shard corresponds to a tsm storage engine, which is responsible for storing data.

When requesting access to data, a database and retension policy can be locked through the requested information, and then some shard group can be locked based on the time period information in the request. In the case of writing, each piece of written data corresponds to a serieskey (this concept will be described later), and a shard can be locked for writing by hashing the serieskey. On the other hand, shard has a copy, and it will be written to each copy at the same time using the strategy of unattended multi-write. When querying, because there is no information about serieskey in the query request, you can only query all the shard in the shard group. For a shard, you will select an available physical node in its copy for access.

So how many shard does a shard group need? in order to achieve the maximum concurrency without unduly interfering with the overall order of the data, after the number of physical nodes and replicas are determined, the number of shard in a shard group is the number of machines divided by the number of replicas, ensuring that the current data can be evenly written to all physical nodes, and will not affect query efficiency because of too much shard. For example, if the data cluster on the figure has six physical nodes and the user specifies two copies, then there are three shard.

Cluster structure

The whole system is divided into three parts: proxy, meta cluster and data cluster. Proxy is responsible for receiving requests, stateless, and can be preceded by lvs to support horizontal scaling. The meta cluster keeps the logical storage hierarchy mentioned above and its corresponding relationship with the physical nodes, and ensures the strong consistency of metadata through the raft protocol, where meta information is stored in memory, and logs and snapshots are persisted to disk. A data cluster is a true data storage node, where data is stored in shard units, and each shard corresponds to a tsm storage engine.

When the request arrives, after lvs locks a proxy,proxy, it first looks for meta information in the meta cluster based on database, retension policy and time period, and finally gets a mapping from shard to physical node, then converts the mapping relationship into a mapping from physical node to shard and returns it to proxy. Finally, according to this mapping relationship, it visits the specific shard in the physical node specified by the data cluster. The data access under shard will be described later.

Data access

Grammatical format

Influxdb's query provides a query similar to a relational database, showing a similar relational table: measurement, where the time of the temporal database is regarded as an eternal column, and the other columns are divided into two categories:

1 、 field

One is field, which is the most critical part of time series data, whose values are constantly appended over time, such as the delay between two machines at each point in time.

2 、 tag

The other is tag, which are tags of a field value, so they are all string types, and the range of values is limited. For example, the delay field value at a certain point in time is 2ms, and there should be two tag attributes, the delay from which machine to which machine, so you can design two tag:from and to.

Measurement shows that the first line is key, and the rest can be seen as value, so tag has tagkey,tagvalue,field and fieldkey and fieldvalue.

Data reading and writing

When a row of written data is received, it is converted to the following format:

Measurement+tagkey1+tagvalue1+tagkey2+tagvalue2+fieldkey+fieldvalue+time .

If there is more than one field in a row, it will be divided into multiple such data stores. Influxdb's storage engine can be understood as a map, from measurement to fieldkey as storage key, followed by fieldvalue and time is storage value, these values will be continuously appended, in the storage engine, these values will be stored together as a column, because they are gradually changing over time, saving them together can improve the effect of compression. In addition, the remaining part of the storage key after removing the fieldkey is the serieskey mentioned above.

As mentioned above, how access requests lock shard in the cluster, here is how to access within a shard.

Influxdb's query is similar to sql syntax, but the fragmented information with sql statements cannot directly query the storage engine, so some strategies are needed to convert sql statements into storage key. Influxdb converts the tag information after where into a collection of all related serieskey by building an inverted index, and then splices each serieskey with the fieldkey behind the select to form a storage key, so that the corresponding data can be extracted by column.

Through the analysis of the serieskey stored in the key in the tsm storage engine, the inverted index can be constructed. The new version of influxdb persists the inverted index to each shard, corresponding to the tsm storage engine that stores the data, which is called the tsi storage engine. The inverted index is equivalent to a three-tier map,map. The key is measurment, and the value is a two-tier map. The key of this two-tier map is tagkey, and the corresponding value is a first-tier map. The key of this first-tier map is tagval, and the corresponding value is a collection of serieskey. Each serieskey string in this set contains measurement, tagkey and tagval on the map index path.

In this way, we can analyze the query sql, use the measurement after from to query the inverted index three-level map to get a second-level map, and then analyze the multiple filtering logic units after the where. Take tagkey1=tagval1 as the key of the second-tier map, and find the final value: the collection of serieskey, each serieskey string of this set contains measurment, tagkey1 and tagval1, they are serieskey that satisfy the current filtering logic unit. According to the and / OR logic of these logic units, the sets of corresponding serieskey are intersected and merged, and finally all the logical sets of serieskey are filtered out according to the semantics of sql, and then these serieskey are spliced with the fieldkey behind the select to get the final storage key, and the data can be read.

Query without aggregate function: as shown in the figure, for a serieskey, you need to concatenate a large number of fieldkey, and then take out data from multiple columns. The problem they face when they come out is how to combine the data into a row. Influxdb row and column constraints are relatively loose, and rows can not be determined simply according to the column offset. Influxdb takes serieskey and time as the basis for judging column data as one row, and multiple columns corresponding to each serieskey are aggregated into a data flow with multi-behavior granularity, and the data streams corresponding to multiple serieskey are aggregated into a data flow in a certain order, which is returned to the client as the final result set.

Query with aggregate function: this method is just the opposite of the above query. Here, for the aggregate function parameter field, many serieskey are concatenated, of course, the ultimate goal is the same, to get the storage key, multiple storage key can read multiple data streams, these data streams are faced with two kinds of processing, first assemble them into a data stream in a certain order. Then, according to a certain strategy, some adjacent data in this data stream are aggregated and calculated, and then the final aggregated value is obtained. The order and strategy here come from the way group by is aggregated in the sql statement.

The merging and aggregating mode of multiple data streams is also applicable to the query results on shard.

It is relatively simple for writing, just update the data storage engine and inverted index directly.

The whole process

The whole process of access has been mentioned above, so let's sort it out as a whole: the query is divided into two phases, the query above shard and the query under shard.

First of all, the access request locks to a proxy,proxy-to-meta cluster to find meta information through lvs, locks database,retension policy and shard group according to the request information, and then gets a large number of shard.

For write operations, one shard is locked for writing according to the serieskey at the time of writing, and because there are multiple copies of the shard, the data needs to be written to multiple copies at the same time. For queries, you cannot get serieskey by requesting information, so you need to query all shard and select an available copy for each shard to access.

After the above processing, the mapping of shard to physical nodes is obtained, and then converted to the mapping of physical nodes to shard, which is returned to proxy,proxy, and the corresponding shard can be accessed on a node of the data cluster.

Write access under shard requires disassembling the insert statement, combining the storage key-value pairs into the tsm storage engine, and then updating the inverted index based on the combined serieskey.

Query access under shard, analyze sql statements, query inverted index, obtain its related serieskey collection, and splice it into field to form the final storage key for data access. Then the data is merged and aggregated on the data node on the shard, and on the proxy on the data.

Finally, proxy returns the access result to the client.

Fault handling

strategy

It was mentioned above that influxdb provides replica fault tolerance for shard. When write data is sent to proxy,proxy, the data is sent to all shard replicas in the form of unattended multi-write. The meta cluster monitors whether the data node is online in the form of heartbeat. When reading, a reading node is randomly selected among the online data nodes for the same shard.

If a data node is not available at the time of writing, it will be written to a temporary file in proxy, and the temporary data will be sent to the specified node when the network returns to normal.

Deal with

Data cluster expansion

When a new node joins the data cluster, it is not supported to automatically migrate the existing data, but some efforts have been made. In order to make the current written data apply to the new node as soon as possible, when the new node is added, the current time will be taken as the end time of the current shard group, and then a new shard group will be created according to the new number of data nodes, so that the amount of current data can be evenly distributed to each data node. The meta information related to each shard group is stored in the meta cluster, so it does not interfere with the previous data reading.

The data node is temporarily unavailable

If the data node is in a state of short-term unavailability, including self-recovery after a short network failure, or the intervention of operation and maintenance personnel after a hardware failure, and finally the data node still has data before being offline, then it can join the data cluster in its original identity. For writes, proxy temporarily stores the data of this data node during unavailability, and sends this data to the data node again when data joins the cluster to ensure that the data is ultimately consistent.

Data node is not available for a long time

If, for some reason, the data node cannot or does not need to join the cluster in its original identity, and the OPS personnel are required to manually offline the previously unavailable data node, then when the machine is available, it can be added to the cluster with a new data identity, which is equivalent to the expansion of the cluster.

Total knot

The QTSDB cluster is implemented as follows: data is written to the specified shard according to serieskey when written, and serieskey cannot be predicted when reading, so each shard needs to be queried. The whole reading process is divided into two stages: reading from the storage engine on the data node and merging and aggregating multi-shard within the node, aggregating the data of multiple data nodes in the proxy node, merging and aggregating later, forming the final result set and returning to the client.

There are still some imperfections in the existing cluster function of QTSDB, which will be improved in the future.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report