Interpretation of PostgreSQL Source Code: Background Processes #6 (walsender #2)

2025-04-01 Update, from SLTechnology News&Howtos (shulou)


                ... < cmd->startpoint)
                {
                    ereport(ERROR,
                            (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                    (uint32) (cmd->startpoint >> 32),
                                    (uint32) (cmd->startpoint),
                                    cmd->timeline),
                             errdetail("This server's history forked from timeline %u at %X/%X.",
                                       cmd->timeline,
                                       (uint32) (switchpoint >> 32),
                                       (uint32) (switchpoint))));
                }
                sendTimeLineValidUpto = switchpoint;
            }
        }
        else
        {
            sendTimeLine = ThisTimeLineID;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
            sendTimeLineIsHistoric = false;
        }

        streamingDoneSending = streamingDoneReceiving = false;

        /* If there is nothing to stream, don't even enter COPY mode */
        if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
        {
            /*
             * When we first start replication the standby will be behind the
             * primary. For some applications, for example synchronous
             * replication, it is important to have a clear state for this initial
             * catchup mode, so we can trigger actions when we change streaming
             * state later. We may stay in this state for a long time, which is
             * exactly why we want to be able to monitor whether or not we are
             * still here.
             */
            // set the state
            WalSndSetState(WALSNDSTATE_CATCHUP);

            /* Send a CopyBothResponse message, and start streaming */
            pq_beginmessage(&buf, 'W');     // 'W' = CopyBothResponse
            pq_sendbyte(&buf, 0);
            pq_sendint16(&buf, 0);
            pq_endmessage(&buf);
            pq_flush();

            /*
             * Don't allow a request to stream from a future point in WAL that
             * hasn't been flushed to disk in this server yet.
             */
            if (FlushPtr < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                (uint32) (FlushPtr >> 32),
                                (uint32) (FlushPtr))));
            }

            /* Start streaming from the requested point */
            sentPtr = cmd->startpoint;

            /* Initialize shared memory status, too */
            SpinLockAcquire(&MyWalSnd->mutex);
            MyWalSnd->sentPtr = sentPtr;
            SpinLockRelease(&MyWalSnd->mutex);

            SyncRepInitConfig();

            /* Main loop of walsender */
            // mark replication as active
            replication_active = true;

            // main loop
            WalSndLoop(XLogSendPhysical);

            // mark replication as inactive
            replication_active = false;
            if (got_STOPPING)
                proc_exit(0);       // exit

            // reset the state
            WalSndSetState(WALSNDSTATE_STARTUP);

            Assert(streamingDoneSending && streamingDoneReceiving);
        }

        if (cmd->slotname)
            ReplicationSlotRelease();

        /*
         * Copy is finished now. Send a single-row result set indicating the next
         * timeline.
         */
        if (sendTimeLineIsHistoric)
        {
            char        startpos_str[8 + 1 + 8 + 1];
            DestReceiver *dest;
            TupOutputState *tstate;
            TupleDesc   tupdesc;
            Datum       values[2];
            bool        nulls[2];

            snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                     (uint32) (sendTimeLineValidUpto >> 32),
                     (uint32) sendTimeLineValidUpto);

            dest = CreateDestReceiver(DestRemoteSimple);
            MemSet(nulls, false, sizeof(nulls));

            /*
             * Need a tuple descriptor representing two columns. int8 may seem
             * like a surprising data type for this, but in theory int4 would not
             * be wide enough for this, as TimeLineID is unsigned.
             */
            tupdesc = CreateTemplateTupleDesc(2);
            TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                      INT8OID, -1, 0);
            TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                      TEXTOID, -1, 0);

            /* prepare for projection of tuple */
            tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);

            values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
            values[1] = CStringGetTextDatum(startpos_str);

            /* send it to dest */
            do_tup_output(tstate, values, nulls);

            end_tup_output(tstate);
        }

        /* Send CommandComplete message */
        pq_puttextmessage('C', "START_STREAMING");
    }

III. Tracking analysis

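Attach gdb to the postmaster on the primary node, set follow-fork-mode to child so that gdb follows the forked walsender, set a breakpoint on exec_replication_command, and then start the standby node so that the breakpoint is hit.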

    [xdb@localhost ~]$ ps -ef | grep postgres
    xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
    [xdb@localhost ~]$ gdb -p 1339
    GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
    ...
    (gdb) set follow-fork-mode child
    (gdb) b exec_replication_command
    Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
    (gdb) c
    Continuing.
    [New process 1356]
    [Thread debugging using libthread_db enabled]
    Using host libthread_db library "/lib64/libthread_db.so.1".
    [Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]

    Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
    1438        if (got_STOPPING)
    (gdb)

The first command is IDENTIFY_SYSTEM; the second command, START_REPLICATION, is the one we want to trace.

    (gdb) c
    Continuing.

    Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
    1438        if (got_STOPPING)
    (gdb)
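The command string seen at the breakpoint carries the start position as two hexadecimal halves plus a timeline ID. As a rough illustration only, the sketch below pulls those fields out with sscanf. The real walsender uses the replication grammar (replication_yyparse), not sscanf; the function name parse_start_replication is hypothetical and the sketch handles only this one form of the command.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper, not part of PostgreSQL: parse a command of the form
 * "START_REPLICATION <hi>/<lo> TIMELINE <tli>".
 * Returns 0 on success, -1 if the string does not match that form.
 */
int
parse_start_replication(const char *cmd, uint64_t *startpoint, uint32_t *timeline)
{
    unsigned int hi, lo, tli;

    assert(cmd != NULL && startpoint != NULL && timeline != NULL);

    if (sscanf(cmd, "START_REPLICATION %X/%X TIMELINE %u", &hi, &lo, &tli) != 3)
        return -1;

    /* A WAL position is a 64-bit value written as two 32-bit hex halves. */
    *startpoint = ((uint64_t) hi << 32) | lo;
    *timeline = tli;
    return 0;
}
```

Applied to the string from the trace, this yields a start position of 1560281088 and timeline 16, which are the values gdb later prints for cmd->startpoint and cmd->timeline.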

1. Perform relevant initialization and verification

    (gdb) n
    1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
    (gdb)
    1454        SnapBuildClearExportedSnapshot();
    (gdb) p *MyWalSnd
    $1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
    (gdb) n
    1456        CHECK_FOR_INTERRUPTS();
    (gdb)

2. Switch the memory context

    (gdb)
    1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
    (gdb)
    1461        old_context = MemoryContextSwitchTo(cmd_context);
    (gdb)

3. Initialize the replication scanner

    (gdb)
    1463        replication_scanner_init(cmd_string);
    (gdb) n
    1464        parse_rc = replication_yyparse();
    (gdb)
    1465        if (parse_rc != 0)
    (gdb) p parse_rc
    $3 = 0
    (gdb) n
    1471        cmd_node = replication_parse_result;
    (gdb)
    1479        if (cmd_node->type != T_SQLCmd)
    (gdb) n
    1480            ereport(log_replication_commands ? LOG : DEBUG1,
    (gdb) p cmd_node
    $4 = (Node *) 0x1df4710
    (gdb) p *cmd_node
    $5 = {type = T_StartReplicationCmd}
    (gdb)

4. Perform transaction-related checks

    (gdb) n
    1487        if (!IsTransactionBlock())
    (gdb)
    1488            SnapBuildClearExportedSnapshot();
    (gdb)
    1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
    (gdb)
    1500        CHECK_FOR_INTERRUPTS();
    (gdb)

5. Initialize input and output messages

    (gdb)
    1506        initStringInfo(&output_message);
    (gdb)
    1507        initStringInfo(&reply_message);
    (gdb)
    1508        initStringInfo(&tmpbuf);
    (gdb)
    1511        pgstat_report_activity(STATE_RUNNING, NULL);

6. Execute the corresponding command according to the command type

6.1 The command type is T_StartReplicationCmd, so StartReplication is called

    (gdb) n
    1513        switch (cmd_node->type)
    (gdb)
    1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
    (gdb)
    1536                    PreventInTransactionBlock(true, "START_REPLICATION");
    (gdb)
    1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
    (gdb)
    1539                        StartReplication(cmd);

Step into StartReplication

    1539                        StartReplication(cmd);
    (gdb) step
    StartReplication (cmd=0x1df4710) at walsender.c:532
    532         if (ThisTimeLineID == 0)
    (gdb)

1. Perform relevant initialization and verification

    (gdb) n
    546         if (cmd->slotname)
    (gdb)
    560         if (am_cascading_walsender)
    (gdb)

2. Select a timeline

    (gdb) n
    568         if (cmd->timeline != 0)
    (gdb)
    572             sendTimeLine = cmd->timeline;
    (gdb)
    573             if (sendTimeLine == ThisTimeLineID)
    (gdb)
    575                 sendTimeLineIsHistoric = false;
    (gdb) p FlushPtr
    $9 = 1560397696
    (gdb) n
    576                 sendTimeLineValidUpto = InvalidXLogRecPtr;
    (gdb)
    634         streamingDoneSending = streamingDoneReceiving = false;
    (gdb) p sendTimeLine
    $10 = 16
    (gdb) p ThisTimeLineID
    $11 = 16
    (gdb) p *cmd
    $12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, startpoint = 1560281088, options = 0x0}
    (gdb)
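gdb prints WAL positions such as FlushPtr and cmd->startpoint as plain decimal integers, while the protocol and the error messages use the "%X/%X" form. The following minimal sketch converts one into the other, using the same high-half/low-half split as the snprintf calls in StartReplication. The function name format_lsn is an assumption made for illustration; it is not a PostgreSQL function.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: render a 64-bit WAL position as "hi/lo" in hex.
 * buf must hold at least 8 + 1 + 8 + 1 = 18 bytes, the same size as
 * startpos_str in StartReplication. Returns the number of characters written.
 */
int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen >= 18);

    return snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
                    (uint32_t) (lsn >> 32), (uint32_t) lsn);
}
```

With this helper, the startpoint 1560281088 from the trace renders as 0/5D000000, matching the START_REPLICATION command string, and the FlushPtr 1560397696 renders as 0/5D01C780.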

3. Enter COPY mode

    (gdb) n
    637         if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    (gdb)

3.1 Set the state

    648             WalSndSetState(WALSNDSTATE_CATCHUP);
    (gdb) p sendTimeLineValidUpto
    $13 = 0
    (gdb) p cmd->startpoint
    $14 = 1560281088
    (gdb)
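The condition at line 637 decides whether COPY mode is entered at all. It can be restated as a small predicate, sketched below under the assumption (taken from the source above) that sendTimeLineValidUpto is InvalidXLogRecPtr, i.e. 0, whenever the timeline is not historic. The name has_wal_to_stream is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Simplified restatement of the test at walsender.c:637.
 * valid_upto is only meaningful for a historic timeline; for the current
 * timeline StartReplication sets it to InvalidXLogRecPtr (0).
 */
bool
has_wal_to_stream(bool timeline_is_historic, uint64_t startpoint, uint64_t valid_upto)
{
    assert(timeline_is_historic || valid_upto == 0);

    /* Current timeline: new WAL can always appear, so always stream. */
    if (!timeline_is_historic)
        return true;

    /* Historic timeline: only WAL before the switch point can be sent. */
    return startpoint < valid_upto;
}
```

In the traced session the timeline is the current one (16), so the predicate is true even though sendTimeLineValidUpto is 0, and execution proceeds to WalSndSetState.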

3.2 Send a CopyBothResponse message and start streaming

    (gdb) n
    651             pq_beginmessage(&buf, 'W');
    (gdb)
    652             pq_sendbyte(&buf, 0);
    (gdb)
    653             pq_sendint16(&buf, 0);
    (gdb)
    654             pq_endmessage(&buf);
    (gdb) p buf
    $15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
    (gdb) p buf->data
    $16 = 0x1df53b0 ""
    (gdb) x/hb buf->data
    0x1df53b0:  0
    (gdb) x/32hb buf->data
    0x1df53b0:  0 ... 127 127
    0x1df53b8:  127 127 127 127
    0x1df53c0:  127 127 127 127
    0x1df53c8:  127 127 127
    (gdb)
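In the gdb output above, len = 3 is the message body (one format byte plus a 16-bit column count), and cursor = 87 is the ASCII code of 'W', because pq_beginmessage stores the message type in the cursor field until pq_endmessage sends it. The sketch below builds the bytes that would go on the wire for this CopyBothResponse, assuming the standard v3 framing of a type byte followed by a 4-byte big-endian length that counts itself. The function name build_copy_both_response is hypothetical; this is not how pqformat.c is implemented.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: write a CopyBothResponse with zero columns into out.
 * Layout: 'W' | int32 length | int8 overall format | int16 column count.
 * Returns the number of bytes written (8), or 0 if out is too small.
 */
size_t
build_copy_both_response(uint8_t *out, size_t outlen)
{
    const uint32_t length = 4 + 1 + 2;  /* length field itself + 3-byte body */

    assert(out != NULL);

    if (outlen < 1 + (size_t) length)
        return 0;

    out[0] = 'W';                       /* message type, ASCII 87 */
    out[1] = (uint8_t) (length >> 24);  /* length in network byte order */
    out[2] = (uint8_t) (length >> 16);
    out[3] = (uint8_t) (length >> 8);
    out[4] = (uint8_t) length;
    out[5] = 0;                         /* overall copy format: 0 */
    out[6] = 0;                         /* number of columns: 0 (high byte) */
    out[7] = 0;                         /* number of columns: 0 (low byte) */
    return 8;
}
```

The three trailing zero bytes correspond to the pq_sendbyte(&buf, 0) and pq_sendint16(&buf, 0) calls at lines 652 and 653.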

3.3 Initialize related variables, such as the shared memory status

    (gdb) n
    655             pq_flush();
    (gdb)
    661             if (FlushPtr < cmd->startpoint)
    (gdb) p FlushPtr
    $17 = 1560397696
    (gdb) p cmd->startpoint
    $18 = 1560281088
    (gdb) n
    672             sentPtr = cmd->startpoint;
    (gdb)
    675             SpinLockAcquire(&MyWalSnd->mutex);
    (gdb)
    676             MyWalSnd->sentPtr = sentPtr;
    (gdb)
    677             SpinLockRelease(&MyWalSnd->mutex);
    (gdb)
    679             SyncRepInitConfig();
    (gdb)
    682             replication_active = true;
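The check at line 661 rejects a start position that lies beyond what the server has flushed. When the check passes, the difference between the two values is the amount of WAL the standby still has to receive at this moment. A minimal sketch of that arithmetic follows; the function name wal_backlog_bytes is an assumption for illustration and does not exist in PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper mirroring the FlushPtr < startpoint check.
 * Returns -1 if the requested start is ahead of the flush position
 * (the case in which StartReplication raises an ERROR); otherwise
 * stores the number of flushed-but-unsent bytes in *backlog and returns 0.
 */
int
wal_backlog_bytes(uint64_t flush_ptr, uint64_t startpoint, uint64_t *backlog)
{
    assert(backlog != NULL);

    if (flush_ptr < startpoint)
        return -1;

    *backlog = flush_ptr - startpoint;
    return 0;
}
```

For the values in the trace (FlushPtr = 1560397696, startpoint = 1560281088) the check passes and the backlog is 116608 bytes, which is what the walsender has to ship while it is in the WALSNDSTATE_CATCHUP state.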

3.4 Enter the main loop (WalSndLoop)

    (gdb)
    684             WalSndLoop(XLogSendPhysical);
    (gdb)

DONE!

IV. Reference materials

PG Source Code
