
Locating the cause of duplicate records when Spark imports data into Elasticsearch in batches

2025-01-18 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

Take a look at the source code of the es-hadoop plug-in:

It turns out that retries during an ES import happen in two places. First, with the es.batch.write.retry.policy parameter enabled by default, when the es-hadoop plug-in sends a bulk write request and the ES cluster responds with a 503 status code, the request is retried up to 3 times.
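For reference, the bulk-level retry behaviour is configurable. A sketch of the relevant es-hadoop settings as they might be passed through a SparkConf (the host name is illustrative; the values shown are the documented defaults):

```java
import org.apache.spark.SparkConf;

// Sketch, not a full job: the es-hadoop bulk retry knobs, set via Spark.
SparkConf conf = new SparkConf()
    .set("es.nodes", "es-host:9200")              // illustrative host
    .set("es.batch.write.retry.policy", "simple") // retry on overload responses (default)
    .set("es.batch.write.retry.count", "3")       // bulk retries before failing (default)
    .set("es.batch.write.retry.wait", "10s");     // wait between bulk retries (default)
```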

Second, the HTTP request execution itself also retries (hadoop/rest/NetworkClient.java):

public Response execute(Request request) {
    Response response = null;
    boolean newNode;
    do {
        SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());
        newNode = false;
        try {
            response = currentTransport.execute(routedRequest);
            ByteSequence body = routedRequest.body();
            if (body != null) {
                stats.bytesSent += body.length();
            }
        } catch (Exception ex) {
            // configuration error - including SSL/PKI - bail out
            if (ex instanceof EsHadoopIllegalStateException) {
                throw (EsHadoopException) ex;
            }
            // issues with the SSL handshake, bail out instead of retry, for security reasons
            if (ex instanceof javax.net.ssl.SSLException) {
                throw new EsHadoopTransportException(ex);
            }
            // check for fatal, non-recoverable network exceptions
            if (ex instanceof BindException) {
                throw new EsHadoopTransportException(ex);
            }
            if (log.isTraceEnabled()) {
                log.trace(String.format("Caught exception while performing request [%s][%s] - falling back to the next node in line...", currentNode, request.path()), ex);
            }
            String failed = currentNode;
            failedNodes.put(failed, ex);
            newNode = selectNextNode();
            log.error(String.format("Node [%s] failed (%s); " + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."), failed, ex.getMessage()));
            if (!newNode) {
                throw new EsHadoopNoNodesLeftException(failedNodes);
            }
        }
    } while (newNode);
    return response;
}

When a request times out, the es-hadoop plug-in falls back to another ES node and resends the write request. In other words, when an insert does not complete within the timeout (one minute by default), the plug-in considers the node unavailable and switches to the next one. In reality, ES had simply not finished the insert within one minute: the first node can still complete the write in the background, and the resent request then indexes the same records again, producing duplicates.
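The failover scenario described above can be modeled with a toy sketch. None of these classes are from es-hadoop; the two write methods and the shared list stand in for two ES nodes indexing into the same cluster:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how a client-side timeout plus node failover can
// duplicate a bulk write that actually succeeded on the first node.
public class DuplicateWriteDemo {
    // Shared "index": both nodes write into the same cluster state.
    static final List<String> index = new ArrayList<>();

    // A slow node: the insert does complete, but only after the client
    // gave up waiting, so the client never sees the acknowledgment.
    static boolean writeSlow(String doc) {
        index.add(doc);   // the write succeeds in the background...
        return false;     // ...but the client observed a timeout
    }

    // A fast node: the write succeeds and is acknowledged in time.
    static boolean writeFast(String doc) {
        index.add(doc);
        return true;
    }

    public static void main(String[] args) {
        String doc = "record-1";
        // The client sends the bulk request to node 1 and times out.
        boolean acked = writeSlow(doc);
        if (!acked) {
            // es-hadoop-style failover: select the next node and resend.
            writeFast(doc);
        }
        // The same record is now indexed twice.
        System.out.println(index.size()); // prints 2
    }
}
```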

After increasing the es.http.timeout parameter to leave ES enough time to store each batch, the problem no longer occurs.
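A minimal sketch of that fix, again through a SparkConf. The "5m" value and the host name are illustrative, not values from the article:

```java
import org.apache.spark.SparkConf;

// Sketch of the fix: give ES more time than the 1m default before the
// client declares the node dead and fails over to another one.
SparkConf conf = new SparkConf()
    .set("es.nodes", "es-host:9200")  // illustrative host
    .set("es.http.timeout", "5m");    // default is 1m

// Complementary safeguard: if documents carry a natural key, mapping it
// to the ES _id makes a retried write overwrite instead of duplicate.
// "recordId" here is a hypothetical document field.
// conf.set("es.mapping.id", "recordId");
```

Raising the timeout removes the trigger; setting es.mapping.id makes accidental retries idempotent, so the two measures are complementary.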
