This article describes how to use DataX to synchronize MaxCompute data to TableStore. The walkthrough is fairly detailed, and interested readers should find it a useful reference.
Overview
More and more technical architectures now combine MaxCompute and TableStore: MaxCompute is used for big data analysis, and the computed results are exported to TableStore to serve online access. MaxCompute provides the ability to compute over massive data, while TableStore provides high-concurrency, low-latency reads and writes over massive data.
Currently, there are three main ways to export data from MaxCompute to TableStore:
Write your own tool: use the MaxCompute SDK to read table data through Tunnel, then write it with the TableStore SDK (a minimal sketch of this approach follows below).
DataX: host and run the DataX job on your own server.
Data integration service: the underlying engine is also DataX, with service-oriented and distributed capabilities added on top.
For temporary data exports, the second approach is the one most often recommended; unless the data needs special processing, we generally do not recommend the first.
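For completeness, here is a minimal, hedged sketch of what the first approach involves: reading through Tunnel with the MaxCompute SDK and writing with the TableStore SDK. Class and method names follow the SDK generations used elsewhere in this article; the endpoints, credentials and the fixed batch size of 100 rows are placeholders, and real code would also have to inspect the BatchWriteRow response and retry failed rows.

    import com.aliyun.odps.Odps;
    import com.aliyun.odps.account.AliyunAccount;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.RecordReader;
    import com.aliyun.odps.tunnel.TableTunnel;
    import com.aliyun.openservices.ots.OTSClient;
    import com.aliyun.openservices.ots.model.BatchWriteRowRequest;
    import com.aliyun.openservices.ots.model.ColumnValue;
    import com.aliyun.openservices.ots.model.PrimaryKeyValue;
    import com.aliyun.openservices.ots.model.RowPrimaryKey;
    import com.aliyun.openservices.ots.model.RowPutChange;

    public class TunnelToTableStore {
        public static void main(String[] args) throws Exception {
            // Read the MaxCompute table through Tunnel.
            Odps odps = new Odps(new AliyunAccount("accessid", "accesskey"));
            odps.setEndpoint("http://service.odps.aliyun.com/api");   // placeholder endpoint
            odps.setDefaultProject("aliyun_ots_dev");
            TableTunnel tunnel = new TableTunnel(odps);
            TableTunnel.DownloadSession session = tunnel.createDownloadSession("aliyun_ots_dev", "data_for_ots");
            RecordReader reader = session.openRecordReader(0, session.getRecordCount());

            // Write to TableStore in batches through BatchWriteRow.
            OTSClient ots = new OTSClient("http://data-import-test.cn-hangzhou.ots.aliyuncs.com",
                    "accessid", "accesskey", "data-import-test");
            BatchWriteRowRequest batch = new BatchWriteRowRequest();
            int pending = 0;
            Record record;
            while ((record = reader.read()) != null) {
                RowPutChange change = new RowPutChange("DataTable");
                RowPrimaryKey pk = new RowPrimaryKey();
                pk.addPrimaryKeyColumn("md5", PrimaryKeyValue.fromString(record.getString("md5")));
                pk.addPrimaryKeyColumn("userid", PrimaryKeyValue.fromString(record.getString("userid")));
                change.setPrimaryKey(pk);
                change.addAttributeColumn("name", ColumnValue.fromString(record.getString("name")));
                // ... add the remaining attribute columns the same way
                batch.addRowPutChange(change);
                if (++pending == 100) {            // keep each request under the BatchWriteRow row limit
                    ots.batchWriteRow(batch);      // a real tool must check the result and retry failed rows
                    batch = new BatchWriteRowRequest();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ots.batchWriteRow(batch);
            }
            reader.close();
            ots.shutdown();
        }
    }

Even in this simplified form you can see that batching, error handling and concurrency all have to be built by hand, which is exactly what DataX already provides.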
DataX has been used inside Alibaba Group for many years and has been through many Double 11 (Singles' Day) events; it is a stable, easy-to-use and efficient tool. As the result data on MaxCompute keeps growing, the export rate becomes more and more important: massive amounts of data need to be exported within a time baseline. This article introduces several optimization methods to improve the throughput of MaxCompute-to-TableStore export with DataX.
Optimization process
We will demonstrate step by step how to improve the export speed. The export path has three links: reading from the MaxCompute data channel, the data exchange inside DataX, and the online writes into TableStore. If any one of these three becomes a bottleneck, it limits the export speed.
The read performance of the MaxCompute data channel is relatively high and generally does not become the bottleneck, so this article focuses on optimizing the latter two links. The core guidelines are: 1. increase concurrency, and 2. reduce write latency. The optimization steps listed next all revolve around these two points.
The experiment uses a TableStore test environment. On MaxCompute, we create a table and prepare 100 million rows of data. Because the TableStore test environment and the DataX Job host are both of small specification, the rates achieved in this experiment are relatively low; the goal is only to demonstrate how to improve the speed. In real TableStore production environments, we have helped applications optimize to hundreds of megabytes or even gigabytes per second using the same techniques.
Data preparation
First, create the following table in MaxCompute:
CREATE TABLE IF NOT EXISTS data_for_ots (
    md5 string,
    userid string,
    name string,
    comments string,
    attr0 string,
    attr1 string,
    attr2 string,
    attr3 string,
    create_time string,
    udpate_time string
);
Next, 100 million rows of data are imported into the table; each row is about 200 bytes. The userid column uses a random value, and the md5 column takes 4 bytes of the md5 computed from userid.
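To make the data layout concrete, here is a small, hedged sketch of how such a row could be generated; keeping the first 4 hex characters of the MD5 digest as the md5 column is our reading of "4 bytes" above, and the attribute values are placeholders.

    import java.security.MessageDigest;
    import java.util.UUID;

    public class TestRowGenerator {
        public static void main(String[] args) throws Exception {
            String userid = UUID.randomUUID().toString();               // random userid value
            byte[] digest = MessageDigest.getInstance("MD5").digest(userid.getBytes("UTF-8"));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            String md5 = hex.substring(0, 4);                           // short md5 prefix used as the md5 column
            // The remaining attribute columns only need to pad the row to roughly 200 bytes.
            System.out.println(md5 + "," + userid + ",name_0,comments_0,attr0,attr1,attr2,attr3,2017-02-07,2017-02-07");
        }
    }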
The test data is imported using MaxCompute Tunnel, and the import speed is quite good.
When the data is ready, create a table on TableStore, using md5 and userid as the primary key columns:
TableMeta tableMeta = new TableMeta("DataTable");
tableMeta.addPrimaryKeyColumn("md5", PrimaryKeyType.STRING);
tableMeta.addPrimaryKeyColumn("userid", PrimaryKeyType.STRING);
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
CreateTableRequest request = new CreateTableRequest();
request.setTableMeta(tableMeta);
request.setReservedThroughput(capacityUnit);
ots.createTable(request);
After the table and data are ready, use the following DataX Job configuration to export the data:
"job": {"setting": {"speed": {"channel": "1"}}, "content": [{"reader": {"name": "odpsreader" "parameter": {"accessId": "accessid", "accessKey": "accesskey", "project": "aliyun_ots_dev", "table": "data_for_ots", "partition": [] "column": ["md5", "userid", "name", "comments", "attr0", "attr1", "attr2", "attr3", "create_time", "udpate_time"], "packageAuthorizedProject": "," splitMode ":" record "," odpsServer ":" * * " "tunnelServer": "* *"}}, "writer": {"name": "otswriter", "parameter": {"endpoint": "http://data-import-test.cn-hangzhou.ots.aliyuncs.com"," "accessId": "accessid", "accessKey": "accesskey", "instanceName": "data-import-test", "table": "DataTable", "primaryKey": [{"name": "md5" "type": "string"}, {"name": "userid", "type": "string"}], "column": [{"name": "name", "type": "string"}, {"name": "comments" "type": "string", {"name": "attr0", "type": "string"}, {"name": "attr1", "type": "string"}, {"name": "attr2", "type": "string"}, {"name": "attr3" "type": "string", {"name": "create_time", "type": "string"}, {"name": "update_time", "type": "string"}] "writeMode": "UpdateRow"]}}
Start the DataX task, and you can see the current data export speed from the standard output:
2017-02-07 08:41:49.285 [job-0] INFO StandAloneJobContainerCommunicator - Total 271520 records, 55194052 bytes | Speed 1.05MB/s, 5404 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 4.501s | All Task WaitReaderTime 47.815s
...
2017-02-07 08:42:39.292 [job-0] INFO StandAloneJobContainerCommunicator - Total 538144 records, 109394175 bytes | Speed 1.04MB/s, 5347 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 9.428s | All Task WaitReaderTime 83.889s
As you can see, the current speed is about 1MB/s. Next we will demonstrate how to optimize it and increase the speed step by step.
First: configure reasonable basic parameters of DataX
The first step is to tune several basic DataX parameters. Before that, here is a general picture of how a task runs inside a DataX Job:
A DataX Job is split into multiple Tasks, the Tasks are grouped into TaskGroups, and inside each Task there is a Reader -> Channel -> Writer pipeline. The Channel is the data exchange channel connecting Reader and Writer, and all data flows through it.
Within DataX, each Channel is under strict speed control, and the default limit is 1MB/s; this is why the speed is 1MB/s under the default configuration. So the first basic parameter to optimize is the speed limit of a single Channel. Change the configuration as follows:
"core": {"transport": {"channel": {"speed": {"byte": 5242880}}, "job": {.}}
Here we set the speed limit of a single Channel to 5MB/s. This value needs to be tuned per scenario: for MaxCompute, a single Channel can reach dozens of MB/s; for TableStore, a single Channel reaches a few MB/s in scenarios with smaller columns and may reach dozens of MB/s with larger columns.
With the current default configuration, the Job starts only 1 Channel. To improve the speed, concurrency must be increased; that is the second optimization step. But before that, one more basic parameter needs adjusting: the memory size of the JVM started by the DataX Job.
At present, the default JVM configuration launched by DataX is "-Xms1g -Xmx1g". When the number of Channels in a Job increases, memory usage grows significantly, because DataX, as a data exchange channel, caches more data in memory: each Channel has a Buffer for temporary data exchange, and some Readers and Writers have Buffers of their own.
There are two ways to adjust the JVM parameters: change datax.py directly, or add the corresponding parameter at startup, as follows:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" ots.json
It is usually recommended to set the memory to 4G or 8G, which can also be adjusted according to the actual situation.
After optimizing the speed limit of the single Channel and the memory parameters of the JVM, let's run the task again:
2017-02-07 08:45:03.189 [job-0] INFO StandAloneJobContainerCommunicator - Total 256064 records, 52051995 bytes | Speed 1.98MB/s, 10214 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 4.892s | All Task WaitReaderTime 17.194s
...
2017-02-07 08:45:43.195 [job-0] INFO StandAloneJobContainerCommunicator - Total 675328 records, 137281049 bytes | Speed 2.04MB/s, 10515 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 18.515s | All Task WaitReaderTime 27.810s
The data export speed has now increased from 1MB/s to 2MB/s.
Second: improve Channel concurrency in DataX Job
As pointed out above, only a single Channel is performing the export within the current Job; to increase the speed, we need to raise the number of concurrent Channels.
DataX imposes a speed limit on each Channel, expressed either in bytes per second or in records per second. In addition to the per-Channel limit, there is a global speed limit configuration, which is unlimited by default.
There are three ways to increase the number of Channel concurrency:
1. Configure global Byte speed limit and single Channel Byte speed limit. Number of Channel = global Byte speed limit / single Channel Byte speed limit. (the final number of Channel in the following example is 10)
"core": {"transport": {"channel": {"speed": {"byte": 1048576}} "job": {"setting": {"speed": {"byte": 10485760}},...}}
2. Configure global Record speed limit and single Channel Record speed limit. Number of Channel = global Record speed limit / single Channel Record speed limit. (the final number of Channel in the following example is 3)
"core": {"transport": {"channel": {"speed": {"record": 100} "job": {"setting": {"speed": {"record": 300}},...}}
3. No global speed limit, directly configure the number of Channel. (the final number of Channel in the following example is 5)
"core": {"transport": {"channel": {"speed": {"byte": 1048576}} "job": {"setting": {"speed": {"channel": 5}},...}}
The third way is the simplest and most direct, but it lacks a global speed limit. When choosing the number of Channels, also note that more is not always better: each additional Channel brings more CPU and memory consumption. If the Channel concurrency is set so high that JVM memory becomes insufficient, frequent Full GC will occur and the export speed will drop sharply, which is counterproductive.
You can find the number of Channel for this task in the output log of DataX:
2017-02-07 13:27:45.016 [job-0] INFO JobContainer - DataX Reader.Job [odpsreader] splits to [15] tasks.
2017-02-07 13:27:45.017 [job-0] INFO OtsWriterMasterProxy - Begin split and MandatoryNumber: 15
2017-02-07 13:27:45.025 [job-0] INFO OtsWriterMasterProxy - End split.
2017-02-07 13:27:45 [job-0] INFO JobContainer - DataX Writer.Job [otswriter] splits to [15] tasks.
In our experiment, we directly configure the number of Channel to 10 and export again:
2017-02-07 08:58:24.366 [job-0] INFO StandAloneJobContainerCommunicator - Total 2465984 records, 501286700 bytes | Speed 9.19MB/s, 47414 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 362.875s | All Task WaitReaderTime 378.978s
...
2017-02-07 08:59:14.373 [job-0] INFO StandAloneJobContainerCommunicator - Total 4875136 records, 991017582 bytes | Speed 9.51MB/s, 49062 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 846.522s | All Task WaitReaderTime 379.098s
You can see that after increasing the number of Channels from 1 to 10, the speed rises from 2MB/s to 9MB/s. Increasing the Channel count further to 15 does not raise the speed any more, while server-side monitoring shows the write latency of each imported batch increasing, indicating that the bottleneck is now on the TableStore write side.
Third: pre-partition the TableStore table and further improve DataX Channel concurrency
With the above optimizations in place, the DataX data exchange is no longer the bottleneck; the bottleneck now lies in the write capacity on the TableStore side. TableStore is a distributed storage system: a large table is split into many partitions, which are distributed across back-end physical machines to serve requests. A newly created table has a single partition by default; as the table grows, TableStore splits it automatically. The number of partitions is, to some extent, related to the service capacity the table can provide. In some business scenarios, large-scale data import is required right after a new table is created, and the default single partition is clearly not enough. You could wait for the table to split automatically as the data volume slowly grows, but that cycle is long. The recommended practice in this case is to pre-partition the table at creation time.
However, pre-partitioning is not yet available through the SDK, so if you need to pre-partition a table, you can contact us through a ticket and we will help set it up.
We create a new table, pre-split into four partitions, with md5 as the partition key. We use the md5 column mainly because data is distributed essentially uniformly over it; if the data were unevenly distributed over the partition key, pre-partitioning would not bring a significant improvement in import performance. With the same Job configuration, run the export task again:
2017-02-08 13:48:18.692 [job-0] INFO StandAloneJobContainerCommunicator - Total 11395424 records, 2316456451 bytes | Speed 18.79MB/s, 96940 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 666.003s | All Task WaitReaderTime 336.048s
...
2017-02-08 13:49:08.697 [job-0] INFO StandAloneJobContainerCommunicator - Total 15908736 records, 3233917750 bytes | Speed 16.39MB/s, 84540 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 921.19s | All Task WaitReaderTime 418.862s
At this point, the speed increases from 9MB/s to 18MB/s. Since the TableStore server side can now provide more service capacity, we try to increase the Channel concurrency from 10 to 15:
2017-02-08 13:52:04.547 [job-0] INFO StandAloneJobContainerCommunicator - Total 9351040 records, 1900875263 bytes | Speed 22.41MB/s, 115619 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 1007.206s | All Task WaitReaderTime 263.789s
...
2017-02-08 13:52:34.550 [job-0] INFO StandAloneJobContainerCommunicator - Total 12874240 records, 2617069638 bytes | Speed 23.50MB/s, 121212 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 1396.991s | All Task WaitReaderTime 263.913s
At this time, the speed increases further, from 18MB/s to 22MB/s or so.
Fourth: increase the number of rows written per batch
In the scenario we built, each row is about 200 bytes. The DataX OTSWriter plug-in writes data through the BatchWriteRow interface of the TableStore SDK, and by default it writes 100 rows per request, which means each request imports only about 20KB of data. Such small write packets are very uneconomical.
TableStore's current limits on BatchWriteRow are not flexible: both the number of rows and the data size are limited, and the default row-count limit is 200 rows. In our experiment we raised that limit to 1000 rows and changed the batch size in the DataX TableStore writer plug-in to 1000 rows as well, to verify how much write efficiency improves when each request carries a larger packet. The job configuration changes as follows (the configuration item is job.content.writer.parameter.batchWriteCount):
"job": {"content": [{"reader": {...}, "writer": {"name": "otswriter", "parameter": {"batchWriteCount": 1000 ...}]}}
Run the task again; the speed is now as follows:
2017-02-08 13:55:16.924 [job-0] INFO StandAloneJobContainerCommunicator - Total 11413216 records, 2320073926 bytes | Speed 29.44MB/s, 151849 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 72.662s | All Task WaitReaderTime 1030.787s
...
2017-02-08 13:56:06.929 [job-0] INFO StandAloneJobContainerCommunicator - Total 19018240 records, 3866017412 bytes | Speed 29.65MB/s, 152937 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 106.391s | All Task WaitReaderTime 1691.072s
The speed increases again, from 22MB/s to 29MB/s. TableStore will later optimize the row-count limit of BatchWriteRow and adopt a friendlier policy for scenarios with small rows.
Fifth: partition the MaxCompute table to improve DataX Job concurrency
All of the optimizations above apply to a single DataX Job, which can only run on one server and cannot be executed in a distributed way. A managed server on D2 usually has a gigabit network card, which caps throughput at roughly 100MB/s in practice (1Gbps is about 125MB/s of raw bandwidth). To go faster, multiple DataX Jobs must be distributed across multiple servers.
DataX's ODPSReader can be configured to export either the whole table or a single partition at a time. We can therefore use partitions to split the export across multiple Jobs running on different machines, but the table must be partitioned first.
In our experiment, the MaxCompute table created earlier is not partitioned, so we recreate it as a partitioned table:
CREATE TABLE IF NOT EXISTS data_for_ots_pt (    -- "data_for_ots_pt" is a placeholder name for the new partitioned table
    md5 string,
    userid string,
    name string,
    comments string,
    attr0 string,
    attr1 string,
    attr2 string,
    attr3 string,
    create_time string,
    udpate_time string
) PARTITIONED BY (partid string);
We add a partid column as the partition column, and import the data from the original table into the new partitioned table (referred to here with the placeholder name data_for_ots_pt) with one SQL statement, distributing it evenly across partid values:
INSERT OVERWRITE TABLE data_for_ots_pt PARTITION (partid)
SELECT md5, userid, name, comments, attr0, attr1, attr2, attr3, create_time, udpate_time,
       SUBSTR(md5, 1, 1) FROM data_for_ots;
The SQL above takes the first character of the md5 column as the value of partid. Since md5 is hexadecimal, that character ranges over 0-f, so the original table is split into 16 partitions. We want the data within each partition to be uniform, which is one reason the md5 column was designed this way: it avoids long-tail partitions.
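As a quick sanity check, the sketch below (an illustration, not taken from the article, and assuming userid is a random string) counts how often each first hex character of an MD5 digest appears; the spread is close to uniform across the 16 values, which is what makes md5 a good partition key here.

    import java.security.MessageDigest;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.UUID;

    public class PartitionSkewCheck {
        public static void main(String[] args) throws Exception {
            MessageDigest md = MessageDigest.getInstance("MD5");
            Map<Character, Integer> counts = new TreeMap<>();
            for (int i = 0; i < 1_000_000; i++) {
                byte[] digest = md.digest(UUID.randomUUID().toString().getBytes("UTF-8"));
                char first = String.format("%02x", digest[0]).charAt(0);   // '0'..'f', i.e. the partid value
                counts.merge(first, 1, Integer::sum);
            }
            // Each of the 16 buckets should end up with roughly 1/16 of the rows.
            counts.forEach((partid, n) -> System.out.println(partid + ": " + n));
        }
    }

If your partition key comes from business data rather than a hash, it is worth running the equivalent count on the real table before relying on pre-partitioning, for the reason given in the previous step: an uneven key gives little benefit.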
After splitting a table into multiple partitions, we can choose to start a task for each partition on different servers, configured as follows (job.content.reader.parameter.partition):
"job": {"content": [{"reader": {"name": "odpsreader", "parameter": { "partition": ["partid=0"],...}}, "writer": {...}}]}}
Due to the limited size of the test cluster, we will not demonstrate the speedup from running multiple Jobs concurrently here. As long as the TableStore server side is not the bottleneck, the speed can be increased nearly linearly by adding more concurrent DataX Jobs.
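Purely as an illustration of the per-partition split (not how the article's test was run), the sketch below launches one DataX process per partid value from a single machine; in practice you would spread these jobs across servers, and the ots_partid_<N>.json file names are hypothetical per-partition copies of the job configuration shown above.

    import java.util.ArrayList;
    import java.util.List;

    public class LaunchPerPartitionJobs {
        public static void main(String[] args) throws Exception {
            List<Process> jobs = new ArrayList<>();
            for (String partid : "0123456789abcdef".split("")) {
                // Each config differs only in "partition": ["partid=<value>"] in the odpsreader parameters.
                ProcessBuilder pb = new ProcessBuilder(
                        "python", "datax/bin/datax.py", "--jvm=-Xms8G -Xmx8G", "ots_partid_" + partid + ".json");
                pb.inheritIO();                    // forward DataX logs to this console
                jobs.add(pb.start());
            }
            for (Process job : jobs) {
                job.waitFor();                     // wait for every export job to finish
            }
        }
    }

Each process is an ordinary single-machine DataX Job, so all of the earlier tuning (Channel count, JVM memory, batchWriteCount) still applies inside each one.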
To summarize the optimization points above:
Tune the basic DataX parameters, including the number of Channels, the per-Channel speed limit and the JVM memory settings.
When creating the TableStore table, pre-partition it whenever possible, and design the partition key so that the imported data is distributed uniformly across partition-key values.
If the rows imported into TableStore are small, consider increasing the number of rows written in a single batch.
If a single DataX Job has become the bottleneck, consider splitting the task into multiple DataX Jobs running in parallel.
That concludes our walkthrough of using DataX to synchronize MaxCompute data to TableStore. We hope the content above is helpful; if you found the article useful, feel free to share it with others.