Shulou (Shulou.com), 06/03 report, updated 2025-04-05. From: SLTechnology News&Howtos > Development
This article explains how to use multithreading in Java to read a very large file. It should be a useful reference for anyone interested in the topic.
The basic ideas are as follows:
1. Calculate the total file size
2. Split the file into segments: compute the start and end position of the range each thread reads.
(file size / number of threads) * N, where N is the index of the thread (0, 1, 2, ...), gives the approximate start position of each thread's range.
Use this approximate start position as the initial offset (fileChannel.position) and read forward until the first newline character. The position of that newline is the exact start of the thread's range, and it is also the end of the previous thread's range. The end position of the last thread is simply set to -1 (a negative end means "read to the end of the file").
3. Start the threads; each thread reads from its start position to its end position.
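Steps 1 and 2 can be illustrated with a minimal in-memory sketch. It computes newline-aligned ranges over a byte array that stands in for the file. The class name `ChunkPlanner` is hypothetical; with a real file the bytes would come from a FileChannel rather than an array. In this sketch each range starts on the byte right after a newline, so the ranges are half-open intervals [start, end) that never share a byte.

```java
// Hypothetical helper: splits a byte sequence into roughly equal ranges whose
// boundaries fall just after a '\n', so that no line is cut in two.
// A byte[] stands in for the file to keep the example self-contained.
final class ChunkPlanner {

    // Returns bounds with bounds[0] = 0 and bounds[threads] = data.length;
    // thread i would read the half-open range [bounds[i], bounds[i + 1]).
    static long[] plan(byte[] data, int threads) {
        long size = data.length;
        long[] bounds = new long[threads + 1];
        bounds[threads] = size;
        for (int i = 1; i < threads; i++) {
            // approximate start: (file size / number of threads) * i
            long approx = size / threads * i;
            // never move a boundary before the previous one
            long pos = Math.max(approx, bounds[i - 1]);
            // scan forward to the first newline at or after pos
            while (pos < size && data[(int) pos] != '\n') {
                pos++;
            }
            // the next range starts right after that newline (or at EOF)
            bounds[i] = Math.min(pos + 1, size);
        }
        return bounds;
    }
}
```

For the 14-byte input `"aa\nbbb\nc\ndddd\n"` split three ways, the boundaries are 0, 7, 9 and 14. That gives the ranges `"aa\nbbb\n"`, `"c\n"` and `"dddd\n"`.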
The code is as follows:
File-reading utility class
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Observable;

/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-2
 * Time: 3:12
 * Reads a file line by line.
 */
public class ReadFile extends Observable {
    private int bufSize = 1024;
    // newline character
    private byte key = "\n".getBytes()[0];
    // number of lines read so far
    private long lineNum = 0;
    // file encoding, gb2312 by default
    private String encode = "gb2312";
    // listener holding the actual business logic
    private ReaderFileListener readerListener;

    public void setEncode(String encode) {
        this.encode = encode;
    }

    public void setReaderListener(ReaderFileListener readerListener) {
        this.readerListener = readerListener;
    }

    /**
     * Gets the exact start position: the offset of the first newline at or
     * after position. Returns position unchanged if no newline is found.
     */
    public long getStartNum(File file, long position) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r");
             FileChannel fcin = raf.getChannel()) {
            fcin.position(position);
            ByteBuffer rBuffer = ByteBuffer.allocate(1024);
            // number of bytes already scanned, counted from position
            long scanned = 0;
            int rSize;
            while ((rSize = fcin.read(rBuffer)) != -1) {
                rBuffer.flip();
                for (int i = 0; i < rSize; i++) {
                    if (rBuffer.get(i) == key) {
                        return position + scanned + i;
                    }
                }
                scanned += rSize;
                rBuffer.clear();
            }
        }
        return position;
    }

    /**
     * Reads the file from start up to (but not including) end.
     * If end is negative, reads to the end of the file.
     */
    public void readFileByLine(String fullPath, long start, long end) throws Exception {
        File fin = new File(fullPath);
        if (!fin.exists()) {
            throw new FileNotFoundException("File not found: " + fullPath);
        }
        try (RandomAccessFile raf = new RandomAccessFile(fin, "r");
             FileChannel fcin = raf.getChannel()) {
            fcin.position(start);
            ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
            // bytes read in this pass
            byte[] bs = new byte[bufSize];
            // unfinished line carried over from the previous pass
            byte[] tempBs = new byte[0];
            // current position in the file (advanced by the bytes actually read)
            long nowCur = start;
            int rSize;
            while ((rSize = fcin.read(rBuffer)) != -1) {
                nowCur += rSize;
                rBuffer.flip();
                rBuffer.get(bs, 0, rSize);
                rBuffer.clear();
                // prepend the leftover from the previous pass
                byte[] newStrByte = new byte[tempBs.length + rSize];
                System.arraycopy(tempBs, 0, newStrByte, 0, tempBs.length);
                System.arraycopy(bs, 0, newStrByte, tempBs.length, rSize);
                // whether the end position has been reached
                boolean isEnd = false;
                // if we have read up to or past end, truncate the data at end
                if (end > 0 && nowCur >= end) {
                    // buffer length minus the bytes read beyond end
                    int l = newStrByte.length - (int) (nowCur - end);
                    newStrByte = substring(newStrByte, 0, l);
                    isEnd = true;
                }
                int fromIndex = 0;
                int endIndex;
                // emit one line per key (\n by default)
                while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
                    byte[] bLine = substring(newStrByte, fromIndex, endIndex);
                    String line = new String(bLine, 0, bLine.length, encode);
                    lineNum++;
                    // hand the line to the listener supplied by the caller
                    readerListener.outLine(line.trim(), lineNum, false);
                    fromIndex = endIndex + 1;
                }
                // keep the unfinished tail for the next pass
                tempBs = substring(newStrByte, fromIndex, newStrByte.length);
                if (isEnd) {
                    break;
                }
            }
            // emit whatever remains as the last line and flag it as such
            String lineStr = new String(tempBs, 0, tempBs.length, encode);
            readerListener.outLine(lineStr.trim(), lineNum, true);
        }
        // notify observers that this range is done
        setChanged();
        notifyObservers(start + "-" + end);
    }

    /** Returns the index of the first newline in src at or after fromIndex, or -1. */
    private int indexOf(byte[] src, int fromIndex) {
        for (int i = fromIndex; i < src.length; i++) {
            if (src[i] == key) {
                return i;
            }
        }
        return -1;
    }

    /** Copies src[fromIndex, endIndex) into a new byte[]. */
    private byte[] substring(byte[] src, int fromIndex, int endIndex) {
        int size = endIndex - fromIndex;
        byte[] ret = new byte[size];
        System.arraycopy(src, fromIndex, ret, 0, size);
        return ret;
    }
}
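The key trick in `readFileByLine` is carrying the unfinished tail of one buffer (`tempBs`) into the next read. The sketch below isolates just that technique. It uses a hypothetical `LineSplitter` class fed with fixed-size chunks, as `FileChannel.read` would deliver them. It assumes `'\n'` (0x0A) never occurs inside a multi-byte character. That holds for UTF-8 and GBK, but not for UTF-16.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the carry-over technique: complete lines are
// emitted as soon as their '\n' is seen; the partial line is kept for the next chunk.
final class LineSplitter {
    private final ByteArrayOutputStream carry = new ByteArrayOutputStream();
    private final Charset charset;
    private final List<String> lines = new ArrayList<>();

    LineSplitter(Charset charset) {
        this.charset = charset;
    }

    // Feeds the first len bytes of chunk.
    void feed(byte[] chunk, int len) {
        int from = 0;
        for (int i = 0; i < len; i++) {
            if (chunk[i] == '\n') {
                // finish the pending line: carried bytes + bytes up to the newline
                carry.write(chunk, from, i - from);
                lines.add(new String(carry.toByteArray(), charset));
                carry.reset();
                from = i + 1;
            }
        }
        // keep the unfinished tail for the next chunk
        carry.write(chunk, from, len - from);
    }

    // Call once at the end: any leftover bytes form the last line.
    List<String> finish() {
        if (carry.size() > 0) {
            lines.add(new String(carry.toByteArray(), charset));
            carry.reset();
        }
        return lines;
    }
}
```

Feeding `"ab\ncd\nef"` in 4-byte chunks yields the lines `ab`, `cd` and `ef`. The `c` left over from the first chunk is joined with the `d` from the second.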
File-reading thread
/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-2
 * Time: 4:50
 */
public class ReadFileThread extends Thread {
    private ReaderFileListener processPoiDataListeners;
    private String filePath;
    private long start;
    private long end;

    public ReadFileThread(ReaderFileListener processPoiDataListeners, long start, long end, String file) {
        this.setName(this.getName() + "-ReadFileThread");
        this.start = start;
        this.end = end;
        this.filePath = file;
        this.processPoiDataListeners = processPoiDataListeners;
    }

    @Override
    public void run() {
        ReadFile readFile = new ReadFile();
        readFile.setReaderListener(processPoiDataListeners);
        readFile.setEncode(processPoiDataListeners.getEncode());
        // readFile.addObserver(...);
        try {
            // end is the offset of the newline closing this range (negative for the
            // last range); end + 1 makes the exclusive end include that newline
            readFile.readFileByLine(filePath, start, end + 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Listener for the business logic
import java.util.ArrayList;
import java.util.List;

/**
 * Created with Okey
 * User: Okey
 * Date: 13-3-14
 * Time: 3:19
 * Callback for NIO line-by-line reading.
 */
public abstract class ReaderFileListener {
    // number of lines per batch, 500 by default
    private int readColNum = 500;
    private String encode;
    private List<String> list = new ArrayList<>();

    /** Sets the number of lines per batch. */
    protected void setReadColNum(int readColNum) {
        this.readColNum = readColNum;
    }

    public String getEncode() {
        return encode;
    }

    public void setEncode(String encode) {
        this.encode = encode;
    }

    /**
     * Adds each line read to the cache and flushes it every readColNum lines.
     * @param lineStr the line read
     * @param lineNum the line number
     * @param over    whether this is the last line
     */
    public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
        if (null != lineStr) {
            list.add(lineStr);
        }
        if (!over && (lineNum % readColNum == 0)) {
            output(list);
            list.clear();
        } else if (over) {
            output(list);
            list.clear();
        }
    }

    /** Bulk output of one batch of lines. */
    public abstract void output(List<String> stringList) throws Exception;
}
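`ReaderFileListener` buffers lines and calls `output` every `readColNum` lines, plus once more after the last line. The sketch below restates that contract with a hypothetical `LineBatcher`. It takes a `java.util.function.Consumer` instead of an abstract method, and it counts the lines in the current batch rather than using the line number.

One deliberate difference: it hands the sink the filled list and starts a new one instead of calling `clear()`. A sink that keeps a reference, for example to insert the rows asynchronously, is then never left holding an emptied list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching helper: collects lines and passes them to the sink
// every batchSize lines, and once more when the last line arrives.
final class LineBatcher {
    private final int batchSize;
    private final Consumer<List<String>> sink;
    private List<String> batch = new ArrayList<>();

    LineBatcher(int batchSize, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void outLine(String line, boolean last) {
        if (line != null) {
            batch.add(line);
        }
        if (batch.size() >= batchSize || last) {
            if (!batch.isEmpty()) {
                sink.accept(batch);
            }
            // start a fresh list so the sink may keep the one it received
            batch = new ArrayList<>();
        }
    }
}
```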
Thread scheduling
import java.io.File;

/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-1
 * Time: 6:03
 */
public class BuildData {
    public static void main(String[] args) throws Exception {
        File file = new File("E:\\1396341974289.csv");
        ReadFile readFile = new ReadFile();
        // file.length() returns a long, so files larger than 2 GB are handled
        long fileSize = file.length();
        int maxThreadNum = 50;
        // approximate size of each thread's range
        long chunk = fileSize / maxThreadNum;
        for (int j = 0; j < maxThreadNum; j++) {
            // start on the byte after the newline that closes the previous range
            long startNum = j == 0 ? 0 : readFile.getStartNum(file, chunk * j) + 1;
            // -2 marks the last range; ReadFileThread passes end + 1 = -1 (read to EOF)
            long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, chunk * (j + 1)) : -2;
            // concrete listener implementing the business logic
            ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");
            new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
        }
    }
}
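The `main` method starts its threads and returns without waiting for them, so no single point knows when every range is done or whether any failed. One alternative is a fixed-size `ExecutorService` with `invokeAll`, which blocks until every task has finished.

The sketch below assumes the range boundaries are already computed and newline-aligned. `ParallelRanges` and `countNewlines` are hypothetical names, and counting `'\n'` bytes stands in for real per-line processing. All tasks share one `FileChannel`: `FileChannel` is documented as safe for concurrent use, and its positional `read(ByteBuffer, long)` does not change the channel's position.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical scheduler: one task per [bounds[i], bounds[i + 1]) range,
// run on a fixed-size pool; the caller waits for all of them.
final class ParallelRanges {

    static long countNewlines(Path file, long[] bounds, int poolSize)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i + 1 < bounds.length; i++) {
                long start = bounds[i];
                long end = bounds[i + 1];
                tasks.add(() -> countRange(ch, start, end));
            }
            long total = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> f : pool.invokeAll(tasks)) {
                total += f.get(); // rethrows a task's exception, if any
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    // Placeholder per-range work: counts '\n' bytes in [start, end)
    // using positional reads, which leave the channel position untouched.
    private static long countRange(FileChannel ch, long start, long end) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8192);
        long pos = start;
        long count = 0;
        while (pos < end) {
            buf.clear();
            buf.limit((int) Math.min(buf.capacity(), end - pos));
            int n = ch.read(buf, pos);
            if (n < 0) {
                break; // reached EOF before end
            }
            for (int i = 0; i < n; i++) {
                if (buf.get(i) == '\n') {
                    count++;
                }
            }
            pos += n;
        }
        return count;
    }
}
```

A pool smaller than the number of ranges caps concurrency independently of how finely the file is split. This matters when the disk, not the CPU, is the bottleneck.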
Now you can tune maxThreadNum and enjoy the speed-up! How much faster it gets depends on the storage device and on how much work each line requires.