2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article looks at how local files are uploaded in HDFS by reading the client-side output stream code function by function. I found it very practical, so I am sharing it here, and I hope you take something away from it.
public synchronized void write(byte b[], int off, int len)
        throws IOException {
    if (closed) { // check whether the stream is closed; if it is, no more data may be written
        throw new IOException("Stream closed");
    }
    while (len > 0) { // len is the number of bytes still unwritten in the source buffer
        int remaining = BUFFER_SIZE - pos; // number of bytes that still fit into the destination buffer
        int toWrite = Math.min(remaining, len); // the smaller of the two is the number of bytes actually written
        System.arraycopy(b, off, outBuf, pos, toWrite); // the copy itself is the write into the destination buffer
        pos += toWrite; // advance the destination buffer position
        off += toWrite; // advance the source buffer position
        len -= toWrite; // update the length of what remains in the source buffer
        filePos += toWrite; // total length written to the whole file (including what is still buffered)
        if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
            (pos == BUFFER_SIZE)) {
            flush(); // two conditions trigger a flush: first, the total length (written + buffered) reaches one block size;
                     // second, the destination buffer is completely full, so a flush is naturally needed
        }
    }
}
A friendly reminder: the first half of the loop writes as much as it can, and only afterwards checks whether a flush is needed.
Why are there two conditions? The full-buffer case is easy to understand: there is simply no room left.
The condition bytesWrittenToBlock + pos >= BLOCK_SIZE may be less obvious.
It exists because once a Block is full, the stream has to start afresh with a new Block.
The flush() function is not explained yet; it is covered further below.
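To make the two flush conditions concrete, here is a minimal, self-contained sketch of the same copy loop. It is not the original class: the tiny BUFFER_SIZE and BLOCK_SIZE values, the flushLog list, and the simplified flush(String) stand-in are all assumptions chosen so that the behaviour is easy to trace by hand. The single `||` condition is also split in two, purely to record which case fired.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the buffered write loop; all sizes are assumed values for illustration.
class BufferedBlockWriteSketch {
    static final int BUFFER_SIZE = 4;
    static final int BLOCK_SIZE = 10;

    final byte[] outBuf = new byte[BUFFER_SIZE];
    int pos = 0;                  // next free slot in outBuf
    int bytesWrittenToBlock = 0;  // bytes already flushed into the current block
    long filePos = 0;             // total bytes accepted so far
    final List<String> flushLog = new ArrayList<>(); // hypothetical: records why each flush fired

    void write(byte[] b, int off, int len) {
        while (len > 0) {
            int remaining = BUFFER_SIZE - pos;
            int toWrite = Math.min(remaining, len);
            System.arraycopy(b, off, outBuf, pos, toWrite);
            pos += toWrite;
            off += toWrite;
            len -= toWrite;
            filePos += toWrite;
            // Same two conditions as in the article, separated only to log the reason.
            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
                flush("block full");
            } else if (pos == BUFFER_SIZE) {
                flush("buffer full");
            }
        }
    }

    // Simplified stand-in for flush(): drain the buffer and wrap around when a block fills up.
    void flush(String reason) {
        flushLog.add(reason);
        bytesWrittenToBlock = (bytesWrittenToBlock + pos) % BLOCK_SIZE;
        pos = 0;
    }

    public static void main(String[] args) {
        BufferedBlockWriteSketch w = new BufferedBlockWriteSketch();
        w.write(new byte[12], 0, 12);
        System.out.println(w.flushLog);
    }
}
```

Writing 12 bytes fills the 4-byte buffer twice and then crosses the 10-byte block boundary on the third pass, which is exactly the case the second condition exists for.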
-
public synchronized void write(int b) throws IOException {
    if (closed) { // again, check whether the stream is closed
        throw new IOException("Stream closed");
    }
    if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
        (pos >= BUFFER_SIZE)) {
        flush();
    } // the same two conditions are checked
    outBuf[pos++] = (byte) b;
    filePos++; // these two statements actually write the byte into the destination buffer
    // But why not swap the order of these two parts (the flush check and the write) to make the code easier to follow? What a unique way of thinking!
}
-
public synchronized void flush() throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    } // the usual check for a closed stream
    if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
        flushData(BLOCK_SIZE - bytesWrittenToBlock);
    } // if a new Block is needed, first write the bytes that fill up the current one
    if (bytesWrittenToBlock == BLOCK_SIZE) {
        endBlock(); // then close the current block and start a new one
    }
    flushData(pos); // continue writing the rest to the current block
}
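The three steps of flush() can be traced with a small model. This is only a sketch: BLOCK_SIZE and the buffer size are toy values, finishedBlocks is a hypothetical list that stands in for sending a block, and the bookkeeping inside flushData (advancing bytesWrittenToBlock and shifting leftover bytes to the front of the buffer) is an assumption, since the article only shows the write to the backup stream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of flush(): top up the current block, end it, then flush the remainder.
class FlushSplitSketch {
    static final int BLOCK_SIZE = 10;          // assumed toy value
    final byte[] outBuf = new byte[8];         // assumed toy buffer, smaller than a block
    int pos = 0;                               // bytes currently buffered
    int bytesWrittenToBlock = 0;               // bytes already in the current block
    final List<Integer> finishedBlocks = new ArrayList<>(); // hypothetical: sizes of completed blocks

    void flush() {
        if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
            flushData(BLOCK_SIZE - bytesWrittenToBlock); // fill the current block exactly
        }
        if (bytesWrittenToBlock == BLOCK_SIZE) {
            endBlock();                                  // hand off the full block, start a new one
        }
        flushData(pos);                                  // the rest goes into the current block
    }

    // Assumed bookkeeping: account for up to maxPos bytes and move the leftovers to the front.
    void flushData(int maxPos) {
        int workingPos = Math.min(pos, maxPos);
        if (workingPos > 0) {
            bytesWrittenToBlock += workingPos;
            System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
            pos -= workingPos;
        }
    }

    void endBlock() {
        finishedBlocks.add(bytesWrittenToBlock);
        bytesWrittenToBlock = 0;
    }
}
```

With 7 bytes already in the block and 5 bytes buffered, flush() first moves 3 bytes to complete the block, ends it, and then moves the remaining 2 bytes into the new block, so no block ever grows past BLOCK_SIZE.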
-
Let's continue with the other functions.
Before doing so, the reader should first have a picture of how files are stored in 0.1.0.
When a local file is read and uploaded to HDFS, the data flows like this:
local file --> local memory buffer --> local file --> upload to the remote HDFS system.
The step local memory buffer --> local file is what flushData does. Please review the flush function once more, and then we will analyze flushData.
PS: reading code is more tiring than writing it. Reading code means understanding someone else's thinking; writing code means realizing your own.
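The staging idea in the data flow above (memory buffer, then a local file, then the remote side) can be sketched in a few lines. This is an illustration only: stageThenSend, the temp-file prefix, and the 4096-byte copy buffer are hypothetical, and a plain OutputStream stands in for the connection to HDFS.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch of "buffer -> local backup file -> remote": names and sizes are assumptions.
class LocalStagingSketch {
    static void stageThenSend(byte[] data, OutputStream remote) throws IOException {
        File backupFile = File.createTempFile("block-backup", ".tmp");
        try {
            // Step 1: the in-memory bytes are written to a local file first.
            try (OutputStream backupStream = new FileOutputStream(backupFile)) {
                backupStream.write(data, 0, data.length);
            }
            // Step 2: the local file is read back and streamed to the remote side.
            try (InputStream in = new FileInputStream(backupFile)) {
                byte[] buf = new byte[4096];
                int bytesRead = in.read(buf);
                while (bytesRead > 0) {
                    remote.write(buf, 0, bytesRead);
                    bytesRead = in.read(buf);
                }
            }
        } finally {
            backupFile.delete(); // the staging file is only needed until the data has been sent
        }
    }
}
```

One apparent benefit of staging on disk is that a whole block can be replayed from the local file if sending it fails, without asking the caller for the data again.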
private synchronized void flushData(int maxPos) throws IOException {
    int workingPos = Math.min(pos, maxPos); // number of bytes to write, capped at maxPos
    if (workingPos > 0) { // if there really is something to write
        //
        // To the local block backup, write just the bytes
        //
        backupStream.write(outBuf, 0, workingPos); // write to the local file
        // Note: read the initialization of backupStream carefully; it is a local file.
        // In other words, the memory buffer is written to a local file, and once a whole block has been written it is sent to HDFS.
        // Smart readers should think that the size of the last block is [...]
[...]
            while (bytesRead > 0) { // greater than 0?
                blockStream.writeLong((long) bytesRead); // write the number of bytes
                blockStream.write(buf, 0, bytesRead); // write the bytes themselves to the stream
                bytesRead = in.read(buf); // continue reading from the local file
            }
            internalClose(); // interact with the NameNode and DataNode to close the block
            mustRecover = false; // the task is finished
        } catch (IOException ie) {
            handleSocketException(ie);
        } finally {
            in.close(); // close the input stream of the current file
        }
    }
    //
    // Delete local backup, start new one
    // the four lines below set up a fresh local backup file and need no explanation
    backupFile.delete();
    backupFile = newBackupFile();
    backupStream = new FileOutputStream(backupFile);
    bytesWrittenToBlock = 0;
}
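The send loop above frames the data as chunks: an 8-byte length written with writeLong, followed by that many bytes. The sketch below reproduces this framing against in-memory streams. The terminating zero length matches what internalClose writes later in the article; the decode side is purely an assumption about how a receiver could read such a stream, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Length-prefixed chunk framing: [long length][bytes] ... [long 0].
class ChunkFramingSketch {
    static byte[] encode(InputStream in, int bufferSize) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        DataOutputStream blockStream = new DataOutputStream(sink);
        byte[] buf = new byte[bufferSize];
        int bytesRead = in.read(buf);
        while (bytesRead > 0) {
            blockStream.writeLong((long) bytesRead); // chunk length first
            blockStream.write(buf, 0, bytesRead);    // then the chunk bytes
            bytesRead = in.read(buf);
        }
        blockStream.writeLong(0);                    // zero length marks the end
        blockStream.flush();
        return sink.toByteArray();
    }

    // Assumed receiver side: read chunks until a zero length arrives.
    static byte[] decode(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        long len = in.readLong();
        while (len > 0) {
            byte[] chunk = new byte[(int) len];
            in.readFully(chunk);
            payload.write(chunk);
            len = in.readLong();
        }
        return payload.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = encode(new ByteArrayInputStream(new byte[10]), 4);
        System.out.println("framed bytes: " + framed.length);
    }
}
```

Because every chunk announces its own length, the receiver never needs to know the total block size in advance.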
After reading the code above, I personally think that if I wrote this logic in C, I would call sendfile directly to transfer the file.
Of course, the Java API may have lagged behind and the OS probably did not offer this at the time, but today's kernels do.
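On the Java side, the closest standard counterpart to that idea is FileChannel.transferTo from java.nio, which hands the copy to the channel implementation instead of looping over a byte array in user code. The sketch below is only an illustration of that API: sendFile and the class name are hypothetical, and whether the JVM actually uses a kernel-level transfer depends on the platform and on the target channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: copy a whole file into a channel with FileChannel.transferTo.
class TransferToSketch {
    static long sendFile(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            long size = in.size();
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (sent < size) {
                sent += in.transferTo(sent, size - sent, target);
            }
            return sent;
        }
    }
}
```

For a real upload the target would be a SocketChannel; any WritableByteChannel works for experimenting, for example one created with Channels.newChannel around an OutputStream.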
-
The next function to analyze is nextBlockOutputStream():
private synchronized void nextBlockOutputStream() throws IOException {
    boolean retry = false; // no explanation needed
    long start = System.currentTimeMillis(); // overall start time
    do {
        retry = false; // reset to false
        long localstart = System.currentTimeMillis(); // start time of this attempt
        boolean blockComplete = false; // marks whether a block has been obtained
        LocatedBlock lb = null; // initialized to null
        while (!blockComplete) { // as long as no block has been obtained
            if (firstTime) { // if this is the first time the file is opened
                lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite); // create the file
            } else {
                lb = namenode.addBlock(src.toString(), localName);
            } // add a block
            if (lb == null) { // nothing was returned
                try {
                    Thread.sleep(400); // just sleep for 400 ms
                    if (System.currentTimeMillis() - localstart > 5000) {
                        LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
                    }
                } catch (InterruptedException ie) {
                }
            } else {
                blockComplete = true; // a block has been found
            }
        }
        block = lb.getBlock(); // get the block information from lb
        DatanodeInfo nodes[] = lb.getLocations(); // get from lb the array of DataNodes that will store the block
        //
        // Connect to first DataNode in the list.  Abort if this fails.
        // Please note what the sentence above says: only the first DataNode is connected.
        // Why? Data transfer uses the daisy-chain scheme known from computer organization.
        InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString()); // parse the address
        try {
            s = new Socket();
            s.connect(target, READ_TIMEOUT); // connect to the first DataNode
            s.setSoTimeout(READ_TIMEOUT); // set the read timeout
        } catch (IOException ie) { // the exception path is not analyzed here
            // Connection failed.  Let's wait a little bit and retry
            try {
                if (System.currentTimeMillis() - start > 5000) {
                    LOG.info("Waiting to find target node: " + target);
                }
                Thread.sleep(6000);
            } catch (InterruptedException iex) {
            }
            if (firstTime) {
                namenode.abandonFileInProgress(src.toString());
            } else {
                namenode.abandonBlock(block, src.toString());
            }
            retry = true;
            continue;
        }
        // the connection to the remote DataNode has succeeded, bingo!
        // Xmit header info to datanode
        //
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
        // get the output stream handle
        out.write(OP_WRITE_BLOCK); // operation code identifying the request
        out.writeBoolean(false); // false?
        block.write(out); // write the block information. Note: the block obtained from the NameNode is written to the DataNode.
        out.writeInt(nodes.length); // this line and the loop below tell the DataNode about all nodes that store the block and its replicas
        for (int i = 0; i < nodes.length; i++) {
            nodes[i].write(out); // no explanation needed
        }
        out.write(CHUNKED_ENCODING); // write CHUNKED_ENCODING
        bytesWrittenToBlock = 0; // reset to 0
        blockStream = out; // keep the handle in a field of the class for later use
        blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream())); // likewise, no explanation needed
    } while (retry);
    firstTime = false; // firstTime becomes false once at least one block has been returned
}
-
The next function to analyze is internalClose:
private synchronized void internalClose() throws IOException {
    blockStream.writeLong(0); // a length of zero signals the end of the data
    blockStream.flush(); // push out everything that is still buffered
    long complete = blockReplyStream.readLong(); // read the response
    if (complete != WRITE_COMPLETE) { // if it is not the completion flag
        LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
        throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
    }
    LocatedBlock lb = new LocatedBlock(); // create a new object
    lb.readFields(blockReplyStream); // fill it from the reply stream
    namenode.reportWrittenBlock(lb); // report the successful write to the NameNode
    s.close(); // close the connection
    s = null;
}
-
Finally, the close function:
public synchronized void close() throws IOException {
    if (closed) {
        throw new IOException("Stream closed");
    } // check whether the stream is already closed
    flush(); // write out as much as possible
    if (filePos == 0 || bytesWrittenToBlock != 0) {
        try {
            endBlock(); // finish the block
        } catch (IOException e) {
            namenode.abandonFileInProgress(src.toString()); // abandon this file
            throw e;
        }
    }
    backupStream.close(); // close the stream
    backupFile.delete(); // delete the file
    if (s != null) {
        s.close(); // no explanation needed
        s = null;
    }
    super.close();
    long localstart = System.currentTimeMillis();
    boolean fileComplete = false;
    while (!fileComplete) { // keep reporting that the file is finished until the NameNode confirms
        fileComplete = namenode.complete(src.toString(), clientName.toString());
        if (!fileComplete) {
            try {
                Thread.sleep(400);
                if (System.currentTimeMillis() - localstart > 5000) {
                    LOG.info("Could not complete file, retrying...");
                }
            } catch (InterruptedException ie) {
            }
        }
    }
    closed = true;
}
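The loop at the end of close(), like the one that waits for a new block, follows one pattern: call, check, sleep, and log once the wait gets long. A minimal generic sketch of that pattern follows. The helper name pollUntilTrue and its parameters are hypothetical, System.err stands in for the logger, and unlike the listing above it does not swallow InterruptedException, which is a deliberate deviation.

```java
import java.util.function.BooleanSupplier;

// Sketch of the "retry until the remote side says yes" loop; names are assumptions.
class CompletionPollSketch {
    // Calls attempt until it returns true and reports how many calls were needed.
    static int pollUntilTrue(BooleanSupplier attempt, long sleepMillis, long warnAfterMillis) {
        long localstart = System.currentTimeMillis();
        int attempts = 0;
        boolean done = false;
        while (!done) {
            attempts++;
            done = attempt.getAsBoolean();
            if (!done) {
                try {
                    Thread.sleep(sleepMillis);
                    if (System.currentTimeMillis() - localstart > warnAfterMillis) {
                        System.err.println("Could not complete file, retrying...");
                    }
                } catch (InterruptedException ie) {
                    // Deviation from the listing: restore the flag and stop waiting.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", ie);
                }
            }
        }
        return attempts;
    }
}
```

In terms of the article's code, the attempt would be the call to namenode.complete, the sleep 400 ms, and the warning threshold 5000 ms.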
That is how local files are uploaded in HDFS. Some of these points may well come up in day-to-day work, and I hope this article helps you learn more.