Many newcomers are unsure how to use DynamoShake to migrate data from DynamoDB to MongoDB. This article summarizes the tool's functionality and usage; hopefully it helps you solve this problem.
Basic functions of DynamoShake
DynamoShake supports both full and incremental synchronization. After the process starts, it first performs full synchronization; once full synchronization finishes, it enters the incremental phase.
Full synchronization consists of two parts: data synchronization and index synchronization. Data synchronization copies the data; index synchronization runs after data synchronization finishes. Index synchronization creates the default primary-key index; synchronizing user-created GSI indexes is supported for replica sets, but not yet for the sharded cluster version.
Incremental synchronization replicates only data, not indexes created during the incremental phase.
In addition, DDL operations on source tables, such as dropping tables, creating tables, or creating indexes, are not supported during either the full or the incremental phase.
Breakpoint resume
Full synchronization cannot resume from a breakpoint; incremental synchronization can. That is, if the incremental stream is interrupted, synchronization can resume from its checkpoint within a certain window. In some cases, however, such as a disconnection that lasts too long or the loss of the previous checkpoint (see below), full synchronization is triggered again.
Synchronized data
All tables on the source side are written to different collections in the destination database (named dynamo-shake by default). For example, if the user has table1 and table2, then after synchronization the destination will have a dynamo-shake database containing collections table1 and table2.
Native DynamoDB wraps every value in a type field, in the format "key: {type: value}". For example, if a user inserts {hello: 1}, the data returned by the DynamoDB API has the format {"hello": {"N": "1"}} (numbers are transmitted as strings).
All DynamoDB data types:
String
Binary
Number
StringSet
NumberSet
BinarySet
Map
List
Boolean
Null
DynamoShake provides two conversion modes, raw and change. In raw mode, data is written exactly as returned by the naked DynamoDB API:
rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa": { "L": [ { "S": "aa1" }, { "N": "1234" } ] }, "hello_world": { "S": "f2" } }
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa": { "N": "222" }, "qqq": { "SS": [ "H2", "h3" ] }, "hello_world": { "S": "yyyyyyyyyyy" }, "test": { "S": "aaa" } }
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa": { "L": [ { "N": "0" }, { "N": "1" }, { "N": "2" } ] }, "hello_world": { "S": "Test Chinese" } }
In change mode, the type field is stripped:
rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa": [ "aa1", 1234 ], "hello_world": "f2" }
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa": 222, "qqq": [ "H2", "h3" ], "hello_world": "yyyyyyyyyyy", "test": "aaa" }
{ "_id": ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa": [ 0, 1, 2 ], "hello_world": "Test Chinese" }
Users can choose the conversion mode that suits their needs.
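To make the change-mode conversion concrete, here is a minimal, hypothetical Go sketch of stripping the type wrappers. It is illustrative only, not DynamoShake's actual code; sets are kept as plain arrays and binary types are omitted for brevity.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// convertChange strips DynamoDB's type wrappers ({"S": ...}, {"N": ...}, ...)
// from one wrapped attribute value, mimicking "change" mode conceptually.
func convertChange(attr map[string]interface{}) interface{} {
	for typ, val := range attr { // a wrapped value holds exactly one type key
		switch typ {
		case "S", "BOOL":
			return val
		case "N": // DynamoDB transmits numbers as strings
			n, _ := strconv.ParseFloat(val.(string), 64)
			return n
		case "NULL":
			return nil
		case "SS", "NS", "BS":
			return val // sets kept as plain arrays for brevity
		case "L":
			items := val.([]interface{})
			out := make([]interface{}, len(items))
			for i, it := range items {
				out[i] = convertChange(it.(map[string]interface{}))
			}
			return out
		case "M":
			m := val.(map[string]interface{})
			out := make(map[string]interface{}, len(m))
			for k, v := range m {
				out[k] = convertChange(v.(map[string]interface{}))
			}
			return out
		}
	}
	return nil // binary and other types omitted in this sketch
}

func main() {
	raw := []byte(`{"aaa": {"L": [{"S": "aa1"}, {"N": "1234"}]}, "hello_world": {"S": "f2"}}`)
	var doc map[string]interface{}
	if err := json.Unmarshal(raw, &doc); err != nil {
		panic(err)
	}
	out := make(map[string]interface{})
	for k, v := range doc {
		out[k] = convertChange(v.(map[string]interface{}))
	}
	fmt.Println(out) // e.g. map[aaa:[aa1 1234] hello_world:f2]
}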
Checkpoints
Incremental breakpoint resume is implemented with checkpoints. By default, checkpoints are written to the destination MongoDB, in a database named dynamo-shake-checkpoint. Each table has its own checkpoint collection, and there is also a status_table collection that records whether the current phase is full or incremental synchronization.
rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpoint
switched to db dynamo-shake42-checkpoint
rszz-4.0-2:PRIMARY> show collections
status_table
zz_incr0
zz_incr1
rszz-4.0-2:PRIMARY> db.status_table.find()
{ "_id": ObjectId("5d6e0ef77e592206a8c86bfd"), "key": "status_key", "status_value": "incr_sync" }
rszz-4.0-2:PRIMARY> db.zz_incr0.find()
{ "_id": ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id": "shardId-00000001567391596311-61ca009c", "father_id": "shardId-00000001567375527511-6a3ba193", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id": "shardId-00000001567406847810-f5b6578b", "father_id": "shardId-00000001567391596311-61ca009c", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef17e592206a8c86456"), "shard_id": "shardId-00000001567422218995-fe7104bc", "father_id": "shardId-00000001567406847810-f5b6578b", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef17e592206a8c86460"), "shard_id": "shardId-00000001567438304561-d3dc6f28", "father_id": "shardId-00000001567422218995-fe7104bc", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id": "shardId-00000001567452243581-ed601f96", "father_id": "shardId-00000001567438304561-d3dc6f28", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef17e592206a8c86474"), "shard_id": "shardId-00000001567466737539-cc721900", "father_id": "shardId-00000001567452243581-ed601f96", "seq_num": "", "status": "no need to process", "worker_id": "unknown-worker", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "", "update_date": "" }
{ "_id": ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id": "shardId-00000001567481807517-935745a3", "father_id": "shardId-00000001567466737539-cc721900", "seq_num": "", "status": "done", "worker_id": "unknown-worker", "iterator_type": "LATEST", "shard_it": "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAGsTOg0+3HY+yzzD1cTzc7TPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date": "" }
{ "_id": ObjectId("5d6e1d807e592206a8c9a102"), "shard_id": "shardId-00000001567497561747-03819eba", "father_id": "shardId-00000001567481807517-935745a3", "seq_num": "391369000000000325557205", "status": "in processing", "worker_id": "unknown", "iterator_type": "AT_SEQUENCE_NUMBER", "shard_it": "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh65koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh3iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh5QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th45/5rLTGPYJZA==", "update_date": "" }
Where "status_value": "incr_sync" in the status_table table indicates that it has entered the incremental phase. Each incremental shard records a checkpoint, and the rules for specific shard splits can be found in dynamodb's official guan'fa documentation. The following is a description of the fields of the increment table checkpoint:
_id: MongoDB's built-in primary key.
shard_id: the id of the shard; each shard has a unique id.
father_id: the id of the parent shard; a shard may have at most one parent.
seq_num: the sequence number currently being processed within the shard; this is the primary position information.
status: the current synchronization stage. The possible statuses are:
"not process": not yet processed
"no need to process": no processing needed
"prepare stage": about to be processed
"in processing": being processed
"wait father finish": waiting for the parent shard to finish before processing
"done": processing finished
worker_id: the id of the worker processing this shard; not enabled yet.
iterator_type: the traversal mode of the shard.
shard_it: the iterator address of the shard; this is the secondary position information.
update_date: the timestamp of the checkpoint update.
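For reference, the checkpoint document shown above could be modeled by a Go struct like the following. This is a hypothetical sketch, not DynamoShake's actual source; the bson tags simply mirror the field names listed above.

package shake

// Checkpoint mirrors one document in a per-table checkpoint collection.
type Checkpoint struct {
	ShardID      string `bson:"shard_id"`      // unique id of the shard
	FatherID     string `bson:"father_id"`     // id of the parent shard, if any
	SeqNum       string `bson:"seq_num"`       // sequence number inside the shard (primary position)
	Status       string `bson:"status"`        // e.g. "in processing", "done"
	WorkerID     string `bson:"worker_id"`     // reserved; not enabled yet
	IteratorType string `bson:"iterator_type"` // e.g. "AT_SEQUENCE_NUMBER", "LATEST"
	ShardIt      string `bson:"shard_it"`      // shard iterator address (secondary position)
	UpdateDate   string `bson:"update_date"`   // checkpoint update timestamp
}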
Indexes
DynamoShake creates a unique index based on the default primary key, and creates a shard key based on the partition key. The user's own GSI indexes are not created at present.
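In MongoDB terms, that behavior corresponds roughly to the following mongo-go-driver calls. This is a sketch under assumptions: "pkey" and "skey" are placeholder attribute names, and using a hashed shard key is an assumption, not confirmed by the source.

package shake

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// ensureIndexes sketches the index handling described above: a unique index
// on the DynamoDB primary key, plus a shard key on the partition key when
// the destination is a sharded cluster.
func ensureIndexes(ctx context.Context, client *mongo.Client) error {
	coll := client.Database("dynamo-shake").Collection("table1")
	_, err := coll.Indexes().CreateOne(ctx, mongo.IndexModel{
		Keys:    bson.D{{Key: "pkey", Value: 1}, {Key: "skey", Value: 1}}, // partition + sort key
		Options: options.Index().SetUnique(true),
	})
	if err != nil {
		return err
	}
	// On a sharded cluster, shard the collection on the partition key.
	return client.Database("admin").RunCommand(ctx, bson.D{
		{Key: "shardCollection", Value: "dynamo-shake.table1"},
		{Key: "key", Value: bson.D{{Key: "pkey", Value: "hashed"}}}, // assumption: hashed shard key
	}).Err()
}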
DynamoShake internal architecture
This section describes some of the architectural details of DynamoShake.
Full synchronization
The basic architecture of table data synchronization is as follows: dynamo-shake starts multiple concurrent tableSyncer threads to pull tables, and the user can control the degree of concurrency. A fetcher thread pulls data from the source DynamoDB and pushes it into a queue; parser threads then take data from the queue and parse it (converting the Dynamo protocol to BSON); finally, executor threads aggregate the parsed documents and write them to MongoDB.
Fetcher. At present there is only one fetcher thread, which uses the driver provided by AWS for protocol conversion. The fetcher calls the driver to scan the source table in batches and pushes the batches into the queue, until all data of the current table has been fetched. The fetcher is separated into its own thread mainly because of network IO: pulling is limited by the network and is relatively slow.
Parser. More than one parser can be started (the default is currently 2, controlled by the full.document.parser parameter). Its main job is to read data from the queue and parse it into the BSON structure. After parsing, the parser writes the documents as individual entries into the executor's queue. The parser runs in its own threads mainly because parsing is CPU-intensive.
Executor. More than one executor can also be started (the default is currently 4, controlled by the full.document.concurrency parameter). The executor pulls documents from the queue, aggregates them into batches (up to 16MB and at most 1024 entries per batch), and writes the batches to the destination MongoDB.
Once all data of the current table has been written, the tableSyncer exits.
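The shape of that pipeline can be sketched with Go channels as follows. The types and stage bodies are hypothetical placeholders, not the actual tableSyncer code; the stage counts mirror the defaults mentioned above (1 fetcher, 2 parsers, 4 executors).

package shake

import "sync"

type rawBatch []byte                    // a batch scanned from DynamoDB
type bsonDoc map[string]interface{}     // a parsed document

// runTableSync sketches the fetcher -> parser -> executor pipeline for one table.
func runTableSync(fetch func() (rawBatch, bool), parse func(rawBatch) []bsonDoc, write func([]bsonDoc)) {
	rawQ := make(chan rawBatch, 64)
	docQ := make(chan bsonDoc, 1024)

	go func() { // single fetcher: network-bound batch scan
		defer close(rawQ)
		for {
			b, ok := fetch()
			if !ok {
				return // table fully scanned
			}
			rawQ <- b
		}
	}()

	var parsers sync.WaitGroup
	for i := 0; i < 2; i++ { // parsers: CPU-bound dynamo -> bson conversion
		parsers.Add(1)
		go func() {
			defer parsers.Done()
			for b := range rawQ {
				for _, d := range parse(b) {
					docQ <- d
				}
			}
		}()
	}
	go func() { parsers.Wait(); close(docQ) }()

	var executors sync.WaitGroup
	for i := 0; i < 4; i++ { // executors: aggregate up to 1024 docs (<=16MB) per write
		executors.Add(1)
		go func() {
			defer executors.Done()
			batch := make([]bsonDoc, 0, 1024)
			for d := range docQ {
				batch = append(batch, d)
				if len(batch) == 1024 {
					write(batch)
					batch = make([]bsonDoc, 0, 1024)
				}
			}
			if len(batch) > 0 {
				write(batch)
			}
		}()
	}
	executors.Wait() // tableSyncer exits once everything is written
}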
Incremental synchronization
The overall incremental architecture is as follows:
A Fetcher thread senses shard changes in the stream, and the Manager either notifies an existing handler or creates a new Dispatcher for the message, with one Dispatcher per shard. The Dispatcher pulls incremental data from the source, parses and batches it through the Batcher, writes it to MongoDB through the executor, and updates the checkpoint at the same time. In addition, when resuming from a breakpoint, the Dispatcher pulls from the old checkpoint position instead of starting from scratch.
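Conceptually, a Dispatcher's loop resumes from the checkpointed sequence number and drains the shard. Below is a hedged sketch using the AWS SDK for Go v1 DynamoDB Streams API; error handling, batching, and checkpoint writes are elided, and this is not the tool's actual code.

package shake

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
)

// drainShard sketches one Dispatcher: resume a shard from a checkpointed
// sequence number and consume records until the shard is exhausted.
// apply stands in for the Batcher -> executor -> checkpoint path above.
func drainShard(streamArn, shardID, seqNum string, apply func(*dynamodbstreams.Record)) error {
	svc := dynamodbstreams.New(session.Must(session.NewSession()))
	it, err := svc.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{
		StreamArn:         aws.String(streamArn),
		ShardId:           aws.String(shardID),
		ShardIteratorType: aws.String("AT_SEQUENCE_NUMBER"), // resume from checkpoint
		SequenceNumber:    aws.String(seqNum),
	})
	if err != nil {
		return err
	}
	cursor := it.ShardIterator
	for cursor != nil { // a nil iterator means the shard is closed and drained
		out, err := svc.GetRecords(&dynamodbstreams.GetRecordsInput{ShardIterator: cursor})
		if err != nil {
			return err
		}
		for _, rec := range out.Records {
			apply(rec) // parse, batch, write to MongoDB, update checkpoint
		}
		cursor = out.NextShardIterator
	}
	return nil
}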
Using DynamoShake
Start it with: ./dynamo-shake -conf=dynamo-shake.conf. The configuration parameters are specified in dynamo-shake.conf. The meaning of each parameter is listed below:
id: changing it affects the name of the destination database on MongoDB.
log.file: the log file; if not configured, logs are printed to standard output.
log.level: log level. The default is recommended.
log.buffer: whether to buffer log printing. The default is recommended.
system_profile: the port number for printing internal stacks. The default is recommended.
http_profile: not enabled yet.
sync_mode: synchronization mode. all means full plus incremental, full means full only, and incr means incremental only (not supported currently).
source.access_key_id: DynamoDB connection parameter.
source.secret_access_key: DynamoDB connection parameter.
source.session_token: DynamoDB connection parameter; may be left blank if there is no temporary token.
source.region: DynamoDB connection parameter.
filter.collection.white: whitelist filter; only the listed tables are synchronized.
filter.collection.black: blacklist filter; the listed tables are not synchronized.
qps.full: rate limit for the full phase, the number of requests sent per second.
qps.full.batch_num: rate limit for the full phase, the maximum number of items in one request.
qps.incr: rate limit for the incremental phase, the number of requests sent per second.
qps.incr.batch_num: rate limit for the incremental phase, the maximum number of items in one request.
target.type: destination type. Only mongodb is supported.
target.address: the connection string of the destination MongoDB.
target.mongodb.type: whether the destination MongoDB is a replica set or a sharded cluster.
target.mongodb.exist: what to do if a table with the same name exists at the destination. drop means delete it, rename means rename it, and leaving it blank means no processing.
full.concurrency: the number of threads in full synchronization; one thread corresponds to one table.
full.document.concurrency: the concurrency used within one table during full synchronization.
full.document.parser: the number of parser threads within one table.
full.enable_index.primary: whether to synchronize DynamoDB's primary key.
full.enable_index.user: whether to synchronize user-created indexes; not supported currently.
convert.type: write mode. raw means writing the data as-is; change means writing after parsing the type field. See the description above.
increase.concurrency: incremental synchronization concurrency parameter, the maximum number of shards fetched at one time.
checkpoint.address: the storage address of the checkpoint; if not configured, it defaults to the same address as the destination database.
checkpoint.db: the name of the db where checkpoints are written. The default is $db-checkpoint.
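Putting a few of these together, a minimal dynamo-shake.conf might look like the following. All values are placeholders, and the semicolon separator in the whitelist is an assumption (the full-check tool documented below uses ';').

id = dynamo-shake
log.file =
sync_mode = all
source.access_key_id = xxxxxxxxxxxx
source.secret_access_key = xxxxxxxxxxxx
source.region = us-east-2
filter.collection.white = table1;table2
qps.full = 1000
qps.full.batch_num = 128
target.type = mongodb
target.address = mongodb://username:password@10.1.1.1:30441
target.mongodb.type = replica
target.mongodb.exist = drop
full.concurrency = 4
full.document.concurrency = 4
full.document.parser = 2
full.enable_index.primary = true
convert.type = change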
DynamoFullCheck
DynamoFullCheck is a tool for verifying whether DynamoDB and MongoDB data are consistent. Currently it supports only full verification, not incremental; that is, during the incremental synchronization phase, the source and destination are not expected to be consistent.
DynamoFullCheck supports only one-direction verification: it checks whether the DynamoDB data is a subset of the MongoDB data; the reverse direction is not checked.
In addition, sampled verification is supported, as is verifying only the tables of interest.
The verification is mainly divided into the following parts:
Outline check. First, it verifies that the document counts of the tables on both sides match; then it verifies that the indexes are consistent (index verification is not implemented yet). Note that if the document counts differ, the tool exits directly without performing any further verification.
Precise check. Verifies the data itself. The principle: pull data from the source and parse it; if the table has a unique index, look up the corresponding MongoDB document by that index and compare them for consistency; if there is no unique index, look the document up in MongoDB by the entire document (heavier).
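Sketched in Go (with hypothetical names; the real tool reuses DynamoShake's fetch/parse path), the lookup side of the precise check could look like this:

package shake

import (
	"context"
	"reflect"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// checkDoc sketches the precise check: find the MongoDB counterpart by the
// unique key when one exists, otherwise match on the whole document (heavier).
// "pkey" is a placeholder key name; DeepEqual stands in for a real comparison.
func checkDoc(ctx context.Context, coll *mongo.Collection, src bson.M, hasUniqueKey bool) (bool, error) {
	filter := src // no unique index: look up by the entire document
	if hasUniqueKey {
		filter = bson.M{"pkey": src["pkey"]}
	}
	var dst bson.M
	if err := coll.FindOne(ctx, filter).Decode(&dst); err != nil {
		if err == mongo.ErrNoDocuments {
			return false, nil // missing on the destination side
		}
		return false, err
	}
	delete(dst, "_id") // the destination adds its own _id
	return reflect.DeepEqual(src, dst), nil
}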
Sampling principle:
During the precise check, if sampling is enabled, each document is tested to decide whether it should be sampled. The principle is simple: for a 30% sampling rate, for example, a random number in [0, 100) is generated; if it falls in [0, 30), the document is checked, otherwise it is skipped.
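The decision itself is a one-liner, for example (sketch):

package shake

import "math/rand"

// sampled reports whether a document should be checked under a percentage
// sampling rate, mirroring the "random number in [0, 100)" rule above.
func sampled(ratePercent int) bool {
	return rand.Intn(100) < ratePercent // e.g. 30 checks roughly 30% of docs
}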
DynamoFullCheck also needs the fetch and parse phases to pull data from the source DynamoDB, so it reuses part of DynamoShake's code; the difference is that within DynamoFullCheck the concurrency of each fetcher, parser, and executor thread is 1.
Usage parameters
The full-check parameters are simpler and are passed directly on the command line, for example:
./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300
Usage:
  dynamo-full-check.darwin [OPTIONS]
Application Options:
  -i, --id=                     target database collection name (default: dynamo-shake)
  -l, --logLevel=
  -s, --sourceAccessKeyID=      dynamodb source access key id
      --sourceSecretAccessKey=  dynamodb source secret access key
      --sourceSessionToken=     dynamodb source session token
      --sourceRegion=           dynamodb source region
      --qpsFull=                qps of scan command, default is 10000
      --qpsFullBatchNum=        batch number in each scan command, default is 128
  -t, --targetAddress=          mongodb target address
  -d, --diffOutputFile=         diff output file name (default: dynamo-full-check-diff)
  -p, --parallel=               how many threads used to compare, default is 16 (default: 16)
  -e, --sample=                 comparison sample number for each table, 0 means disable (default: 1000)
      --filterCollectionWhite=  only compare the given tables, split by ';'
      --filterCollectionBlack=  do not compare the given tables, split by ';'
  -c, --convertType=            convert type (default: raw)
  -v, --version                 print version
Help Options:
  -h, --help                    Show this help message
After reading the above, have you mastered how to use DynamoShake to migrate from DynamoDB to MongoDB? If you want to learn more skills or find out more about related topics, you are welcome to follow this channel. Thank you for reading!