Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize data synchronization from Elasticsearch/DB to SFTP/FTP

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article focuses on "how to achieve Elasticsearch/DB to SFTP/FTP data synchronization", interested friends may wish to take a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn how to synchronize Elasticsearch/DB to SFTP/FTP data.

Elasticsearch/DB to SFTP/FTP data synchronization

Through the bboss data synchronization tool, the data in Elasticsearch and Database can be exported (incremental / full) to the file and uploaded to the SFTP/FTP server in real time, which is described in detail through a case.

1. Case source engineering

Https://gitee.com/bboss/elasticsearch-file2ftp

Https://github.com/bbossgroups/elasticsearch-file2ftp

two。 Case function description

Serial export data to a file and upload ftp and sftp

Serial batch export data to files and upload ftp and sftp

Batch export data to files and upload ftp and sftp in parallel

By setting disableftp to true, only data files are generated, and the file upload sftp/ftp function is disabled (the generated files are kept in the directory corresponding to fileDir)

Special concern

In addition to the general features of bboss synchronization tools (incremental / full synchronization, async / synchronization, add, delete, change and search synchronization), you need to explain the specific features in this case:

Support upload failed file retransmission function

Backup function of uploaded successful files is supported, and you can specify how long the backup will take.

Support automatic cleanup of backup files

Support to cut files by the number of records

Elegantly solve the problem that the asynchronous delayed write feature of elasticsearch may lead to missing step data in incremental synchronization

This article only introduces the case of synchronous upload of elasticsearch data to sftp.

Https://github.com/bbossgroups/elasticsearch-file2ftp/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/ES2FileFtpBatchSplitFileDemo.java

Other cases directly view the source code:

Elasticsearch data is synchronously uploaded to ftp case code address

Https://github.com/bbossgroups/elasticsearch-file2ftp/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/ftp/ES2FileFtpBatchDemo.java

The database is synchronously uploaded to the sftp case code address

Https://github.com/bbossgroups/elasticsearch-file2ftp/blob/main/src/main/java/org/frameworkset/elasticsearch/imp/db/DB2FileFtpDemo.java

3. Case explanation

Next, take elasticsearch data export and upload sftp server as an example to introduce.

3.1 establish a synchronous job processing class

First look at the complete synchronous job processing class-ES2FileFtpBatchSplitFileDemo, and then explain it in detail.

