Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the use of the DataX tool

2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)05/31 Report--

Editor to share with you what the use of DataX tools, I believe that most people do not know much about it, so share this article for your reference, I hope you will learn a lot after reading this article, let's go to understand it!

1. Introduction to DataX tools 1. Design concept

DataX is an offline synchronization tool for heterogeneous data sources, which aims to achieve stable and efficient data synchronization among various heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP and so on. To solve the problem of synchronization of heterogeneous data sources, DataX turns complex mesh synchronization links into star data links, and DataX is responsible for connecting various data sources as an intermediate transmission carrier. When you need to access a new data source, you only need to connect the data source to DataX to achieve seamless data synchronization with existing data sources.

A word of chatter: heterogeneous data sources refer to the use of different database systems to store data in order to handle different kinds of business.

2. Component structure

DataX itself, as an offline data synchronization framework, is built using Framework+plugin architecture. Data source reads and writes are abstracted into Reader and Writer plug-ins, which are integrated into the whole synchronization framework.

Reader

Reader is the data acquisition module, which is responsible for reading the data collected from the data source and sending the data to Framework.

Writer

Writer is a data writing module, which is responsible for continuously fetching data from Framework and writing data to the destination.

Framework

Framework is used to connect reader and writer, as a data transmission channel for both, and to deal with core technical issues such as buffering, flow control, concurrency, data conversion and so on.

3. Architecture design

Job

When DataX completes a single data synchronization job, called Job,DataX, when it receives a Job, it starts a process to complete the entire job synchronization process. Job module is the central management node of a single job, which undertakes the functions of data cleaning, sub-task segmentation (transforming a single job calculation into multiple sub-Task), TaskGroup management and so on.

Split

After DataXJob starts, the Job will be split into several small Task (subtasks) according to different source-side sharding strategies to facilitate concurrent execution. Task is the smallest unit of a DataX job, and each Task is responsible for synchronizing part of the data.

Scheduler

After splitting multiple Task, Job will call the Scheduler module and reassemble the split Task into TaskGroup (task group) according to the configured amount of concurrent data.

TaskGroup

Each TaskGroup is responsible for all the Task allocated at the end of a certain concurrency run, and the default number of concurrency for a single task group is 5. Each Task is started by TaskGroup. After Task is started, the thread of Reader- > Channel- > Writer will be started to complete the task synchronization. After the DataX job is running, Job monitors and waits for multiple TaskGroup module tasks to complete, and waits for Job to exit successfully after all TaskGroup tasks are completed. Otherwise, the process exits abnormally and the process exit value is not 0.

II. Environmental installation

Python2.6+,Jdk1.8+ (brain supplement installation process) is recommended.

1. Download the Python package # yum-y install wget# wget https://www.python.org/ftp/python/2.7.15/Python-2.7.15.tgz# tar-zxvf Python-2.7.15.tgz2, install Python# yum install gcc openssl-devel bzip2-devel [root@ctvm01 Python-2.7.15] #. / configure-- enable-optimizations# make altinstall# python-V3, DataX installation # pwd/opt/module# lldatax# cd / opt/module/datax/bin-- Test Environment is correct # python datax.py / opt/module/datax/job/job.json III, synchronization Task 1, synchronous Table creation-- PostgreSQLCREATE TABLE sync_user (id INT NOT NULL User_name VARCHAR (32) NOT NULL, user_age int4 NOT NULL, CONSTRAINT "sync_user_pkey" PRIMARY KEY ("id") CREATE TABLE data_user (id INT NOT NULL, user_name VARCHAR (32) NOT NULL, user_age int4 NOT NULL, CONSTRAINT "sync_user_pkey" PRIMARY KEY ("id")) 2. Write task script [root@ctvm01 job] # pwd/opt/module/datax/job [root@ctvm01 job] # vim postgresql_job.json3, script content {"job": {"setting": {"speed": {"channel": "3"}} "content": [{"reader": {"name": "postgresqlreader", "parameter": {"username": "root01", "password": "123456", "column": ["id" "user_name", "user_age"], "connection": [{"jdbcUrl": ["jdbc:postgresql://192.168.72.131:5432/db_01"] "table": ["data_user"]}, "writer": {"name": "postgresqlwriter" "parameter": {"username": "root01", "password": "123456", "column": ["id", "user_name", "user_age"] "connection": [{"jdbcUrl": "jdbc:postgresql://192.168.72.131:5432/db_01" "table": ["sync_user"]}], "postSql": [] "preSql": []}} 4, execute script # / opt/module/datax/bin/datax.py / opt/module/datax/job/postgresql_job.json5, Execution Log 2020-04-23 18 25 33.404 [job-0] Task start time: 2020-04-23 18:25:22 Task end time: 2020-04-23 18:25:33 Task Total time elapsed: 10s task average traffic 1B/s record write speed: 0rec/s read record total: 2 read and write failure total: 0. 4. Source code flow analysis

