Interpretation of PostgreSQL Source Code (150)-PG Tools#2 (BaseBackup function)

2025-01-18 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This section briefly introduces BaseBackup, the function in the source code of the PostgreSQL backup tool pg_basebackup that implements the actual backup logic.

I. Data structures

option

Data structure that describes one long command-line option accepted by the tool.

    #ifndef HAVE_STRUCT_OPTION

    /* Description of one long option */
    struct option
    {
        const char *name;       /* option name */
        int         has_arg;    /* whether the option takes an argument:
                                 * no_argument / required_argument /
                                 * optional_argument */
        int        *flag;       /* if not NULL, val is stored here */
        int         val;        /* value to return or to store in *flag */
    };

    #define no_argument 0
    #define required_argument 1
    #define optional_argument 2
    #endif

    /*
     * On OpenBSD and some versions of Solaris, opterr and friends are defined in
     * core libc rather than in a separate getopt module.  Define these variables
     * only if configure found they aren't there by default; otherwise, this
     * module and its callers will just use libc's variables.  (We assume that
     * testing opterr is sufficient for all of these.)
     */
    #ifndef HAVE_INT_OPTERR

    int         opterr = 1,     /* if error message should be printed */
                optind = 1,     /* index into parent argv vector */
                optopt;         /* character checked for validity */
    char       *optarg;         /* argument associated with option */

    #endif

    #define BADCH   (int)'?'
    #define BADARG  (int)':'
    #define EMSG    ""
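To make the role of struct option more concrete, here is a minimal sketch of how such a table is typically passed to getopt_long. The names DemoOptions and parse_demo_options are hypothetical and exist only for this illustration; the three options merely mirror pg_basebackup's -D/--pgdata, -F/--format and -v/--verbose, and the real tool handles many more.

```c
#include <assert.h>
#include <getopt.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical container for the parsed values (not part of PostgreSQL). */
typedef struct DemoOptions
{
    const char *pgdata;     /* target directory, from -D / --pgdata */
    char        format;     /* 'p' (plain) or 't' (tar), from -F / --format */
    int         verbose;    /* 1 if -v / --verbose was given */
} DemoOptions;

/* Returns 0 on success, -1 if an unknown option or missing argument is seen. */
static int
parse_demo_options(int argc, char **argv, DemoOptions *out)
{
    /* One struct option per long option; an all-zero entry ends the table. */
    static const struct option long_options[] = {
        {"pgdata", required_argument, NULL, 'D'},
        {"format", required_argument, NULL, 'F'},
        {"verbose", no_argument, NULL, 'v'},
        {NULL, 0, NULL, 0}
    };
    int         c;

    assert(out != NULL);
    out->pgdata = NULL;
    out->format = 'p';
    out->verbose = 0;

    optind = 1;             /* restart scanning at argv[1] */
    opterr = 0;             /* keep getopt_long from printing its own errors */

    while ((c = getopt_long(argc, argv, "D:F:v", long_options, NULL)) != -1)
    {
        switch (c)
        {
            case 'D':
                out->pgdata = optarg;
                break;
            case 'F':
                out->format = optarg[0];
                break;
            case 'v':
                out->verbose = 1;
                break;
            default:
                return -1;  /* '?' or ':' reported by getopt_long */
        }
    }
    return 0;
}
```

Because flag is NULL in every entry, getopt_long returns val directly, so the long and short spellings of an option share one case label.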

pg_result

Used to receive the return result of PQgetResult.

    struct pg_result
    {
        int         ntups;          /* number of tuples */
        int         numAttributes;  /* number of attributes */
        PGresAttDesc *attDescs;
        PGresAttValue **tuples;     /* each PGresTuple is an array of
                                     * PGresAttValue's */
        int         tupArrSize;     /* allocated size of tuples array */
        int         numParameters;  /* number of parameters */
        PGresParamDesc *paramDescs; /* parameter descriptors */
        ExecStatusType resultStatus;    /* execution status (enum) */
        char        cmdStatus[CMDSTATUS_LEN];   /* cmd status from the query */
        int         binary;         /* binary tuple values if binary == 1,
                                     * otherwise text */

        /*
         * These fields are copied from the originating PGconn, so that operations
         * on the PGresult don't have to reference the PGconn.
         */
        PGNoticeHooks noticeHooks;  /* hook functions */
        PGEvent    *events;
        int         nEvents;
        int         client_encoding;    /* encoding id */

        /*
         * Error information (all NULL if not an error result).  errMsg is the
         * "overall" error message returned by PQresultErrorMessage.  If we have
         * per-field info then it is stored in a linked list.
         */
        char       *errMsg;         /* error message, or NULL if no error */
        PGMessageField *errFields;  /* message broken into fields */
        char       *errQuery;       /* text of triggering query, if available */

        /* All NULL attributes in the query result point to this null string */
        char        null_field[1];

        /*
         * Space management information.  Note that attDescs and error stuff, if
         * not null, point into allocated blocks.  But tuples points to a
         * separately malloc'd block, so that we can realloc it.
         */
        PGresult_data *curBlock;    /* most recently allocated block */
        int         curOffset;      /* start offset of free space in block */
        int         spaceLeft;      /* number of free bytes remaining in block */

        size_t      memorySize;     /* total space allocated for this PGresult */
    };

    /* Data about a single parameter of a prepared statement */
    typedef struct pgresParamDesc
    {
        Oid         typid;          /* type id */
    } PGresParamDesc;

    typedef enum
    {
        PGRES_EMPTY_QUERY = 0,      /* empty query string was executed */
        PGRES_COMMAND_OK,           /* a query command that doesn't return
                                     * anything was executed properly by the
                                     * backend */
        PGRES_TUPLES_OK,            /* a query command that returns tuples was
                                     * executed properly by the backend, PGresult
                                     * contains the result tuples */
        PGRES_COPY_OUT,             /* Copy Out data transfer in progress */
        PGRES_COPY_IN,              /* Copy In data transfer in progress */
        PGRES_BAD_RESPONSE,         /* an unexpected response was recv'd from the
                                     * backend */
        PGRES_NONFATAL_ERROR,       /* notice or warning message */
        PGRES_FATAL_ERROR,          /* query failed */
        PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
        PGRES_SINGLE_TUPLE          /* single tuple from larger resultset */
    } ExecStatusType;

    typedef union pgresult_data PGresult_data;

    union pgresult_data
    {
        PGresult_data *next;        /* link to next block, or NULL */
        char        space[1];       /* dummy for accessing block as bytes */
    };

II. Source code interpretation

BaseBackup is the function that actually performs the backup.

Its main logic is to send a backup request (the BASE_BACKUP command) to the server through the libpq interface and then receive the data the server streams back.
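As a rough illustration of how the BASE_BACKUP request is assembled before it is handed to libpq, the sketch below builds the command text from a handful of flags. DemoWalMode, DemoBackupRequest and demo_build_base_backup are hypothetical names invented for this example, snprintf stands in for PostgreSQL's psprintf, and the label is assumed to have been escaped already (the real code uses PQescapeStringConn for that).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for pg_basebackup's option variables. */
typedef enum
{
    DEMO_NO_WAL,
    DEMO_FETCH_WAL,
    DEMO_STREAM_WAL
} DemoWalMode;

typedef struct DemoBackupRequest
{
    const char *escaped_label;      /* label, assumed already escaped */
    int         showprogress;
    DemoWalMode includewal;
    int         fastcheckpoint;
    unsigned int maxrate;           /* 0 means no MAX_RATE clause */
    char        format;             /* 'p' plain, 't' tar */
    int         verify_checksums;
} DemoBackupRequest;

/* Writes the command into buf; returns 0 on success, -1 if it did not fit. */
static int
demo_build_base_backup(char *buf, size_t buflen, const DemoBackupRequest *req)
{
    char        maxrate_clause[32] = "";
    int         n;

    assert(buf != NULL && req != NULL);

    if (req->maxrate > 0)
        snprintf(maxrate_clause, sizeof(maxrate_clause),
                 "MAX_RATE %u", req->maxrate);

    /* Each optional keyword is either present or an empty string. */
    n = snprintf(buf, buflen,
                 "BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
                 req->escaped_label,
                 req->showprogress ? "PROGRESS" : "",
                 req->includewal == DEMO_FETCH_WAL ? "WAL" : "",
                 req->fastcheckpoint ? "FAST" : "",
                 req->includewal == DEMO_NO_WAL ? "" : "NOWAIT",
                 maxrate_clause,
                 req->format == 't' ? "TABLESPACE_MAP" : "",
                 req->verify_checksums ? "" : "NOVERIFY_CHECKSUMS");

    return (n >= 0 && (size_t) n < buflen) ? 0 : -1;
}
```

Disabled options simply contribute empty strings, so the resulting command may contain runs of spaces; this sketch makes no attempt to collapse them.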

    static void
    BaseBackup(void)
    {
        PGresult   *res;
        char       *sysidentifier;
        TimeLineID  latesttli;
        TimeLineID  starttli;
        char       *basebkp;
        char        escaped_label[MAXPGPATH];
        char       *maxrate_clause = NULL;
        int         i;
        char        xlogstart[64];
        char        xlogend[64];
        int         minServerMajor,
                    maxServerMajor;
        int         serverVersion,
                    serverMajor;

        /* the database connection must already exist */
        Assert(conn != NULL);

        /*
         * Check server version. BASE_BACKUP command was introduced in 9.1, so we
         * can't work with servers older than 9.1.
         */
        minServerMajor = 901;
        maxServerMajor = PG_VERSION_NUM / 100;
        serverVersion = PQserverVersion(conn);
        serverMajor = serverVersion / 100;
        if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
        {
            const char *serverver = PQparameterStatus(conn, "server_version");

            fprintf(stderr, _("%s: incompatible server version %s\n"),
                    progname, serverver ? serverver : "'unknown'");
            exit(1);
        }

        /*
         * If WAL streaming was requested, also check that the server is new
         * enough for that.
         */
        if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
        {
            /*
             * Error message already written in CheckServerVersionForStreaming(),
             * but add a hint about using -X none.
             */
            fprintf(stderr, _("HINT: use -X none or -X fetch to disable log streaming\n"));
            exit(1);
        }

        /*
         * Build contents of configuration file if requested
         * (that is, when a recovery.conf file has to be created)
         */
        if (writerecoveryconf)
            GenerateRecoveryConf(conn);

        /*
         * Run IDENTIFY_SYSTEM so we can get the timeline
         */
        if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
            exit(1);

        /*
         * Start the actual backup
         */
        PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);

        if (maxrate > 0)
            maxrate_clause = psprintf("MAX_RATE %u", maxrate);

        if (verbose)
            fprintf(stderr,
                    _("%s: initiating base backup, waiting for checkpoint to complete\n"),
                    progname);

        if (showprogress && !verbose)
        {
            /* progress information */
            fprintf(stderr, "waiting for checkpoint");
            if (isatty(fileno(stderr)))
                fprintf(stderr, "\r");
            else
                fprintf(stderr, "\n");
        }

        /* build the base backup command */
        basebkp =
            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
                     escaped_label,
                     showprogress ? "PROGRESS" : "",
                     includewal == FETCH_WAL ? "WAL" : "",
                     fastcheckpoint ? "FAST" : "",
                     includewal == NO_WAL ? "" : "NOWAIT",
                     maxrate_clause ? maxrate_clause : "",
                     format == 't' ? "TABLESPACE_MAP" : "",
                     verify_checksums ? "" : "NOVERIFY_CHECKSUMS");

        /* send it through libpq */
        if (PQsendQuery(conn, basebkp) == 0)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "BASE_BACKUP", PQerrorMessage(conn));
            exit(1);
        }

        /*
         * Get the starting WAL location
         */
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr, _("%s: could not initiate base backup: %s"),
                    progname, PQerrorMessage(conn));
            exit(1);
        }
        /* exactly one row is expected */
        if (PQntuples(res) != 1)
        {
            fprintf(stderr,
                    _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 2);
            exit(1);
        }

        /* WAL start location */
        strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));

        if (verbose)
            fprintf(stderr, _("%s: checkpoint completed\n"), progname);

        /*
         * 9.3 and later sends the TLI of the starting point. With older servers,
         * assume it's the same as the latest timeline reported by
         * IDENTIFY_SYSTEM.
         */
        if (PQnfields(res) >= 2)
            starttli = atoi(PQgetvalue(res, 0, 1));
        else
            starttli = latesttli;
        PQclear(res);
        MemSet(xlogend, 0, sizeof(xlogend));

        if (verbose && includewal != NO_WAL)
            fprintf(stderr, _("%s: write-ahead log start point: %s on timeline %u\n"),
                    progname, xlogstart, starttli);

        /*
         * Get the header
         */
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr, _("%s: could not get backup header: %s"),
                    progname, PQerrorMessage(conn));
            exit(1);
        }
        if (PQntuples(res) < 1)
        {
            fprintf(stderr, _("%s: no data returned from server\n"), progname);
            exit(1);
        }

        /*
         * Sum up the total size, for progress reporting
         */
        totalsize = totaldone = 0;
        tablespacecount = PQntuples(res);
        for (i = 0; i < PQntuples(res); i++)
        {
            totalsize += atol(PQgetvalue(res, i, 2));

            /*
             * Verify tablespace directories are empty. Don't bother with the
             * first once since it can be relocated, and it will be checked before
             * we do anything anyway.
             */
            if (format == 'p' && !PQgetisnull(res, i, 1))
            {
                char       *path = unconstify(char *, get_tablespace_mapping(PQgetvalue(res, i, 1)));

                verify_dir_is_empty_or_create(path, &made_tablespace_dirs, &found_tablespace_dirs);
            }
        }

        /*
         * When writing to stdout, require a single tablespace
         */
        if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
        {
            fprintf(stderr,
                    _("%s: can only write single tablespace to stdout, database has %d\n"),
                    progname, PQntuples(res));
            exit(1);
        }

        /*
         * If we're streaming WAL, start the streaming session before we start
         * receiving the actual data chunks.
         */
        if (includewal == STREAM_WAL)
        {
            if (verbose)
                fprintf(stderr, _("%s: starting background WAL receiver\n"),
                        progname);
            StartLogStreamer(xlogstart, starttli, sysidentifier);
        }

        /*
         * Start receiving chunks
         */
        for (i = 0; i < PQntuples(res); i++)    /* every tablespace */
        {
            if (format == 't')      /* tar archive */
                ReceiveTarFile(conn, res, i);
            else                    /* plain files */
                ReceiveAndUnpackTarFile(conn, res, i);
        }                           /* Loop over all tablespaces */

        if (showprogress)
        {
            progress_report(PQntuples(res), NULL, true);
            if (isatty(fileno(stderr)))
                fprintf(stderr, "\n");  /* Need to move to next line */
        }

        PQclear(res);

        /*
         * Get the stop position
         */
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not get write-ahead log end position from server: %s"),
                    progname, PQerrorMessage(conn));
            exit(1);
        }
        if (PQntuples(res) != 1)
        {
            fprintf(stderr,
                    _("%s: no write-ahead log end position returned from server\n"),
                    progname);
            exit(1);
        }
        strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
        if (verbose && includewal != NO_WAL)
            fprintf(stderr, _("%s: write-ahead log end point: %s\n"), progname, xlogend);
        PQclear(res);

        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);

            if (sqlstate &&
                strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
            {
                fprintf(stderr, _("%s: checksum error occurred\n"),
                        progname);
                checksum_failure = true;
            }
            else
            {
                fprintf(stderr, _("%s: final receive failed: %s"),
                        progname, PQerrorMessage(conn));
            }
            exit(1);
        }

        if (bgchild > 0)
        {
    #ifndef WIN32
            int         status;
            pid_t       r;
    #else
            DWORD       status;

            /*
             * get a pointer sized version of bgchild to avoid warnings about
             * casting to a different size on WIN64.
             */
            intptr_t    bgchild_handle = bgchild;
            uint32      hi,
                        lo;
    #endif

            if (verbose)
                fprintf(stderr,
                        _("%s: waiting for background process to finish streaming ...\n"), progname);

    #ifndef WIN32
            /* non-Windows: pass the end position to the child through the pipe */
            if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
            {
                fprintf(stderr,
                        _("%s: could not send command to background pipe: %s\n"),
                        progname, strerror(errno));
                exit(1);
            }

            /* Just wait for the background process to exit */
            r = waitpid(bgchild, &status, 0);
            if (r == (pid_t) -1)
            {
                fprintf(stderr, _("%s: could not wait for child process: %s\n"),
                        progname, strerror(errno));
                exit(1);
            }
            if (r != bgchild)
            {
                fprintf(stderr, _("%s: child %d died, expected %d\n"),
                        progname, (int) r, (int) bgchild);
                exit(1);
            }
            if (status != 0)
            {
                fprintf(stderr, "%s: %s\n",
                        progname, wait_result_to_str(status));
                exit(1);
            }
            /* Exited normally, we're happy! */
    #else                           /* WIN32 */

            /*
             * On Windows, since we are in the same process, we can just store the
             * value directly in the variable, and then set the flag that says
             * it's there.
             */
            if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
            {
                fprintf(stderr,
                        _("%s: could not parse write-ahead log location \"%s\"\n"),
                        progname, xlogend);
                exit(1);
            }
            xlogendptr = ((uint64) hi)
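The Windows branch of BaseBackup parses the textual WAL end position into two 32-bit halves with sscanf. The sketch below isolates that step in a small helper; demo_parse_lsn is a hypothetical name, and joining the halves as high word shifted left by 32 bits OR low word is an assumption based on the usual 64-bit representation of a WAL location, since the listing stops before that expression is complete.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse a WAL location written as "hi/lo" (two hexadecimal numbers) into a
 * single 64-bit value.  Returns 0 on success, -1 if the text does not match.
 */
static int
demo_parse_lsn(const char *str, uint64_t *lsn)
{
    unsigned int hi,
                lo;

    assert(str != NULL && lsn != NULL);

    /* Same pattern as in BaseBackup: both halves must be present. */
    if (sscanf(str, "%X/%X", &hi, &lo) != 2)
        return -1;

    /* Assumed layout: upper 32 bits from hi, lower 32 bits from lo. */
    *lsn = ((uint64_t) hi << 32) | lo;
    return 0;
}
```

For example, under that assumption the text 16/B374D848 becomes the 64-bit value 0x16B374D848, while a string without the slash-separated pair is rejected.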
