7. Principle of uploading and downloading HDFS (source code analysis)

2025-01-24 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--


I. Basic principles of HDFS file upload

1. Basic process

1) The client establishes RPC communication with the namenode and then requests to upload a file.

2) After receiving the request, the namenode checks whether the file can be created (for example, whether the user has permission and whether the file already exists). If the check passes, the namenode records the metadata of the new file (it first writes to the edits file, then updates the metadata in memory) and tells the client that it may start uploading.

3) The client splits the file into blocks locally (according to the configured block size), then asks the namenode where to upload the first block.

4) Based on its placement policy and the state of each datanode, the namenode returns three datanode addresses to the client (the default replication factor is 3).

5) The client sets up a pipeline with the three datanodes returned by the namenode: the client connects to dn1, dn1 connects to dn2, and dn2 connects to dn3, forming a serial channel.

6) The three datanodes respond back along the pipeline step by step, and the final response reaches the client, indicating that data can be transferred.

7) The client splits each block into packets and puts them into a data queue to wait for upload. Each time a packet is sent, it is also added to an ack queue; when the acknowledgement from the datanodes in the pipeline comes back, the corresponding packet is removed from the ack queue.

8) This process repeats until all packets in the queue have been written to the pipeline and the client closes the stream; the file is then marked as complete.
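The interplay of the data queue and the ack queue in step 7 can be sketched in plain Java. This is a minimal model, not the real client code: the class name `PacketQueueSketch`, its methods, and the block and packet sizes are hypothetical and chosen only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the client-side packet queues; not the real HDFS client.
final class PacketQueueSketch {
    final Deque<Integer> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
    final Deque<Integer> ackQueue = new ArrayDeque<>();  // packets sent but not yet acknowledged

    // Split a block into packet sequence numbers and queue them for sending.
    void enqueueBlock(long blockSize, long packetSize) {
        long packets = (blockSize + packetSize - 1) / packetSize; // ceiling division
        for (int seq = 0; seq < packets; seq++) {
            dataQueue.addLast(seq);
        }
    }

    // Send the next packet into the pipeline: it moves from dataQueue to ackQueue.
    void sendOne() {
        Integer seq = dataQueue.pollFirst();
        if (seq != null) {
            ackQueue.addLast(seq);
        }
    }

    // The pipeline acknowledged the oldest in-flight packet: drop it from ackQueue.
    void ackOne() {
        ackQueue.pollFirst();
    }

    public static void main(String[] args) {
        PacketQueueSketch q = new PacketQueueSketch();
        q.enqueueBlock(256 * 1024, 64 * 1024); // 4 packets of 64 KB (illustrative sizes)
        q.sendOne();
        q.sendOne();
        q.ackOne();
        // prints "waiting=2 inFlight=1"
        System.out.println("waiting=" + q.dataQueue.size() + " inFlight=" + q.ackQueue.size());
    }
}
```

A packet therefore lives in exactly one of the two queues until it is acknowledged, which is what makes resending after a failure possible.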

Note: a block becomes visible to readers only after the client has finished writing it; the block that is currently being written is not visible. Calling the sync method (which flushes the buffered data to disk) makes the client confirm that the data written so far is complete. Closing the stream with close() calls sync underneath. Whether to call sync manually is a tradeoff between data robustness and throughput that depends on the program's needs.

2. Handling a datanode failure

Question: when a datanode fails during transmission, how does HDFS handle it?

1) The pipeline is closed.

2) To prevent packet loss, the packets in the ack queue are moved back to the data queue so that they are sent again when transmission resumes.

3) The failed datanode is dropped from the pipeline, and the unfinished block it was writing is deleted.

4) The rest of the block is written to the remaining two healthy datanodes.

5) The namenode later finds another suitable datanode and copies the block from the two healthy datanodes to it, restoring three replicas. This namenode-internal mechanism is transparent to the client.
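Steps 2 to 4 above can be sketched as follows. This is an illustrative model under stated assumptions: `PipelineRecoverySketch` and its two helper methods are hypothetical names, packets are represented by sequence numbers, and datanodes by plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of pipeline recovery; not the real HDFS client.
final class PipelineRecoverySketch {
    // Put unacknowledged packets back at the head of the data queue, oldest first.
    static void requeueUnacked(Deque<Integer> dataQueue, Deque<Integer> ackQueue) {
        Iterator<Integer> newestFirst = ackQueue.descendingIterator();
        while (newestFirst.hasNext()) {
            dataQueue.addFirst(newestFirst.next());
        }
        ackQueue.clear();
    }

    // Rebuild the pipeline without the failed datanode.
    static List<String> dropFailedNode(List<String> pipeline, String failed) {
        List<String> healthy = new ArrayList<>(pipeline);
        healthy.remove(failed);
        return healthy;
    }

    public static void main(String[] args) {
        Deque<Integer> dataQueue = new ArrayDeque<>(Arrays.asList(3, 4));
        Deque<Integer> ackQueue = new ArrayDeque<>(Arrays.asList(1, 2));
        requeueUnacked(dataQueue, ackQueue);
        System.out.println(dataQueue); // prints "[1, 2, 3, 4]"
        System.out.println(dropFailedNode(Arrays.asList("dn1", "dn2", "dn3"), "dn2")); // prints "[dn1, dn3]"
    }
}
```

Re-queuing at the head, in the original order, keeps the packet sequence intact when the remaining datanodes resume the transfer.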

3. Metadata storage

The namenode uses two files to save metadata: fsimage and edits.

fsimage: the metadata image file, which stores a snapshot of the namenode's in-memory metadata as of a certain point in time.

edits: the operation log file.

fstime: the time at which the last checkpoint was saved.

For a more detailed explanation of the fsimage and edits files, see "hdfs Architecture"

4. Merging of metadata

All of the namenode's metadata is loaded into memory at startup (to improve query performance) and is used to serve read requests. When there is a write operation, the namenode first writes the operation to the edits log file and only then modifies the metadata in memory. This ensures that the change has been stored on disk and will not be lost.

The fsimage file maintained by HDFS is a mirror of the metadata in memory, but the two are not kept consistent in real time. fsimage is updated by merging the edits into it. The merge is performed by the SecondaryNameNode (SNN), and the main process is as follows:

1) First, the SNN tells the NN to switch to a new edits file, so that new write operations arriving during the merge can still be logged normally.

2) The SNN fetches the fsimage and edits files from the NN through HTTP requests.

3) The SNN loads fsimage into memory and merges the edits into it to generate a new fsimage.

4) The SNN sends the new fsimage to the NN.

5) The NN replaces the old fsimage with the new fsimage.
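The checkpoint process above can be sketched with a toy namespace. This is only a model under stated assumptions: the class `CheckpointSketch`, the "ADD path" / "DELETE path" edit format, and the use of a sorted set of paths as the image are all hypothetical simplifications, not the real on-disk formats.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical toy model of fsimage + edits checkpointing; not the real formats.
final class CheckpointSketch {
    TreeSet<String> fsimage = new TreeSet<>(); // last checkpointed namespace
    List<String> edits = new ArrayList<>();    // edits segment currently being written

    // A write operation is logged first (the in-memory update is omitted here).
    void logEdit(String op, String path) {
        edits.add(op + " " + path);
    }

    // Step 1: switch to a new edits segment and hand back the finished one.
    List<String> rollEdits() {
        List<String> finished = edits;
        edits = new ArrayList<>();
        return finished;
    }

    // Step 3: replay the edits on top of a copy of the old image.
    static TreeSet<String> merge(TreeSet<String> image, List<String> finishedEdits) {
        TreeSet<String> merged = new TreeSet<>(image);
        for (String edit : finishedEdits) {
            String[] parts = edit.split(" ", 2);
            if (parts[0].equals("ADD")) {
                merged.add(parts[1]);
            } else if (parts[0].equals("DELETE")) {
                merged.remove(parts[1]);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        CheckpointSketch nn = new CheckpointSketch();
        nn.logEdit("ADD", "/a");
        nn.logEdit("ADD", "/b");
        nn.logEdit("DELETE", "/a");
        List<String> finished = nn.rollEdits();  // step 1
        nn.logEdit("ADD", "/c");                 // a write that arrives during the merge
        nn.fsimage = merge(nn.fsimage, finished); // steps 3 to 5
        System.out.println(nn.fsimage + " pending=" + nn.edits); // prints "[/b] pending=[ADD /c]"
    }
}
```

The write that arrives during the merge lands in the new edits segment, so it is not lost and will be folded in by the next checkpoint.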

5. Network topology selection when writing

When writing, the default is 3 replicas, so the choice of datanodes that hold the replicas affects the write speed. In the network topology of HDFS there are four physical ranges: the same node, different nodes on the same rack, different nodes in the same data center, and different nodes in different data centers. Across these four ranges the distance between nodes increases step by step, and the greater the distance, the lower the transmission efficiency.

6. Rack awareness

As mentioned above, the location of the nodes chosen for the replicas affects write efficiency, so how does HDFS choose those nodes?

(1) The old approach

The path is r1/n1 --> r2/n1 --> r2/n2

(2) The new approach

The path is r1/n1 --> r1/n2 --> r2/n2 (the last node can be any node, as long as it is on a different rack)

This approach is better than the first because the total data path is shorter: only one replica has to be transferred to another rack, whereas in the old approach two replicas have to be transferred across racks.
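The notion of distance behind the four physical ranges and the rack paths above can be sketched as a tree distance: each node is a path such as /datacenter/rack/node, and the distance is the number of hops from both nodes up to their closest common ancestor. The class `TopologyDistanceSketch` and the example paths are hypothetical; this is a minimal sketch of the idea, not the HDFS implementation.

```java
// Hypothetical sketch of tree distance between two nodes in a network topology.
final class TopologyDistanceSketch {
    // Paths look like "/d1/r1/n1" (data center / rack / node).
    static int distance(String a, String b) {
        String[] pa = a.substring(1).split("/");
        String[] pb = b.substring(1).split("/");
        int common = 0;
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;
        }
        // Hops from each node up to the closest common ancestor.
        return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // same node: prints 0
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // same rack: prints 2
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // same data center: prints 4
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // different data centers: prints 6
    }
}
```

Under this model a replica on the writer's own rack costs 2 and a replica on another rack costs 4, which is why the number of cross-rack replicas matters for write speed.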

II. Source code analysis of HDFS file upload

The following analysis is based on the source code of hadoop 2.8.4.

1. Client initialization source code analysis

Generally speaking, the client object that operates on HDFS is first obtained through FileSystem.get(), and all subsequent operations are done by calling methods of that object.

    FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);

Now let's take a look at the implementation of FileSystem.get():

    public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {
            return get(conf);
        } else {
            if (scheme != null && authority == null) {
                URI defaultUri = getDefaultUri(conf);
                if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                    return get(defaultUri, conf);
                }
            }

            String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
            /* Key code: enter the CACHE.get() method */
            return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
        }
    }

CACHE is an instance of Cache, a static inner class of FileSystem. Let's continue with the CACHE.get() method:

    FileSystem get(URI uri, Configuration conf) throws IOException {
        FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
        // Enter the getInternal() method of the CACHE object
        return this.getInternal(uri, conf, key);
    }

Enter the getInternal() method of the CACHE object:

    private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
        FileSystem fs;
        synchronized (this) {
            /* Look up the FileSystem object in the map. If it is found, it has already
               been initialized and stored in the map, so it can be used directly. */
            fs = (FileSystem) this.map.get(key);
        }

        if (fs != null) {
            // If fs exists, return the existing FileSystem instance directly
            return fs;
        } else {
            // First use of this file system: create and initialize it
            fs = FileSystem.createFileSystem(uri, conf);
            synchronized (this) {
                FileSystem oldfs = (FileSystem) this.map.get(key);
                if (oldfs != null) {
                    fs.close();
                    return oldfs;
                } else {
                    if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
                        ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
                    }

                    fs.key = key;
                    this.map.put(key, fs);
                    if (conf.getBoolean("fs.automatic.close", true)) {
                        this.toAutoClose.add(key);
                    }

                    return fs;
                }
            }
        }
    }
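The caching pattern in getInternal() (look up under the lock, create outside the lock, then check again before storing) can be sketched independently of Hadoop. The class `ClientCacheSketch`, its string key, and the factory parameter are hypothetical stand-ins for the Cache, Cache.Key, and createFileSystem() shown above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the check / create / re-check caching pattern.
final class ClientCacheSketch<V> {
    private final Map<String, V> map = new HashMap<>();

    V get(String key, Function<String, V> factory) {
        V cached;
        synchronized (this) {
            cached = map.get(key);
        }
        if (cached != null) {
            return cached; // already created: reuse it
        }

        // Creation can be slow, so it happens outside the lock.
        V created = factory.apply(key);
        synchronized (this) {
            V raced = map.get(key);
            if (raced != null) {
                // Another thread stored an instance first; the code above also closes its own copy here.
                return raced;
            }
            map.put(key, created);
            return created;
        }
    }

    public static void main(String[] args) {
        ClientCacheSketch<Object> cache = new ClientCacheSketch<>();
        Object first = cache.get("hdfs://bigdata121:9000", k -> new Object());
        Object second = cache.get("hdfs://bigdata121:9000", k -> new Object());
        System.out.println(first == second); // prints "true"
    }
}
```

Creating outside the lock keeps a slow initialization from blocking other callers, at the cost of occasionally building an instance that is thrown away.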

As we have seen, there are two paths. If the FileSystem object already exists, it is fetched from the map and returned. If it does not exist, FileSystem.createFileSystem() is called to create it, and fs is returned once creation is complete. Let's take a look at this method:

    private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
        Tracer tracer = FsTracer.get(conf);
        TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
        scope.addKVAnnotation("scheme", uri.getScheme());

        FileSystem var6;
        try {
            Class clazz = getFileSystemClass(uri.getScheme(), conf);
            FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
            // Key code: as the name suggests, this initializes the FileSystem
            fs.initialize(uri, conf);
            var6 = fs;
        } finally {
            scope.close();
        }

        return var6;
    }
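The combination used in createFileSystem(), reflective instantiation of a concrete class followed by a call through the abstract type, can be sketched with plain Java reflection. All names here (`ReflectiveCreateSketch`, `BaseFs`, `DistributedFs`) are hypothetical stand-ins, and the sketch uses the standard `getDeclaredConstructor().newInstance()` rather than Hadoop's ReflectionUtils.

```java
// Hypothetical sketch: create a subclass reflectively, then call it polymorphically.
final class ReflectiveCreateSketch {
    static BaseFs create(Class<? extends BaseFs> clazz) {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        // The variable has the abstract type; the object is the concrete subclass.
        BaseFs fs = create(DistributedFs.class);
        System.out.println(fs.initialize("hdfs://bigdata121:9000"));
        // prints "DistributedFs initialized for hdfs://bigdata121:9000"
    }
}

// Stand-in for the abstract FileSystem class.
abstract class BaseFs {
    abstract String initialize(String uri);
}

// Stand-in for the concrete implementation chosen from the URI scheme.
final class DistributedFs extends BaseFs {
    @Override
    String initialize(String uri) {
        return "DistributedFs initialized for " + uri;
    }
}
```

Because the call goes through the abstract type, the subclass's initialize() runs even though the caller never names the subclass in its variable declarations.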

Note that FileSystem is an abstract class and its implementation subclass is DistributedFileSystem. Although fs is declared as FileSystem, the object itself is a DistributedFileSystem; this is Java polymorphism. So fs.initialize() actually calls the initialize() method in DistributedFileSystem. Let's take a look at this method:

    /* DistributedFileSystem.class */
    public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        this.setConf(conf);
        String host = uri.getHost();
        if (host == null) {
            throw new IOException("Incomplete HDFS URI, no host: " + uri);
        } else {
            this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
            // Key code: create a DFSClient object which, as the name implies, is the RPC client
            this.dfs = new DFSClient(uri, conf, this.statistics);
            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
            this.workingDir = this.getHomeDirectory();
            this.storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
                public StorageStatistics provide() {
                    return new DFSOpsCountStatistics();
                }
            });
        }
    }

You can see that a DFSClient object is created above and assigned to this.dfs. Let's take a look at the constructor of this class:

    /* DFSClient.class */
    public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
        ...
        /* The source code is relatively long, so only the important parts are shown */

        // This is a key variable: it wraps the namenode proxy object
        ProxyAndInfo proxyInfo = null;
        ...
        // Start creating the namenode proxy object
        if (proxyInfo != null) {
            this.dtService = proxyInfo.getDelegationTokenService();
            this.namenode = (ClientProtocol) proxyInfo.getProxy();
        } else if (rpcNamenode != null) {
            Preconditions.checkArgument(nameNodeUri == null);
            this.namenode = rpcNamenode;
            this.dtService = null;
        } else {
            Preconditions.checkArgument(nameNodeUri != null, "null URI");
            // Create the proxy object information
            proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
            this.dtService = proxyInfo.getDelegationTokenService();
            // Here the namenode proxy object is obtained through proxyInfo.getProxy()
            // and assigned to this.namenode. Its type is ClientProtocol.
            this.namenode = (ClientProtocol) proxyInfo.getProxy();
        }

        /* A large amount of code is omitted below */
    }

You can see that the namenode proxy object, that is, the RPC client object, is obtained above through this.namenode = (ClientProtocol) proxyInfo.getProxy();. Since the proxy object is of type ClientProtocol, let's take a look at what ClientProtocol is:

    /* ClientProtocol.class: this is an interface */
    public interface ClientProtocol {
        long versionID = 69L;
        /* The rest mainly declares many abstract methods, which are the operations
           available on HDFS, such as open, create, and other common methods. */
    }

Let's take a look at how proxyInfo creates the proxy object:

    /* NameNodeProxiesClient */
    public static NameNodeProxiesClient.ProxyAndInfo createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        AbstractNNFailoverProxyProvider failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
        if (failoverProxyProvider == null) {
            // Create a non-HA proxy object
            InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
            Text dtService = SecurityUtil.buildTokenService(nnAddr);
            // Create the proxy object
            ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
            // ProxyAndInfo is a static inner class that wraps the proxy created above;
            // the proxy can be retrieved through its getProxy() method
            return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
        } else {
            // Create a proxy object with HA
            return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
        }
    }

You can see that the proxy object has been created and returned, and that it is of type ClientProtocol. Let's take a look at createNonHAProxyWithClientProtocol(), the method that creates the proxy object:

    /* NameNodeProxiesClient */
    public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
        RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
        long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
        // Core code: the proxy object is created by calling into the RPC module
        ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
        if (withRetries) {
            Map methodNameToPolicyMap = new HashMap();
            ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
            return (ClientProtocol) RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
        } else {
            return new ClientNamenodeProtocolTranslatorPB(proxy);
        }
    }

At this point we can see that the client communicates with the namenode through RPC.
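The idea of a client-side proxy object, where calling an interface method turns into a request sent to a remote server, can be sketched with the JDK's java.lang.reflect.Proxy. This is a simplified model under stated assumptions: `RpcProxySketch` and the one-method `NameNodeProtocol` interface are hypothetical, and the handler only builds a string instead of sending anything over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch of an RPC-style client proxy built with a JDK dynamic proxy.
final class RpcProxySketch {
    // Stand-in for a protocol interface such as ClientProtocol.
    interface NameNodeProtocol {
        String create(String path);
    }

    static NameNodeProtocol connect(String address) {
        // Every call on the proxy lands here; a real RPC engine would serialize
        // the method name and arguments and send them to the server.
        InvocationHandler handler = (proxy, method, args) -> {
            Object firstArg = (args == null || args.length == 0) ? "" : args[0];
            return "sent " + method.getName() + "(" + firstArg + ") to " + address;
        };
        return (NameNodeProtocol) Proxy.newProxyInstance(
                NameNodeProtocol.class.getClassLoader(),
                new Class<?>[] {NameNodeProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        NameNodeProtocol namenode = connect("bigdata121:9000");
        System.out.println(namenode.create("/tmp/a.txt"));
        // prints "sent create(/tmp/a.txt) to bigdata121:9000"
    }
}
```

The caller only sees the interface, which is why the code above can treat this.namenode as an ordinary ClientProtocol object.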

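To sum up, the call sequence of the methods is: FileSystem.get() -> CACHE.get() -> getInternal() -> FileSystem.createFileSystem() -> DistributedFileSystem.initialize() -> new DFSClient() -> NameNodeProxiesClient.createProxyWithClientProtocol() -> createNonHAProxyWithClientProtocol() -> RPC.getProtocolProxy().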

2. Upload source code analysis

Generally speaking, an upload starts with:

    OutputStream os = fs.create(new Path("xxxx"));

That is, the file is created first and the file data is uploaded afterwards. Uploading the data is no different from a normal stream operation.
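Since the upload itself is an ordinary stream copy, it can be sketched with the JDK alone. In the sketch below the class `StreamCopySketch` and the buffer size are hypothetical, and in-memory streams stand in for the local file and for the stream returned by fs.create().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: copying a local source into an output stream.
final class StreamCopySketch {
    // Copies everything from in to out and returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out, int bufferSize) {
        try {
            byte[] buffer = new byte[bufferSize];
            long total = 0;
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
                total += n;
            }
            out.flush();
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] localData = new byte[10000];                       // stands in for a local file
        ByteArrayOutputStream os = new ByteArrayOutputStream();   // stands in for the stream from fs.create()
        long copied = copy(new ByteArrayInputStream(localData), os, 4096);
        System.out.println(copied); // prints "10000"
    }
}
```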

Let's look at this create method.

    /* FileSystem.class */
    public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;

You can see that this is an abstract method. As mentioned earlier, the implementation subclass is DistributedFileSystem, so the create method of the subclass is the one that gets called. Let's continue:

    /* DistributedFileSystem.class */
    public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
        this.statistics.incrementWriteOps(1);
        this.storageStatistics.incrementOpCounter(OpType.CREATE);
        Path absF = this.fixRelativePart(f);
        return (FSDataOutputStream) (new FileSystemLinkResolver() {
            public FSDataOutputStream doCall(Path p) throws IOException {
                // Core code: as mentioned earlier, this.dfs holds the reference to the DFSClient
                // object, through which many HDFS operations can be called. Here its create
                // method is called to create a DFSOutputStream, the output stream object.
                DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);
                // The dfsos created above is wrapped and returned
                return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
            }

            public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
                return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
            }
        }).resolve(this, absF);
    }

You can see that the call above creates and returns a DFSOutputStream output stream object. Let's take a look at the implementation of the DFSClient.create method:

    /* DFSClient.class */
    public DFSOutputStream create(String src, FsPermission permission, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
        this.checkOpen();
        FsPermission masked = this.applyUMask(permission);
        LOG.debug("{}: masked={}", src, masked);
        // Create the output stream object
        DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
        this.beginFileLease(result.getFileId(), result);
        return result;
    }

Let's continue with the DFSOutputStream.newStreamForCreate method.

    /* DFSOutputStream.class */
    static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
        TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
        Throwable var12 = null;

        try {
            HdfsFileStatus stat = null;
            boolean shouldRetry = true;
            int retryCount = 10;

            while (true) {
                if (shouldRetry) {
                    shouldRetry = false;

                    try {
                        // Core code: the file is created by calling the create method of the
                        // proxy object dfsClient.namenode, which returns the file status
                        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
                    } catch (RemoteException var27) {
                        IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
                        if (e instanceof RetryStartFileException) {
                            if (retryCount ...

[... part of the original text is missing here ...]

    ... 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
        DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
        this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
        this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
    }

    if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
    }

Let's take a look at fetchLocatedBlocksAndGetLastBlockLength, the method that gets the block information.

    /* DFSInputStream.class */
    private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
        LocatedBlocks newInfo = this.locatedBlocks;
        if (this.locatedBlocks == null || refresh) {
            // The getLocatedBlocks method of dfsClient is called here to get the block information
            newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
        }

        DFSClient.LOG.debug("newInfo = {}", newInfo);
        if (newInfo == null) {
            throw new IOException("Cannot open filename " + this.src);
        } else {
            if (this.locatedBlocks != null) {
                Iterator oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
                Iterator newIter = newInfo.getLocatedBlocks().iterator();

                while (oldIter.hasNext() && newIter.hasNext()) {
                    if (!((LocatedBlock) oldIter.next()).getBlock().equals(((LocatedBlock) newIter.next()).getBlock())) {
                        throw new IOException("Blocklist for " + this.src + " has changed!");
                    }
                }
            }

            this.locatedBlocks = newInfo;
            long lastBlockBeingWrittenLength = 0L;
            if (!this.locatedBlocks.isLastBlockComplete()) {
                LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
                if (last != null) {
                    if (last.getLocations().length == 0) {
                        if (last.getBlockSize() == 0L) {
                            return 0L;
                        }

                        return -1L;
                    }

                    long len = this.readBlockLength(last);
                    last.getBlock().setNumBytes(len);
                    lastBlockBeingWrittenLength = len;
                }
            }

            this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
            return lastBlockBeingWrittenLength;
        }
    }

The code above calls back into dfsClient.getLocatedBlocks, so let's take a look at this method:

    /* DFSClient.class */
    public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
        return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
    }

    // This continues into the following method
    public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
        TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
        Throwable var7 = null;

        LocatedBlocks var8;
        try {
            // Call this static method to get the block location information
            var8 = callGetBlockLocations(this.namenode, src, start, length);
        } catch (Throwable var17) {
            var7 = var17;
            throw var17;
        } finally {
            if (ignored != null) {
                if (var7 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var16) {
                        var7.addSuppressed(var16);
                    }
                } else {
                    ignored.close();
                }
            }
        }

        return var8;
    }

    // Continue
    static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
        try {
            // The familiar pattern: obtain the block information through the namenode proxy object
            return namenode.getBlockLocations(src, start, length);
        } catch (RemoteException var7) {
            throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
        }
    }

As you can see above, the operation is again initiated through the namenode proxy object, so let's look at namenode.getBlockLocations next. Because the proxy object is of type ClientProtocol, which is an interface, we look at its implementation class ClientNamenodeProtocolTranslatorPB.

    /* ClientNamenodeProtocolTranslatorPB.class */
    public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
        GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();

        try {
            // The familiar pattern: call rpcProxy to send the operation to the namenode server
            GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController) null, req);
            return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
        } catch (ServiceException var8) {
            throw ProtobufHelper.getRemoteException(var8);
        }
    }

From here on, what follows is the underlying RPC operation.

The method call sequence is as follows:

1. FileSystem is initialized; the client gets the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (same as before).

2. The open() method of FileSystem is called. Since the implementation class is DistributedFileSystem, the call goes to the open() method in that class.

3. DistributedFileSystem holds a reference to DFSClient and continues by calling the open() method in DFSClient.

4. The DFSInputStream input stream is instantiated.

5. The openInfo() method is called.

6. The fetchLocatedBlocksAndGetLastBlockLength() method is called to fetch the block information and obtain the length of the last block.

7. The getLocatedBlocks() method in DFSClient is called to obtain the block information.

8. In the callGetBlockLocations() method, the getBlockLocations() method of NameNodeRpcServer is called through the NameNode proxy object.

9. The block location information obtained in step 8 is saved to member variables of the DFSInputStream input stream object.

10. The stream is handed to IOUtils, which downloads the file to the local machine.
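The retry behaviour seen in the surviving openInfo() fragment (keep fetching while the last block length is -1, then fail once the retries are used up) can be sketched as follows. The class `LastBlockRetrySketch` and its supplier parameter are hypothetical, and the wait between attempts that the original performs is left out to keep the sketch short.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of retrying until the last block length becomes available.
final class LastBlockRetrySketch {
    // fetchLength returns -1 while the block locations are not yet reported.
    static long fetchWithRetry(LongSupplier fetchLength, int retries) {
        long length = fetchLength.getAsLong();
        for (int left = retries; left > 0 && length == -1L; --left) {
            length = fetchLength.getAsLong(); // the real code waits before each retry
        }
        if (length == -1L) {
            throw new IllegalStateException("Could not obtain the last block locations.");
        }
        return length;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated namenode answer: unavailable twice, then a length of 128.
        long length = fetchWithRetry(() -> ++attempts[0] < 3 ? -1L : 128L, 3);
        System.out.println(length + " after " + attempts[0] + " attempts"); // prints "128 after 3 attempts"
    }
}
```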