Note: here the source code only shows the core process. If you want to see the whole source code, you can download it from Git.

1. Read data

Core entry: PostgresqlReader

Start the read task

Public static class Task extends Reader.Task {@ Override public void startRead (RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt (com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE); this.commonRdbmsReaderSlave.startRead (this.readerSliceConfig, recordSender, super.getTaskPluginCollector (), fetchSize);}}

After the read task starts, the read data operation is performed.

Core class: CommonRdbmsReader

Public void startRead (Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector, int fetchSize) {ResultSet rs = null; try {/ / data read rs = DBUtil.query (conn, querySql, fetchSize); queryPerfRecord.end (); ResultSetMetaData metaData = rs.getMetaData (); columnNumber = metaData.getColumnCount () PerfRecord allResultPerfRecord = new PerfRecord (taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL); allResultPerfRecord.start (); long rsNextUsedTime = 0; long lastTime = System.nanoTime (); / / data transfer to while (rs.next ()) {rsNextUsedTime + = (System.nanoTime ()-lastTime); this.transportOneRecord (recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector) LastTime = System.nanoTime ();} allResultPerfRecord.end (rsNextUsedTime);} catch (Exception e) {throw RdbmsException.asQueryException (this.dataBaseType, e, querySql, table, username);} finally {DBUtil.closeDBResources (null, conn);}} 2, data transfer

Core interface: RecordSender (send)

Public interface RecordSender {public Record createRecord (); public void sendToWriter (Record record); public void flush (); public void terminate (); public void shutdown ();}

Core interface: RecordReceiver (receive)

Public interface RecordReceiver {public Record getFromReader (); public void shutdown ();}

Core class: BufferedRecordExchanger

Class BufferedRecordExchanger implements RecordSender, RecordReceiver3, write data

Core entry: PostgresqlWriter

Start the write task

Public static class Task extends Writer.Task {public void startWrite (RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite (recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector ());}}

After the write data task starts, the data write operation is performed.

Core class: CommonRdbmsWriter

Public void startWriteWithConnection (RecordReceiver recordReceiver, Connection connection) {/ / write SQL statements to the database calcWriteRecordSql (); List writeBuffer = new ArrayList (this.batchSize); int bufferBytes = 0; try {Record record; while ((record = recordReceiver.getFromReader ())! = null) {writeBuffer.add (record); bufferBytes + = record.getMemorySize () If (writeBuffer.size () > = batchSize | | bufferBytes > = batchByteSize) {doBatchInsert (connection, writeBuffer); writeBuffer.clear (); bufferBytes = 0;}} if (! writeBuffer.isEmpty ()) {doBatchInsert (connection, writeBuffer); writeBuffer.clear (); bufferBytes = 0 }} catch (Exception e) {throw DataXException.asDataXException (DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear (); bufferBytes = 0; DBUtil.closeDBResources (null, null, connection);}} these are all the contents of the article "what are DataX tools for?". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report