2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains "how to use StampedLock" through a real case: the write and read paths of SingleDirectoryDbLedgerStorage in Apache BookKeeper. Many people run into this kind of problem in practice, so let's walk through how this code handles it. I hope you read it carefully and get something out of it!
Main member variables

    public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
        // Where the data is actually stored
        private final EntryLogger entryLogger;

        // ------------------------------------------------------------
        // Index-related
        // ------------------------------------------------------------
        // Records fenced, exists, masterKey and other per-ledger information
        private final LedgerMetadataIndex ledgerIndex;
        // Index of entry locations
        private final EntryLocationIndex entryLocationIndex;
        // Transient ledger info cache
        private final ConcurrentLongHashMap transientLedgerInfoCache;

        // ------------------------------------------------------------
        // Write-related
        // ------------------------------------------------------------
        // memtable: two write caches that are swapped with each other;
        // this lock protects the swap
        private final StampedLock writeCacheRotationLock = new StampedLock();

        // Write cache where all new entries are inserted into
        protected volatile WriteCache writeCache;

        // Write cache that is used to swap with writeCache during flushes
        protected volatile WriteCache writeCacheBeingFlushed;

        // Cache where we insert entries for speculative reading
        private final ReadCache readCache;

        // Checkpoint-related
        private final CheckpointSource checkpointSource;
        private Checkpoint lastCheckpoint = Checkpoint.MIN;
    }

Main responsibilities
Reads and writes ledger entries and maintains their locations (the index)
Stores ledger metadata
Supports checkpoints
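Writing an entry
Entries are written directly into the WriteCache, and a StampedLock protects the operation that swaps the two caches. StampedLock supports optimistic reads, which give higher concurrency than a conventional read-write lock when the exclusive operation (here, the cache swap) is rare.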
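To make the pattern concrete, here is a minimal sketch of the optimistic-read-then-fallback idiom guarding a pair of swappable caches. It is not BookKeeper code: `RotatingCache`, `put`, `swap` and `current` are hypothetical names, and two `ConcurrentHashMap`s stand in for `writeCache` and `writeCacheBeingFlushed`. It only uses the standard `java.util.concurrent.locks.StampedLock` calls `tryOptimisticRead`, `validate`, `readLock`/`unlockRead` and `writeLock`/`unlockWrite`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;

// Hypothetical sketch (not BookKeeper's API): two thread-safe maps stand in
// for writeCache and writeCacheBeingFlushed; the lock only protects the swap.
class RotatingCache {
    private final StampedLock rotationLock = new StampedLock();
    private volatile Map<Long, String> writeCache = new ConcurrentHashMap<>();
    // Assumed to be empty before each swap (drained and cleared by the flusher).
    private volatile Map<Long, String> beingFlushed = new ConcurrentHashMap<>();

    // Same shape as addEntry: optimistic attempt first, then a retry under a
    // real read lock if a swap overlapped the first attempt.
    void put(long id, String value) {
        long stamp = rotationLock.tryOptimisticRead();
        writeCache.put(id, value);
        if (!rotationLock.validate(stamp)) {
            // A swap ran concurrently, so the put may have landed in the map that
            // is now being flushed. Repeat it; re-putting the same key is idempotent.
            stamp = rotationLock.readLock();
            try {
                writeCache.put(id, value);
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }
    }

    // Plays the role of swapWriteCache: the exclusive write lock means no
    // read-locked put can overlap the swap, and optimistic stamps issued
    // before it fail validate().
    Map<Long, String> swap() {
        long stamp = rotationLock.writeLock();
        try {
            Map<Long, String> tmp = writeCache;
            writeCache = beingFlushed;
            beingFlushed = tmp;
            return beingFlushed;
        } finally {
            rotationLock.unlockWrite(stamp);
        }
    }

    Map<Long, String> current() {
        return writeCache;
    }
}
```

Because the maps are keyed by ID, repeating the put after a failed `validate()` is harmless in this sketch; whether the real WriteCache treats a duplicate put the same way is exactly what the author's TODO in addEntry leaves open.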

    public long addEntry(ByteBuf entry) throws IOException, BookieException {
        long startTime = MathUtils.nowInNano();

        long ledgerId = entry.getLong(entry.readerIndex());
        long entryId = entry.getLong(entry.readerIndex() + 8);
        long lac = entry.getLong(entry.readerIndex() + 16);

        // This is the general template for a StampedLock optimistic read;
        // the mutually exclusive operation is actually the cache swap.
        // First we try to do an optimistic locking to get access to the current write cache.
        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
        // rest of the time, we can have multiple thread using the optimistic lock here without interfering.
        // Optimistic read lock
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        boolean inserted = false;

        inserted = writeCache.put(ledgerId, entryId, entry);
        // If a cache swap happened during the insertion, insert again
        if (!writeCacheRotationLock.validate(stamp)) {
            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
            // the operation because we might have inserted in a write cache that was already being flushed and cleared,
            // without being sure about this last entry being flushed or not.
            // The first insert may have gone into the cache that was swapped out:
            //   if that insert returned true: TODO
            //   if it returned false, it has no effect
            stamp = writeCacheRotationLock.readLock();
            try {
                inserted = writeCache.put(ledgerId, entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // If writing to writeCache failed, trigger a flush of the write cache
        // Getting here may mean that both buffers are full?
        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, entry);
        }

        // Update the LAC cache
        // after successfully insert the entry, update LAC and notify the watchers
        updateCachedLacIfNeeded(ledgerId, lac);

        return entryId;
    }

WriteCache is full: triggering the flush
The logic here is simple: keep retrying the insertion into writeCache in a loop; if the timeout expires, exit the loop and reject the write.
If no flush has been triggered yet, a flush task is submitted.
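The retry loop boils down to two ideas: retry until a deadline, and let exactly one thread submit the background flush by winning a `compareAndSet(false, true)`. The sketch below shows just that, under stated assumptions: `ThrottledInserter` and its parameters are hypothetical, the insert attempt is passed in as a `BooleanSupplier`, and the flag is cleared when the flush task finishes. The real code also checks `isFlushOngoing`, and this article does not show where it clears `hasFlushBeenTriggered`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not BookKeeper code): retry an insert until a deadline,
// submitting at most one background flush while the retries are going on.
class ThrottledInserter {
    // Same role as hasFlushBeenTriggered: only one pending flush at a time.
    private final AtomicBoolean flushTriggered = new AtomicBoolean(false);
    private final Executor executor;
    private final Runnable flush;

    ThrottledInserter(Executor executor, Runnable flush) {
        this.executor = executor;
        this.flush = flush;
    }

    // Returns true if tryInsert succeeded before the timeout; false means the
    // caller should reject the write (the article's code throws OperationRejectedException).
    boolean insert(BooleanSupplier tryInsert, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        // Subtraction form is the overflow-safe way to compare nanoTime values.
        while (deadline - System.nanoTime() > 0) {
            // Only the thread that flips false -> true submits a flush task.
            if (flushTriggered.compareAndSet(false, true)) {
                executor.execute(() -> {
                    try {
                        flush.run();
                    } finally {
                        flushTriggered.set(false); // simplification: allow the next trigger
                    }
                });
            }
            if (tryInsert.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(1); // back off briefly before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // timeout expired
    }
}
```

Passing an `Executor` makes the trigger logic easy to observe: an executor that only queues tasks shows that many failed retries still submit a single flush.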

    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
            throws IOException, BookieException {
        // Metric
        dbLedgerStorageStats.getThrottledWriteRequests().inc();
        // ...
        // Maximum time to wait for the write; keep retrying until it times out
        while (System.nanoTime() < absoluteTimeoutNanos) {
            // Write cache is full, we need to trigger a flush so that it gets rotated
            // If the flush has already been triggered or flush has already switched the
            // cache, we don't need to trigger another flush
            // Submit a flush task, unless one has already been submitted
            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
                // Trigger an early flush in background
                log.info("Write cache is full, triggering flush");
                executor.execute(() -> {
                    try {
                        flush();
                    } catch (IOException e) {
                        log.error("Error during flush", e);
                    }
                });
            }

            long stamp = writeCacheRotationLock.readLock();
            try {
                if (writeCache.put(ledgerId, entryId, entry)) {
                    // We succeeded in putting the entry in write cache in the
                    return;
                }
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }

            // Wait some time and try again
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
            }
        }

        // Timeout expired and we weren't able to insert in write cache
        dbLedgerStorageStats.getRejectedWriteRequests().inc();
        throw new OperationRejectedException();
    }

Flush process
In fact, flush() is simply the logic that triggers a checkpoint.
Main steps
Swap the two write caches, so that the cache currently being written becomes the one being flushed.
Iterate over the swapped-out write cache and write its entries to the EntryLogger.
Flush and fsync the EntryLogger so that the content written in the previous step is persisted to disk.
Update the entry location index (entryLocationIndex) and flush it, together with ledgerIndex, to RocksDB.
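The four steps can be modelled in memory to see how they fit together. This is a hedged sketch, not the real implementation: `MiniLedgerStorage` is a hypothetical class, a `List<String>` stands in for the EntryLogger file (offsets are list indexes, not byte offsets), a `Map` stands in for the RocksDB location index, and the fsync of step 3 is only a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

// In-memory model of the four flush steps. Every name is hypothetical:
// entryLog stands in for the EntryLogger file, locationIndex for RocksDB.
class MiniLedgerStorage {
    private final StampedLock rotationLock = new StampedLock();
    private final ReentrantLock flushMutex = new ReentrantLock();
    private volatile Map<Long, String> writeCache = new ConcurrentSkipListMap<>();
    private volatile Map<Long, String> beingFlushed = new ConcurrentSkipListMap<>();
    final List<String> entryLog = new ArrayList<>(); // only mutated under flushMutex
    final Map<Long, Integer> locationIndex = new ConcurrentHashMap<>();

    void add(long entryId, String data) {
        long stamp = rotationLock.readLock(); // plain read lock for brevity
        try {
            writeCache.put(entryId, data);
        } finally {
            rotationLock.unlockRead(stamp);
        }
    }

    void flush() {
        flushMutex.lock(); // only one flush at a time, like flushMutex in checkpoint()
        try {
            // Step 1: swap the caches under the exclusive write lock.
            long stamp = rotationLock.writeLock();
            try {
                Map<Long, String> tmp = writeCache;
                writeCache = beingFlushed;
                beingFlushed = tmp;
            } finally {
                rotationLock.unlockWrite(stamp);
            }
            // Step 2: append every entry to the log and collect its offset in a batch.
            Map<Long, Integer> batch = new TreeMap<>();
            for (Map.Entry<Long, String> e : beingFlushed.entrySet()) {
                entryLog.add(e.getValue());
                batch.put(e.getKey(), entryLog.size() - 1);
            }
            // Step 3: a real implementation would flush and fsync the log file here.
            // Step 4: publish the index batch, and only then clear the flushed cache.
            locationIndex.putAll(batch);
            beingFlushed.clear();
        } finally {
            flushMutex.unlock();
        }
    }
}
```

Writers only contend with the flusher during the brief swap; the slow work in steps 2 to 4 runs on the swapped-out cache while new entries keep going into the other one.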

    public void flush() throws IOException {
        // Checkpoint position taken from the journal
        Checkpoint cp = checkpointSource.newCheckpoint();
        checkpoint(cp);
        checkpointSource.checkpointComplete(cp, true);
    }

    public void checkpoint(Checkpoint checkpoint) throws IOException {
        // Checkpoint position taken from the journal
        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
        // Check whether a checkpoint past this point has already been done
        if (lastCheckpoint.compareTo(checkpoint) > 0) {
            return;
        }

        long startTime = MathUtils.nowInNano();

        // Only a single flush operation can happen at a time
        flushMutex.lock();

        try {
            // Swap the write cache so that writes can continue to happen while the flush is
            // ongoing
            // The logic here is simple: swap the current writeCache with the standby one,
            // taking the StampedLock write lock
            swapWriteCache();

            long sizeToFlush = writeCacheBeingFlushed.size();

            // Write all the pending entries into the entry logger and collect the offset
            // position for each entry
            // Flush the cache to the actual storage location and build a RocksDB batch
            Batch batch = entryLocationIndex.newBatch();
            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                try {
                    // Write the entry to the entryLogger; the returned value is the entry's offset
                    long location = entryLogger.addEntry(ledgerId, entry, true);
                    // Encode the three longs as a key/value pair and add it to the RocksDB batch
                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

            // Not expanded here: this flushes and fsyncs the entryLogger just written to disk
            entryLogger.flush();

            // Trigger the RocksDB batch flush; this write is synchronous
            long batchFlushStarTime = System.nanoTime();
            batch.flush();
            batch.close();

            // Flush ledgerIndex; this changes little, since it only records metadata
            ledgerIndex.flush();

            // Schedule the cleanup logic
            cleanupExecutor.execute(() -> {
                // There can only be one single cleanup task running because the cleanupExecutor
                // is single-threaded
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }

                    entryLocationIndex.removeOffsetFromDeletedLedgers();
                    ledgerIndex.removeDeletedLedgers();
                } catch (Throwable t) {
                    log.warn("Failed to cleanup db indexes", t);
                }
            });

            // Save checkpoint
            lastCheckpoint = thisCheckpoint;

            // Clear this cache
            // Discard all the entry from the write cache, since they're now persisted
            writeCacheBeingFlushed.clear();
        } catch (IOException e) {
            // Leave IOException as it is
            throw e;
        } catch (RuntimeException e) {
            // Wrap unchecked exceptions
            throw new IOException(e);
        } finally {
            try {
                isFlushOngoing.set(false);
            } finally {
                flushMutex.unlock();
            }
        }
    }

This completes the write path.
Reading an entry
Reads try three places, in this order:
WriteCache: both the cache currently being written and the one being flushed
ReadCache: the read-ahead cache
EntryLogger: the entry log files, whose content has already been persisted to disk
After a successful read from the EntryLogger, the entry is added to the read cache and the following entries are read ahead into it.
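The lookup order and the read-ahead step can be sketched as follows. This is an illustration under assumptions, not BookKeeper's API: `MiniReader` is hypothetical, a single `storage` map stands in for `entryLocationIndex` plus `EntryLogger`, and read-ahead is simplified to "prefetch the next `readAhead` entry IDs until one is missing" rather than reading forward from a byte position in the log file.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;

// Hypothetical model of the getEntry lookup order (not BookKeeper's API).
// "storage" stands in for entryLocationIndex + EntryLogger.
class MiniReader {
    final StampedLock rotationLock = new StampedLock();
    volatile Map<Long, String> writeCache = new ConcurrentHashMap<>();
    volatile Map<Long, String> beingFlushed = new ConcurrentHashMap<>();
    final Map<Long, String> readCache = new ConcurrentHashMap<>();
    final Map<Long, String> storage = new ConcurrentHashMap<>();
    final AtomicInteger storageReads = new AtomicInteger();
    private final int readAhead; // how many following entries to prefetch

    MiniReader(int readAhead) {
        this.readAhead = readAhead;
    }

    String get(long entryId) {
        // Take a consistent snapshot of both cache references: optimistic
        // first, regular read lock only if a swap overlapped.
        long stamp = rotationLock.tryOptimisticRead();
        Map<Long, String> local = writeCache;
        Map<Long, String> localFlushing = beingFlushed;
        if (!rotationLock.validate(stamp)) {
            stamp = rotationLock.readLock();
            try {
                local = writeCache;
                localFlushing = beingFlushed;
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }
        String v = local.get(entryId);                 // 1. newest writes
        if (v == null) v = localFlushing.get(entryId); // 2. cache being flushed
        if (v == null) v = readCache.get(entryId);     // 3. read-ahead cache
        if (v != null) return v;

        v = storage.get(entryId);                      // 4. persisted storage
        if (v == null) return null; // the article's code throws NoEntryException instead
        storageReads.incrementAndGet();
        readCache.put(entryId, v);
        // Simplified read-ahead: prefetch the next few IDs until one is missing.
        for (long next = entryId + 1; next <= entryId + readAhead; next++) {
            String n = storage.get(next);
            if (n == null) break;
            readCache.put(next, n);
        }
        return v;
    }
}
```

With a read-ahead of 2, one storage read for entry 10 also warms the cache for entries 11 and 12, so a sequential reader hits storage only once per window.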
What if a read happens while a flush is in progress?
The flush process above clears the flushed write cache only after all of its content has been persisted to disk.
So even with a concurrent read, an entry that is no longer in the cache can still be read from the file.
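The argument rests purely on ordering: the flushed cache is cleared only after the entries are written, fsynced and indexed. A tiny single-threaded replay can check this. In it, `FlushOrderCheck` is a hypothetical name, "persist" stands for the whole write/fsync/index sequence, and there is no real concurrency. The reader's lookup runs after each possible number of flush steps, and the replay reports whether the entry was always found.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-threaded replay of a read that races with a flush.
// "storage" stands in for the entry log plus its location index.
class FlushOrderCheck {
    // Returns true if a reader holding a reference to the flushing cache finds
    // the entry no matter after which flush step its lookup runs.
    static boolean alwaysVisible(boolean persistBeforeClear) {
        for (int readAfter = 0; readAfter <= 2; readAfter++) {
            Map<Long, String> flushing = new HashMap<>();
            Map<Long, String> storage = new HashMap<>();
            flushing.put(7L, "entry-7");
            Map<Long, String> snapshot = flushing; // captured before the flush ran

            Runnable persist = () -> storage.put(7L, "entry-7");
            Runnable clear = flushing::clear;
            Runnable[] steps = persistBeforeClear
                    ? new Runnable[] {persist, clear}  // order used by checkpoint()
                    : new Runnable[] {clear, persist}; // the unsafe order
            for (int i = 0; i < readAfter; i++) {
                steps[i].run();
            }
            // The reader's lookup: cache snapshot first, then storage.
            String v = snapshot.get(7L);
            if (v == null) v = storage.get(7L);
            if (v == null) return false;
        }
        return true;
    }
}
```

With the persist-then-clear order the entry is visible at every point; with the reverse order there is a window where it is in neither place.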
Another question: with this lookup order, it is not clear whether there can be requests with the same ledgerId and entryId but different contents.
If there can, this could cause a problem.

    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
        long startTime = MathUtils.nowInNano();

        // Read the LAC
        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
            return getLastEntry(ledgerId);
        }

        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
        // write caches are already thread safe on their own, here we just need to make sure we get references to both
        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        WriteCache localWriteCache = writeCache;
        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        if (!writeCacheRotationLock.validate(stamp)) {
            // Fallback to regular read lock approach
            stamp = writeCacheRotationLock.readLock();
            try {
                localWriteCache = writeCache;
                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // First try to read from the write cache of recent entries
        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // If there's a flush going on, the entry might be in the flush buffer
        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Try reading from read-ahead cache
        entry = readCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Read from main storage
        long entryLocation;
        try {
            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
            if (entryLocation == 0) {
                throw new NoEntryException(ledgerId, entryId);
            }
            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
        } catch (NoEntryException e) {
            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            throw e;
        }

        readCache.put(ledgerId, entryId, entry);

        // Try to read more entries
        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

That is all for "how to use StampedLock". Thank you for reading.
© 2024 shulou.com SLNews company. All rights reserved.