In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)05/31 Report--
This article focuses on "StartLogStreamer Analysis in PostgreSQL". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn "StartLogStreamer Analysis in PostgreSQL".
This section briefly introduces the PostgreSQL backup tool pg_basebackup source code in the actual implementation of backup logic in BaseBackup to backup WAL data backup function StartLogStreamer.
I. data structure
Logstreamer_param
WAL data streamer parameter.
Typedef struct {/ background connection PGconn * bgconn; / / start location XLogRecPtr startptr; / / directory or tar file, depending on the mode used char xlog [MAXPGPATH]; / * directory or tarfile depending on mode * / / system identifier char * sysidentifier; / / timeline int timeline;} logstreamer_param
StreamCtl
Global parameters when receiving xlog stream data
/ * Global parameters when receiving xlog stream. For details about the individual fields, * see the function comment for ReceiveXlogStream (). * Global parameters when receiving xlog stream data. * for a detailed explanation of each domain field, see the notes for the ReceiveXlogStream () function. * / typedef struct StreamCtl {/ / streaming start position XLogRecPtr startpos; / * Start position for streaming * / / timeline TimeLineID timeline; / * Timeline to stream data from * / / system identifier char * sysidentifier; / * Validate this system identifier and * timeline * / / standby timeout information int standby_message_timeout / * Send status messages this often * / / whether to synchronize (whether to Flush WAL data immediately when writing) bool synchronous; / * Flush immediately WAL data on write * / / Mark segment as completed bool mark_done in archived data / * Mark segment as done in generated archive * / / flush to disk to ensure data consistency status (whether flushed to disk) bool do_sync; / * Flush to disk to ensure consistent state of * data * / / stop streaming stream_stop_callback stream_stop when T is returned / * Stop streaming when returns true * / / if valid, monitor the input in the socket and check the return pgsocket stop_socket; / * if valid of stream_stop (), watch for input on this socket * and check stream_stop () when there is any * / / how to write WAL WalWriteMethod * walmethod / * How to write the WAL * / / append to the suffix char * partial_suffix; / * Suffix appended to partially received files * / of the partially accepted file; if not, NULL char * replication_slot; / * Replication slot to use, or NULL * /} StreamCtl; II, source code interpretation
StartLogStreamer
StartLogStreamer is used to initialize the background process to receive WAL. The receiving process will create its own database connection to streaming copy files in parallel.
/ * Initiate background process for receiving xlog during the backup. * The background stream will use its own database connection so we can * stream the logfile in parallel with the backups. * initialize the background process to receive WAL during backup. * the background stream process will use its own database connection to stream files in parallel. * / static voidStartLogStreamer (char * startpos, uint32 timeline, char * sysidentifier) {/ / Parameter logstreamer_param * param; uint32 hi, lo;// high / low char statusdir [MAXPGPATH]; param = pg_malloc0 (sizeof (logstreamer_param)); param- > timeline = timeline; param- > sysidentifier = sysidentifier / * Convert the starting position * / / conversion start position (high-low conversion) if (sscanf (startpos, "% Xamp% X", & hi, & lo)! = 2) {fprintf (stderr, _ ("% s: could not parse write-ahead log location\"% s\ "\ n"), progname, startpos); exit (1) } / / start position, address translated to 64bit param- > startptr = ((uint64) hi) startptr-= XLogSegmentOffset (param- > startptr, WalSegSz); # ifndef WIN32 / / WIN32 code / * Create our background pipe * / if (pipe (bgpipe)
< 0) { fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), progname, strerror(errno)); exit(1); }#endif /* Get a second connection */ //获取第二个连接 param->Bgconn = GetConnection (); if (! param- > bgconn) / * Error message already written in GetConnection () * / exit (1); / * In post-10 cluster, pg_xlog has been renamed to pg_wal * / / has been named pg_wal snprintf (param- > xlog, sizeof (param- > xlog), "% samp% s", basedir, PQserverVersion (conn) in PG 10.
< MINIMUM_VERSION_FOR_PG_WAL ? "pg_xlog" : "pg_wal"); /* Temporary replication slots are only supported in 10 and newer */ //临时复制slots只在PG10+支持 if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS) temp_replication_slot = false; /* * Create replication slot if requested * 如要求,则创建复制slot */ //static char *replication_slot = NULL; //static bool temp_replication_slot = true; if (temp_replication_slot && !replication_slot) //创建replication slot replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->Bgconn); if (temp_replication_slot | | create_slot) {/ / create replication slot if (! CreateReplicationSlot (param- > bgconn, replication_slot, NULL, temp_replication_slot, true, true, false)) exit (1) If (verbose) {/ / display diagnostic information if (temp_replication_slot) fprintf (stderr, _ ("% s: created temporary replication slot\"% s\ "\ n"), progname, replication_slot) Else fprintf (stderr, _ (% s: created replication slot\ "% s\"\ n "), progname, replication_slot) }} if (format = ='p') {/ * Create pg_wal/archive_status or pg_xlog/archive_status (and thus * pg_wal or pg_xlog) depending on the target server so we can write * to basedir/pg_wal or basedir/pg_xlog as the directory entry in the * tar file may arrive later. * create a pg_wal/archive_status or pg_xlog/archive_status based on the target server, * this can be written to the basedir/pg_wal goods basedir/pg_xlog, which can be used as a directory entry for subsequent access to tar files * / snprintf (statusdir, sizeof (statusdir), "% s/%s/archive_status", basedir, PQserverVersion (conn))
< MINIMUM_VERSION_FOR_PG_WAL ? "pg_xlog" : "pg_wal"); if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST) { fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), progname, statusdir, strerror(errno)); exit(1); } } /* * Start a child process and tell it to start streaming. On Unix, this is * a fork(). On Windows, we create a thread. * 启动子进程开始streaming. * 在UNIX平台,是一个fork进程,在Windows平台,创建线程. */#ifndef WIN32 //UNIX:fork进程 bgchild = fork(); if (bgchild == 0) { //这是子进程,返回0 /* in child process */ //启动新进程 exit(LogStreamerMain(param)); } else if (bgchild < 0) { fprintf(stderr, _("%s: could not create background process: %s\n"), progname, strerror(errno)); exit(1); } /* * Else we are in the parent process and all is well. * 在父进程中,返回的bgchild是子进程PID. */ atexit(kill_bgchild_atexit);#else /* WIN32 */ //WIN32:创建线程 bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL); if (bgchild == 0) { fprintf(stderr, _("%s: could not create background thread: %s\n"), progname, strerror(errno)); exit(1); }#endif} LogStreamerMain WAL流复制主函数,用于fork后的子进程调用 static intLogStreamerMain(logstreamer_param *param){ StreamCtl stream;//接收xlog流数据时的全局参数 in_log_streamer = true; //初始化StreamCtl结构体 MemSet(&stream, 0, sizeof(stream)); stream.startpos = param->Startptr; stream.timeline = param- > timeline; stream.sysidentifier = param- > sysidentifier; stream.stream_stop = reached_end_position;#ifndef WIN32 stream.stop_socket = bgpipe [0]; # else stream.stop_socket = PGINVALID_SOCKET;#endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; stream.do_sync = do_sync; stream.mark_done = true; stream.partial_suffix = NULL Stream.replication_slot = replication_slot; if (format = ='p') stream.walmethod = CreateWalDirectoryMethod (param- > xlog, 0, do_sync); else stream.walmethod = CreateWalTarMethod (param- > xlog, compresslevel, do_sync) / / receive data if (! ReceiveXlogStream (param- > bgconn, & stream)) / * * Any errors will already have been reported in the function process, * but we need to tell the parent that we didn't shutdown in a nice * way. * errors during function execution have been issued by warning, * but you still need to tell the parent process that it cannot gracefully shut down the process. * / return 1; if (! stream.walmethod- > finish ()) {fprintf (stderr, _ ("% s: could not finish writing WAL files:% s\ n"), progname, strerror (errno)); return 1;} / end connection PQfinish (param- > bgconn) / / normal file format if (format = ='p') FreeWalDirectoryMethod (); else FreeWalTarMethod (); / / whether memory pg_free (stream.walmethod); return 0;} III. Tracking analysis
Backup command
Pg_basebackup-h 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-v
Start gdb trace
[xdb@localhost] $gdb pg_basebackupGNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7Copyright (C) 2013 Free Software Foundation, Inc.License GPLv3+: GNU GPL version 3 or later This is free software: you are free to change and redistribute it.There is NO WARRANTY, to the extent permitted by law. Type "show copying" and "show warranty" for details.This GDB was configured as "x86_64-redhat-linux-gnu" .for bug reporting instructions, please see:...Reading symbols from / appdb/atlasdb/pg11.2/bin/pg_basebackup...done. (gdb) b StartLogStreamerBreakpoint 1 at 0x403e6b: file pg_basebackup.c Line 555. (gdb) set args-h 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-v (gdb) rStarting program: / appdb/xdb/pg11.2/bin/pg_basebackup-h 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-v [Thread debugging using libthread_db enabled] Using host libthread_db library "/ lib64/libthread_db.so.1" .password: pg_basebackup: initiating basebackup Waiting for checkpoint to completepg_basebackup: checkpoint completedpg_basebackup: write-ahead log start point: 0Accord 57000060 on timeline 16pg_basebackup: starting background WAL receiverBreakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0Accord 57000060", timeline=16, sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555555 param = pg_malloc0 (sizeof (logstreamer_param)) (gdb)
Input parameters
Startpos=0x7fffffffdf60 "0Accord 57000060"
Timeline=16
Sysidentifier=0x61f1a0 "6666964067616600474"
Structural parameters
(gdb) n556 param- > timeline = timeline; (gdb) 557 param- > sysidentifier = sysidentifier; (gdb) 560 if (sscanf (startpos, "% X hi, & lo)! = 2) (gdb) 567 param- > startptr = ((uint64) hi) startptr-= XLogSegmentOffset (param- > startptr, WalSegSz); (gdb) n573 if (pipe (bgpipe))
< 0)(gdb) p *param$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' , sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}(gdb) 建立连接,创建replication slot (gdb) n583 param->Bgconn = GetConnection (); (gdb) 584 if (! param- > bgconn) (gdb) 591 PQserverVersion (conn)
< MINIMUM_VERSION_FOR_PG_WAL ?(gdb) 589 snprintf(param->Xlog, sizeof (param- > xlog), "% s if", (gdb) 595 if (PQserverVersion (conn))
< MINIMUM_VERSION_FOR_TEMP_SLOTS)(gdb) 601 if (temp_replication_slot && !replication_slot)(gdb) 602 replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->Bgconn)) (gdb) 603 if (temp_replication_slot | | create_slot) (gdb) 605 if (! CreateReplicationSlot (param- > bgconn, replication_slot, NULL, (gdb) 609 if (verbose) (gdb) 611 if (temp_replication_slot) (gdb) 612 fprintf (stderr, _ ("% s: created temporary replication slot\"% s\ "\ n") (gdb) pg_basebackup: created temporary replication slot "pg_basebackup_59378" 620 if (format = ='p') (gdb) (gdb) n630 PQserverVersion (conn) < MINIMUM_VERSION_FOR_PG_WAL? (gdb) 628 snprintf (statusdir, sizeof (statusdir), "% s/%s/archive_status"
Create a backup directory
(gdb) 633 if (pg_mkdir_p (statusdir, pg_dir_create_mode)! = 0 & & errno! = EEXIST) (gdb) p * param$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/ data/backup/pg_wal",'\ 000', sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16} (gdb) n647 bgchild = fork (); (gdb) # [xdb@localhost backup] $lspg_wal
Fork process, the parent process returns the PID of the child process
(gdb) n647 bgchild = fork (); (gdb) nDetaching after fork from child process 43001.648 if (bgchild = = 0) (gdb) p bgchild$5 = 43001 (gdb)
Child process (PID=43001)
[xdb@localhost backup] $ps-ef | grep 43001xdb 43001 42820 1 11:54 pts/1 00:00:01 / appdb/xdb/pg11.2/bin/pg_basebackup-h 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-v [xdb@localhost backup] $ps-ef | grep 192.168.26.25xdb 42820 42756 0 11:48 pts/1 00:00:00 / appdb/xdb/pg11.2/bin/pg_basebackup- H 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-vxdb 43001 42820 11:54 pts/1 00:00:01 / appdb/xdb/pg11.2/bin/pg_basebackup-h 192.168.26.25-U replicator-p 5432-D / data/backup-P-Xs-R-v
Complete the call
(gdb) n653 else if (bgchild < 0) (gdb) 672} (gdb) BaseBackup () at pg_basebackup.c:19371937 for (I = 0; I < PQntuples (res); iTunes +) (gdb)
Data in the pg_wal directory
[xdb@localhost backup] $ls-l. / pg_wal/total 16388 RWMI. 1 xdb xdb 16777216 Mar 18 11:54 00000010000000000057RW Mar. 1 xdb xdb 217 Mar 18 11:54 00000010.copyright ydrwxmuri. 2 xdb xdb 35 Mar 18 11:54 archive_ status [XDB @ localhost backup] $so far, I believe you have a better understanding of "StartLogStreamer analysis in PostgreSQL". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.