Package org.frameworkset.elasticsearch.imp;import com.frameworkset.util.SimpleStringUtil;import org.frameworkset.elasticsearch.serial.SerialUtil;import org.frameworkset.runtime.CommonLauncher;import org.frameworkset.tran.CommonRecord;import org.frameworkset.tran.DataRefactor;import org.frameworkset.tran.DataStream;import org.frameworkset.tran.context.Context;import org.frameworkset.tran.input.fileftp.es.ES2FileFtpExportBuilder;import org.frameworkset.tran.output.fileftp.FileFtpOupputConfig;import org.frameworkset.tran.output.fileftp.FilenameGenerator;import org.frameworkset.tran.output.fileftp.ReocordGenerator Import org.frameworkset.tran.schedule.CallInterceptor;import org.frameworkset.tran.schedule.ImportIncreamentConfig;import org.frameworkset.tran.schedule.TaskContext;import java.io.Writer;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Map;public class ES2FileFtpBatchSplitFileDemo {public static void main (String [] args) {ES2FileFtpExportBuilder importBuilder = new ES2FileFtpExportBuilder (); importBuilder.setBatchSize. SetFetchSize (1000) String ftpIp = CommonLauncher.getProperty ("ftpIP", "10.13.6.127"); / / also specified the default value FileFtpOupputConfig fileFtpOupputConfig = new FileFtpOupputConfig (); fileFtpOupputConfig.setFtpIP (ftpIp); fileFtpOupputConfig.setFileDir ("D:\\ workdir"); fileFtpOupputConfig.setFtpPort (5322); fileFtpOupputConfig.addHostKeyVerifier ("2a:da:5a:6a:cf:7d:65:e5:ac:ff:d3:73:7f:2c:55:c9") FileFtpOupputConfig.setFtpUser ("ecs"); fileFtpOupputConfig.setFtpPassword ("ecs@123"); fileFtpOupputConfig.setRemoteFileDir ("/ home/ecs/failLog"); fileFtpOupputConfig.setKeepAliveTimeout (100000); fileFtpOupputConfig.setTransferEmptyFiles (true); / / true upload empty files, false does not upload fileFtpOupputConfig.setFailedFileResendInterval (5000); / / upload failed file reupload interval (in milliseconds, 0) {String tmp = "" For (int I = 0; I

< t; i ++){ tmp += "0"; } _fileSeq = tmp+_fileSeq; } return "HN_BOSS_TRADE"+_fileSeq + "_"+time +"_" + _fileSeq+".txt"; } }); //指定文件中每条记录格式,不指定默认为json格式输出 fileFtpOupputConfig.setReocordGenerator(new ReocordGenerator() { @Override public void buildRecord(Context taskContext, CommonRecord record, Writer builder) { //直接将记录按照json格式输出到文本文件中 SerialUtil.normalObject2json(record.getDatas(),//获取记录中的字段数据 builder); String data = (String)taskContext.getTaskContext().getTaskData("data");//从任务上下文中获取本次任务执行前设置时间戳// System.out.println(data); } }); importBuilder.setFileFtpOupputConfig(fileFtpOupputConfig); importBuilder.setIncreamentEndOffset(300);//单位秒 //vops-chbizcollect-2020.11.26,vops-chbizcollect-2020.11.27 importBuilder .setDsl2ndSqlFile("dsl2ndSqlFile.xml") .setDslName("scrollQuery") .setScrollLiveTime("10m")// .setSliceQuery(true)// .setSliceSize(5)// .setQueryUrl("dbdemo/_search") //通过简单的示例,演示根据实间范围计算queryUrl,lastStartTime 记录查询开始时间,lastEndtime 查询为截止时间(在设置了IncreamentEndOffset情况下有值) .setQueryUrlFunction((TaskContext taskContext,Date lastStartTime,Date lastEndTime)->

< t; i ++){ tmp += "0"; } _fileSeq = tmp+_fileSeq; } return "HN_BOSS_TRADE"+_fileSeq + "_"+time +"_" + _fileSeq+".txt"; } }); //指定文件中每条记录格式,不指定默认为json格式输出 fileFtpOupputConfig.setReocordGenerator(new ReocordGenerator() { @Override public void buildRecord(Context taskContext, CommonRecord record, Writer builder) { //直接将记录按照json格式输出到文本文件中 SerialUtil.normalObject2json(record.getDatas(),//获取记录中的字段数据 builder); String data = (String)taskContext.getTaskContext().getTaskData("data");//从任务上下文中获取本次任务执行前设置时间戳// System.out.println(data); } }); importBuilder.setFileFtpOupputConfig(fileFtpOupputConfig);3.5 文件名称生成机制配置 必须通过FileFtpOupputConfig对象的setFilenameGenerator方法设置文件名称生成接口FilenameGenerator,示例代码如下: //自定义文件名称 fileFtpOupputConfig.setFilenameGenerator(new FilenameGenerator() { @Override public String genName( TaskContext taskContext,int fileSeq) { //fileSeq为切割文件时的文件递增序号 String time = (String)taskContext.getTaskData("time");//从任务上下文中获取本次任务执行前设置时间戳 String _fileSeq = fileSeq+""; int t = 6 - _fileSeq.length(); if(t >

