2025-02-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains the role of ReceiveXlogStream in PostgreSQL. It is a question that often comes up in day-to-day operation, so the editor went through the relevant material and the source code to put together a straightforward answer.
This section walks through the source code of the PostgreSQL backup tool pg_basebackup. Within the backup logic in BaseBackup, WAL data is backed up through StartLogStreamer -> LogStreamerMain, and the function that does the main work is ReceiveXlogStream.
I. Data structures
logstreamer_param
Parameters passed to the WAL streamer.
    typedef struct
    {
        // background connection
        PGconn     *bgconn;
        // start location
        XLogRecPtr  startptr;
        // directory or tar file, depending on the mode used
        char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
        // system identifier
        char       *sysidentifier;
        // timeline
        int         timeline;
    } logstreamer_param;
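The startptr field deserves a note: the header comment of ReceiveXlogStream insists that the WAL location must be at a log segment start, so whoever fills in logstreamer_param has to round the position down to a segment boundary first. The following is a minimal sketch of that rounding, not the pg_basebackup code itself. XLogRecPtr is replaced by a plain uint64_t stand-in, segment_start and seg_size are hypothetical names, and the sketch assumes the segment size is a power of two (16 MB is the PostgreSQL default).

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr: a 64-bit byte position in the WAL. */
typedef uint64_t XLogRecPtr;

/*
 * Round a WAL position down to the start of the segment that contains it.
 * seg_size is the WAL segment size in bytes; it must be a power of two.
 * Both names are hypothetical and exist only for this sketch.
 */
static XLogRecPtr
segment_start(XLogRecPtr pos, uint64_t seg_size)
{
    assert(seg_size != 0 && (seg_size & (seg_size - 1)) == 0);

    /* The low bits of pos are the offset inside the segment; drop them. */
    return pos - (pos & (seg_size - 1));
}
```

With a 16 MB segment size (0x1000000), a position such as 0x3000128 is rounded down to 0x3000000, while a position that already sits on a boundary is returned unchanged.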
StreamCtl
Global parameters when receiving xlog stream data
    /*
     * Global parameters when receiving xlog stream. For details about the
     * individual fields, see the function comment for ReceiveXlogStream().
     */
    typedef struct StreamCtl
    {
        // streaming start position
        XLogRecPtr  startpos;           /* Start position for streaming */
        // timeline
        TimeLineID  timeline;           /* Timeline to stream data from */
        // system identifier
        char       *sysidentifier;      /* Validate this system identifier and
                                         * timeline */
        // interval for standby status messages
        int         standby_message_timeout;    /* Send status messages this often */
        // whether to flush WAL data immediately when it is written
        bool        synchronous;        /* Flush immediately WAL data on write */
        // mark segments as done in the generated archive
        bool        mark_done;          /* Mark segment as done in generated archive */
        // whether to flush to disk to guarantee a consistent state
        bool        do_sync;            /* Flush to disk to ensure consistent state of
                                         * data */
        // streaming stops when this callback returns true
        stream_stop_callback stream_stop;   /* Stop streaming when returns true */
        // if valid, input on this socket triggers a stream_stop() check
        pgsocket    stop_socket;        /* if valid, watch for input on this socket
                                         * and check stream_stop() when there is any */
        // how the WAL is written
        WalWriteMethod *walmethod;      /* How to write the WAL */
        // suffix appended to partially received files
        char       *partial_suffix;     /* Suffix appended to partially received files */
        // replication slot to use, or NULL if none
        char       *replication_slot;   /* Replication slot to use, or NULL */
    } StreamCtl;
II. Source code interpretation
LogStreamerMain
Main function of the WAL streamer. It is called in the child process after fork.
    static int
    LogStreamerMain(logstreamer_param *param)
    {
        // global parameters used while receiving the xlog stream
        StreamCtl   stream;

        in_log_streamer = true;

        // initialize the StreamCtl structure
        MemSet(&stream, 0, sizeof(stream));
        stream.startpos = param->startptr;
        stream.timeline = param->timeline;
        stream.sysidentifier = param->sysidentifier;
        stream.stream_stop = reached_end_position;
    #ifndef WIN32
        stream.stop_socket = bgpipe[0];
    #else
        stream.stop_socket = PGINVALID_SOCKET;
    #endif
        stream.standby_message_timeout = standby_message_timeout;
        stream.synchronous = false;
        stream.do_sync = do_sync;
        stream.mark_done = true;
        stream.partial_suffix = NULL;
        stream.replication_slot = replication_slot;

        // plain format writes to a directory, otherwise to a tar file
        if (format == 'p')
            stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
        else
            stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);

        // receive the data
        if (!ReceiveXlogStream(param->bgconn, &stream))

            /*
             * Any errors will already have been reported in the function process,
             * but we need to tell the parent that we didn't shutdown in a nice
             * way.
             */
            return 1;

        if (!stream.walmethod->finish())
        {
            fprintf(stderr,
                    _("%s: could not finish writing WAL files: %s\n"),
                    progname, strerror(errno));
            return 1;
        }

        // close the connection
        PQfinish(param->bgconn);

        // release the WAL write method that matches the format
        if (format == 'p')
            FreeWalDirectoryMethod();
        else
            FreeWalTarMethod();
        // free memory
        pg_free(stream.walmethod);

        return 0;
    }
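LogStreamerMain installs reached_end_position as the stream_stop callback, and the StreamCtl comments say that streaming stops as soon as that callback returns true. To make the contract concrete, here is a minimal sketch of such a callback. It is not the body of reached_end_position: the variables end_position and end_position_known, the callback stop_at_end_position and the driver run_until_stop are all hypothetical, XLogRecPtr is a uint64_t stand-in, and the three-argument callback shape is taken from the call stream->stream_stop(blockpos, stream->timeline, false) that appears in CheckCopyStreamStop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr. */
typedef uint64_t XLogRecPtr;

/* Callback shape: (current position, timeline, segment finished?) -> stop? */
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos,
                                      uint32_t timeline,
                                      bool segment_finished);

/* Hypothetical state: the position at which streaming should end. */
static XLogRecPtr end_position = 0;
static bool end_position_known = false;

/* Ask for a stop once the end position is known and has been reached. */
static bool
stop_at_end_position(XLogRecPtr segendpos, uint32_t timeline,
                     bool segment_finished)
{
    (void) timeline;            /* unused in this sketch */
    (void) segment_finished;    /* unused in this sketch */
    return end_position_known && segendpos >= end_position;
}

/*
 * Hypothetical driver standing in for the receive loop: call the callback
 * for each received position and return the index at which it asked to stop,
 * or n if it never did.
 */
static int
run_until_stop(stream_stop_callback stop, const XLogRecPtr *positions, int n)
{
    assert(stop != NULL && positions != NULL);

    for (int i = 0; i < n; i++)
    {
        if (stop(positions[i], 1, false))
            return i;
    }
    return n;
}
```

As long as end_position_known is false the driver runs through every position, which mirrors the "streaming will continue indefinitely" wording of the ReceiveXlogStream comment. Once the end position is set, the first position at or beyond it ends the loop.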
ReceiveXlogStream
Receives a log stream starting at the specified position.
    /*
     * Receive a log stream starting at the specified position.
     *
     * Individual parameters are passed through the StreamCtl structure.
     *
     * If sysidentifier is specified, validate that both the system
     * identifier and the timeline matches the specified ones
     * (by sending an extra IDENTIFY_SYSTEM command)
     *
     * All received segments will be written to the directory
     * specified by basedir. This will also fetch any missing timeline history
     * files.
     *
     * The stream_stop callback will be called every time data
     * is received, and whenever a segment is completed. If it returns
     * true, the streaming will stop and the function
     * return. As long as it returns false, streaming will continue
     * indefinitely.
     *
     * If stream_stop() checks for external input, stop_socket should be set to
     * the FD it checks.  This will allow such input to be detected promptly
     * rather than after standby_message_timeout (which might be indefinite).
     * Note that signals will interrupt waits for input as well, but that is
     * race-y since a signal received while busy won't interrupt the wait.
     *
     * standby_message_timeout controls how often we send a message
     * back to the master letting it know our progress, in milliseconds.
     * Zero means no messages are sent.
     * This message will only contain the write location, and never
     * flush or replay.
     *
     * If 'partial_suffix' is not NULL, files are initially created with the
     * given suffix, and the suffix is removed once the file is finished. That
     * allows you to tell the difference between partial and completed files,
     * so that you can continue later where you left.
     *
     * If 'synchronous' is true, the received WAL is flushed as soon as written,
     * otherwise only when the WAL file is closed.
     *
     * Note: The WAL location *must* be at a log segment start!
     */
    bool
    ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
    {
        char        query[128];
        char        slotcmd[128];
        PGresult   *res;
        XLogRecPtr  stoppos;

        /*
         * The caller should've checked the server version already, but doesn't do
         * any harm to check it here too.
         */
        if (!CheckServerVersionForStreaming(conn))
            return false;

        /*
         * Decide whether we want to report the flush position. If we report the
         * flush position, the primary will know what WAL we'll possibly
         * re-request, and it can then remove older WAL safely. We must always do
         * that when we are using slots.
         *
         * Reporting the flush position makes one eligible as a synchronous
         * replica. People shouldn't include generic names in
         * synchronous_standby_names, but we've protected them against it so far,
         * so let's continue to do so unless specifically requested.
         */
        if (stream->replication_slot != NULL)
        {
            // a replication slot is in use
            reportFlushPosition = true;
            sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
        }
        else
        {
            if (stream->synchronous)
                reportFlushPosition = true;     // synchronous
            else
                reportFlushPosition = false;    // asynchronous
            slotcmd[0] = 0;                     // empty string
        }

        if (stream->sysidentifier != NULL)
        {
            /* Validate system identifier hasn't changed */
            // send the IDENTIFY_SYSTEM command
            res = PQexec(conn, "IDENTIFY_SYSTEM");
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                fprintf(stderr,
                        _("%s: could not send replication command \"%s\": %s"),
                        progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
                PQclear(res);
                return false;
            }
            if (PQntuples(res) != 1 || PQnfields(res) < 3)
            {
                fprintf(stderr,
                        _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
                        progname, PQntuples(res), PQnfields(res), 1, 3);
                PQclear(res);
                return false;
            }
            if (strcmp(stream->
            ...
                < blockpos && walfile != NULL)
            {
                if (stream->walmethod->sync(walfile) != 0)
                {
                    fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
                            progname, current_walfile_name,
                            stream->walmethod->getlasterror());
                    goto error;
                }
                lastFlushPosition = blockpos;

                /*
                 * Send feedback so that the server sees the latest WAL locations
                 * immediately.
                 */
                if (!sendFeedback(conn, blockpos, now, false))
                    goto error;
                last_status = now;
            }

            /*
             * Potentially send a status message to the master
             */
            if (still_sending && stream->standby_message_timeout > 0 &&
                feTimestampDifferenceExceeds(last_status, now,
                                             stream->standby_message_timeout))
            {
                /* Time to send feedback! */
                if (!sendFeedback(conn, blockpos, now, false))
                    goto error;
                last_status = now;
            }

            /*
             * Calculate how long send/receive loops should sleep
             */
            sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                     last_status);

            // read the next message from the COPY stream
            r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
            while (r != 0)
            {
                if (r == -1)
                    goto error;     // error
                if (r == -2)
                {
                    // end of the COPY stream, or an error
                    PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);

                    if (res == NULL)
                        goto error;
                    else
                        return res;
                }

                /* Check the message type. */
                if (copybuf[0] == 'k')
                {
                    if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                             &last_status))
                        goto error;
                }
                else if (copybuf[0] == 'w')
                {
                    if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                        goto error;

                    /*
                     * Check if we should continue streaming, or abort at this
                     * point.
                     */
                    if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                        goto error;
                }
                else
                {
                    fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                            progname, copybuf[0]);
                    goto error;
                }

                /*
                 * Process the received data, and any subsequent data we can read
                 * without blocking.
                 */
                r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
            }

    error:
        if (copybuf != NULL)
            PQfreemem(copybuf);
        return NULL;
    }

    /*
     * Check if we should continue streaming, or abort at this point.
     */
    static bool
    CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                        XLogRecPtr *stoppos)
    {
        if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
        {
            if (!close_walfile(stream, blockpos))
            {
                /* Potential error message is written by close_walfile */
                return false;
            }
            if (PQputCopyEnd(conn, NULL)
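The receive loop above dispatches on the first byte of each message: 'k' for a keepalive and 'w' for WAL data. As an illustration of what the two handlers have to pick apart, here is a minimal sketch of decoding the fixed-size headers. It assumes the message layouts of the PostgreSQL streaming replication protocol (keepalive: type byte, 8-byte WAL end, 8-byte send time, 1-byte reply-requested flag; WAL data: type byte, 8-byte start position, 8-byte WAL end, 8-byte send time, then the payload), with integers in network byte order. The names read_be64, parsed_msg and parse_copy_message are hypothetical and do not appear in receivelog.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Decode a 64-bit integer stored in network (big-endian) byte order. */
static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef enum { MSG_KEEPALIVE, MSG_XLOGDATA, MSG_UNKNOWN } msg_kind;

/* Hypothetical result type for this sketch. */
typedef struct
{
    msg_kind    kind;
    uint64_t    wal_pos;            /* 'w': start of the data; 'k': server WAL end */
    int         reply_requested;    /* 'k' only */
    size_t      payload_len;        /* 'w' only: bytes of WAL after the header */
} parsed_msg;

/* Classify one message and extract the header fields used above. */
static parsed_msg
parse_copy_message(const unsigned char *buf, size_t len)
{
    parsed_msg  m = {MSG_UNKNOWN, 0, 0, 0};

    assert(buf != NULL);
    if (len == 0)
        return m;

    if (buf[0] == 'k' && len >= 1 + 8 + 8 + 1)
    {
        m.kind = MSG_KEEPALIVE;
        m.wal_pos = read_be64(buf + 1);
        m.reply_requested = (buf[17] != 0);
    }
    else if (buf[0] == 'w' && len >= 1 + 8 + 8 + 8)
    {
        m.kind = MSG_XLOGDATA;
        m.wal_pos = read_be64(buf + 1);
        m.payload_len = len - 25;
    }
    return m;
}
```

Messages that are too short for their type, or that start with any other byte, come back as MSG_UNKNOWN, which corresponds to the "unrecognized streaming header" branch in the loop.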