0) {String tmp = ""; for (int I = 0; I

< t; i ++){ tmp += "0"; } _fileSeq = tmp+_fileSeq; } return "HN_BOSS_TRADE"+_fileSeq + "_"+time +"_" + _fileSeq+".txt"; } }); 接口方法genName带有两个参数: TaskContext taskContext, 任务上下文对象,包含任务执行过程中需要的上下文数据,比如任务执行时间戳、其他任务执行过程中需要用到的数据 int fileSeq 文件序号,从1开始,自动递增,如果指定了每个文件保存的最大记录数,fileSeq就会被用到文件名称中,用来区分各种文件 3.6 自定义记录输出格式 默认采用json格式输出每条记录到文件中,我们可以FileFtpOupputConfig对象的setReocordGenerator方法设置自定义记录生成接口ReocordGenerator。 接口方法buildRecord参数说明: Context recordContext, 记录处理上下文对象,可以通过recordContext.getTaskContext()获取任务执行上下文对象,并获取任务上下文数据 CommonRecord record, 处理当前记录对象,包含记录数据Map Writer builder 记录数据写入器 直接将record中的数据转换为json文本并输出到Writer builder中: //指定文件中每条记录格式,不指定默认为json格式输出 fileFtpOupputConfig.setReocordGenerator(new ReocordGenerator() { @Override public void buildRecord(Context recordContext, CommonRecord record, Writer builder) { //直接将记录按照json格式输出到文本文件中 SerialUtil.normalObject2json(record.getDatas(),//获取记录中的字段数据 builder); //String data = (String)recordContext.getTaskContext().getTaskData("data");//从任务上下文中获取本次任务执行前设置时间戳// System.out.println(data); } }); 竖线|分隔字段值: public class DataSendReocordGenerator implements ReocordGenerator { @Override public void buildRecord(Context taskContext, CommonRecord record, Writer builder) { Map datas = record.getDatas(); try { Map chanMap = (Map)taskContext.getTaskContext().getTaskData("chanMap");//从任务上下文中获取渠道字典数据 String phoneNumber = (String) datas.get("phoneNumber");//手机号码 if(phoneNumber==null){ phoneNumber=""; } builder.write(phoneNumber);//将字段内容输出到文件 builder.write("|");//输出字段分隔符 String chanId = (String) datas.get("chanId");//办理渠道名称 通过Id获取名称 String chanName = null; if(chanId==null){ chanName=""; }else{ chanName=chanMap.get(chanId); if(chanName == null){ chanName = chanId; } } builder.write(chanName); builder.write("|"); builder.write(goodsName); builder.write("|"); String goodsCode = (String) datas.get("goodsCode");//资费档次编码 if(goodsCode==null){ goodsCode=""; } builder.write(goodsCode); builder.write("|"); String bossErrorCode = (String) datas.get("bossErrorCode");//错误码 if(bossErrorCode==null){ bossErrorCode=""; } builder.write(bossErrorCode); builder.write("|"); String bossErrorDesc = (String) datas.get("bossErrorDesc");//错误码描述 if(bossErrorDesc==null){ bossErrorDesc=""; }else{ bossErrorDesc = bossErrorDesc.replace("|","\\|").replace("\r\n","\\\\r\\\\n"); //处理字段内容中包含的|字符和回车换行符 } builder.write(bossErrorDesc); } catch (IOException e) { e.printStackTrace(); } }}3.7 elasticsearch增量导出截止时间偏移量设置 elasticsearch增量导出截止时间偏移量设置-IncreamentEndOffset,由于elasticsearch异步写入数据的特性,如果采用原有的增量时间戳机制(起始时间>

LastImporttime, there is no deadline), which will result in missing part of the unsold data, so you need to specify the time corresponding to the forward offset of IncreamentEndOffset based on the current time as the data export deadline (in seconds).

Examples are as follows:

ImportBuilder.setIncreamentEndOffset (300); / / in seconds, synchronize the data 5 minutes before the current time of the last synchronization deadline, and continue to synchronize data 3.8 from the last deadline to retrieve data configuration from elasticsearch next time

The following describes the relevant configuration parameters for retrieving data from Elasticsearch

Parameter name description default value dsl2ndSqlFileString type, required, retrieve the xml profile path of dsl, corresponding to classpath path, configuration case: dsl2ndSqlFile.xml has no dslNameString type, required, configuration name of dsl in xml configuration file, key configuration increment field xxx in dsl statement and add cut-off field xxx__endTime naming convention: the cut-off field is the incremental field name xxx automatically suffixed with _ _ endTime Example: {# # incremental retrieval range, which can be either a time range or a numeric range. Here we use the numeric incremental field "range": {# if ($collecttime) "collecttime": {# # time incremental retrieval field "gt": # [collecttime], "lte": # [collecttime__endTime]} # end}} No scrollLiveTimeString type, required. Validity period of the files above and below the scroll. Set according to the actual situation, for example: 10m has no queryUrlString type, optional, directly specify elasticsearch query rest service address, format: index name / _ search, for example: dbdemo/_search

QueryUrlFunctionQueryUrlFunction API type, optional, public String queryUrl (Date lastTime). Synchronize the data parameter lastTime according to the interface method to dynamically set the synchronized index name and retrieval service address, which is suitable for time-based indexing scenarios.

SourceElasticsearchString type, optional, set elasticsearch data source (bboss supports the configuration of multiple data sources). The specific configuration is configured in application.properties.

AddParam method type, optional. Add the search condition in dsl by name/value, for example: importBuilder.addParam ("var1", "v1"); corresponding to the writing in dsl: {"term": {"var1.keyword": # [var1]}}

SliceQuery optional, boolean type, mark whether the query is slicescroll query falsesliceSize optional, int type, set the slicesize of slicescroll parallel query

Configuration code example:

ImportBuilder .setDsl2ndSqlFile ("dsl2ndSqlFile.xml") .setDslName ("scrollQuery") .setScrollLiveTime ("10m") / / .setSliceQuery (true) / / .setSliceSize (5) / / .setQueryUrl ("dbdemo/_search") / / through a simple example, demonstrate calculating the start time of a queryUrl,lastStartTime record query based on a real-time range LastEndtime query is the deadline (value if IncreamentEndOffset is set) .setQueryUrlFunction ((TaskContext taskContext,Date lastStartTime,Date lastEndTime)-> {String formate = "yyyy.MM.dd" SimpleDateFormat dateFormat = new SimpleDateFormat (formate); String startTime = dateFormat.format (lastEndTime); / / Date lastEndTime = new Date (); String endTimeStr = dateFormat.format (lastEndTime) Return "dbdemo-" + startTime+ ", dbdemo-" + endTimeStr+ "/ _ search"; / / return "vops-chbizcollect-2020.11.26,vops-chbizcollect-2020.11.27/_search"; / / return "dbdemo/_search" }) .addParam ("fullImport", false) / add parameters and parameter values needed in dsl .addParam ("var1", "v1") .addParam ("var2", "v2") .addParam ("var3", "v3") / / specify the elasticsearch data source name importBuilder.setSourceElasticsearch ("default")

Dsl profile dsl2ndSqlFile.xml and the corresponding dsl statement name example:

ScrollQuery is the dsl,scrollSliceQuery corresponding to this case is the dsl needed for slice to export, and they share the conditional fragment queryCondition. Another open source elasticsearch rest java client project based on bboss retrieves data from elasticsearch internally, and the client uses reference documents:

Https://esdoc.bbossgroups.com/#/development

3.9 scheduled task configuration / / scheduled task configuration, importBuilder.setFixedRate (false) / / refer to the description of fixedRate in the jdk timer task documentation / / .setScheduleDate (date) / / specify the start time of the task: date .setDeyLay (1000L) / / execution of the task is delayed after deylay milliseconds. SetPeriod (30000L) / / execute every period millisecond, if not set, only once

The above configuration indicates that the synchronization job task is delayed by 1 second and executed every 30 seconds.

3.10 Task context data definition and acquisition

In some specific scenarios, to avoid repeatedly loading data during task execution, you need to load some data that will not change during task execution before each task is scheduled for execution, put it into the task context TaskContext; task execution process, and obtain the data directly from the task context. For example, put the timestamp of each task execution into the task execution context.

The context data is added by the addTaskData method of the TaskContext object and the task context data is obtained by the getTaskData method of the TaskContext object.

3.10.1 define task context data

Task context data definition-add task context data through the TaskContext object of preCall of CallInterceptor interface

@ Overridepublic void preCall (TaskContext taskContext) {String formate = "yyyyMMddHHmmss"; / / HN_BOSS_TRADE00001_YYYYMMDDHHMM_000001.txt SimpleDateFormat dateFormat = new SimpleDateFormat (formate); String time = dateFormat.format (new Date ()); taskContext.addTaskData ("time", time); / / define the timestamp parameter time} when the task is executed

Complete Code-Task context data definition

/ / set task execution interceptor. You can add multiple importBuilder.addCallInterceptor (new CallInterceptor () {@ Override public void preCall (TaskContext taskContext) {String formate = "yyyyMMddHHmmss"; / / HN_BOSS_TRADE00001_YYYYMMDDHHMM_000001.txt SimpleDateFormat dateFormat = new SimpleDateFormat (formate); String time = dateFormat.format (new Date () TaskContext.addTaskData ("time", time); / / define the timestamp parameter time @ Override public void afterCall (TaskContext taskContext) {System.out.println ("afterCall 1");} @ Override public void throwException (TaskContext taskContext, Exception e) {System.out.println ("throwException 1") }}); / / set the end of the task execution interceptor, and you can add multiple 3.10.2 to obtain task context data

Get the task context data in the interface method that generates the file name

FileFtpOupputConfig.setFilenameGenerator (new FilenameGenerator () {@ Override public String genName (TaskContext taskContext,int fileSeq) {String time = (String) taskContext.getTaskData ("time"); / / get task execution timestamp parameters time String _ fileSeq = fileSeq+ ""; int t = 6-_ fileSeq.length (); if (t > 0) {String tmp = "; for (int I = 0; I < t) I + +) {tmp+ = "0";} _ fileSeq = tmp+_fileSeq;} return "HN_BOSS_TRADE" + _ fileSeq+ "_" + time + "_" + _ fileSeq+ ".txt";}})

Get task context data when generating records in a file

FileFtpOupputConfig.setReocordGenerator (new ReocordGenerator () {@ Override public void buildRecord (Context context, CommonRecord record, Writer builder) {/ / SerialUtil.normalObject2json (record.getDatas (), builder); String data = (String) context.getTaskContext (). GetTaskData ("data"); / / get global parameters / / System.out.println (data);}})

Get task context data in datarefactor method

/ * reset the es data structure * / importBuilder.setDataRefactor (new DataRefactor () {public void refactor (Context context) throws Exception {String data = (String) context.getTaskContext (). GetTaskData ("data");}}); 3.11 set the IP address information base address

We set the address of the IP address repository with the following code:

/ / set the address of the ip address information base, configure the reference document importBuilder.setGeoipDatabase ("E:/workspace/hnai/terminal/geolite2/GeoLite2-City.mmdb"); importBuilder.setGeoipAsnDatabase ("E:/workspace/hnai/terminal/geolite2/GeoLite2-ASN.mmdb"); importBuilder.setGeoip2regionDatabase ("E:/workspace/hnai/terminal/geolite2/ip2region.db")

IP address Base configuration details reference document: set the IP address information base address

3.12 adjust the content of recorded data

You can adjust the recording data content through the datarefactor API. The sample code is as follows:

/ * reset the es data structure * / importBuilder.setDataRefactor (new DataRefactor () {public void refactor (Context context) throws Exception {/ / you can define whether to discard the current record / / context.setDrop (true) according to the condition; return;// if (s.incrementAndGet ()% 2 = = 0) {/ / context.setDrop (true) / / return;//} String data = (String) context.getTaskContext (). GetTaskData ("data"); / / System.out.println (data); / / context.addFieldValue ("author", "duoduo"); / / the author variable context.addFieldValue ("title", "liberation") that will override the global setting Context.addFieldValue ("subtitle", "well-off"); / / context.addIgnoreFieldMapping ("title"); / / the above three attributes have been placed in docInfo. If you no longer need to place them in the indexed document, you can ignore these attributes / / context.addIgnoreFieldMapping ("author"). / modify the field name title to the new name newTitle, and modify the field value / / context.newName2ndData ("title", "newTitle", (String) context.getValue ("title") + "append new Value") / * obtain the operator and region information corresponding to ip. The field logVisitorial value corresponds to an ip address * / IpInfo ipInfo = (IpInfo) context.getIpInfo ("logVisitorial"); if (ipInfo! = null) context.addFieldValue ("ipinfo", ipInfo) Else {context.addFieldValue ("ipinfo", ");} DateFormat dateFormat = SerialUtil.getDateFormateMeta (). ToDateFormat (); / / Date optime = context.getDateValue (" LOG_OPERTIME ", dateFormat); / / context.addFieldValue (" logOpertime ", optime); context.addFieldValue (" newcollecttime ", new Date ()) / * * / / Associated query data, single-valued query Map headdata = SQLExecutor.queryObjectWithDBName (Map.class,context.getEsjdbc (). GetDbConfig (). GetDbName (), "select * from head where billid =? And othercondition=?, context.getIntegerValue ("billid"), "otherconditionvalue"); / / multiple conditions are separated by commas to append / / append the data in headdata, call the addFieldValue method to add the data to the current es document, and specify how to construct the document data structure to context.addFieldValue ("headdata", headdata) according to the requirements. / / Associated query data, multi-valued query List facedatas = SQLExecutor.queryListWithDBName (Map.class,context.getEsjdbc () .getDbConfig () .getDbName (), "select * from facedata where billid =?", context.getIntegerValue ("billid") / / add the data in facedatas to the current es document by calling the addFieldValue method, and how to construct the document data structure to determine context.addFieldValue ("facedatas", facedatas); * /}}); 3.13 incremental synchronization configuration / / incremental configuration starts importBuilder.setLastValueColumn ("collecttime") / / manually specify the date increment query field variable name importBuilder.setFromFirst (true); / / setFromfirst (false). If the job stops, the data will be collected from the last cut-off position after the job is restarted. / / setFromfirst (true) if the job stops and the job restarts, restart data collection importBuilder.setLastValueStorePath ("es2fileftp_batchsplitimport") / / record the file path of the last collected increment field value as the starting point of the next increment (or after restart). For different tasks, the path should be different / / importBuilder.setLastValueStoreTableName ("logs"); / / the table recording the last collected increment field value can be unspecified and use the default table name increament_tab importBuilder.setLastValueType (ImportIncreamentConfig.TIMESTAMP_TYPE). / / if no incremental query field name is specified, you need to specify the field type: ImportIncreamentConfig.NUMBER_TYPE numeric type / / or ImportIncreamentConfig.TIMESTAMP_TYPE date type / / specify the start time of incremental synchronization. By default, 1970-01-01 00:00:00 starts / / importBuilder.setLastValue (new Date ()); / / incremental configuration ends 3.14parallel synchronization configuration importBuilder.setParallel (false) / / set to multi-thread parallel batch import, true parallel, false serial importBuilder.setQueue (10); / / set batch import thread pool waiting queue length importBuilder.setThreadCount (50); / / set the number of worker threads in bulk import thread pool importBuilder.setContinueOnError (true); / / if the task is abnormal, whether to continue to execute the job: true (default) continues to execute false interrupt job execution importBuilder.setAsyn (false) / / true is executed asynchronously, and the method returns quickly without waiting for all import job tasks to finish. False (default) synchronous execution, wait for all import job tasks to finish, and the method returns 3.15Sync task log print switch importBuilder.setPrintTaskLog (true) after all jobs are finished; / / true print, false does not print 3.16Sync job execution / * start es data import file and upload sftp/ftp job * / DataStream dataStream = importBuilder.builder (); dataStream.execute () / / start the synchronization job at this point. I believe you have a better understanding of "how to synchronize data from Elasticsearch/DB to SFTP/FTP". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report