pg_walsender
walsender分析分为两部分,
-
与备库建立链接前的流程
主库会监听socket,并接受备库的TCP连接请求,当收到备库walreceiver的请求后,主库postgres就会fork出一个walsender进程来处理相关请求。
-
与备库建立链接后的流程
建立连接后,walsender需要判断发送哪些数据,并启动keepalive机制,探测备库receiver是否正常,同时发送数据。
建连前
postgres主进程启动后,会监听配置的地址,等待新的连接到来。而walsender启动就是在等待walreceiver过来连接。
if (strcmp(valptr, "database") == 0)
{
am_walsender = true;
am_db_walsender = true;
}
当am_walsender设置为true时,就表示postgres创建的这个服务进程为walsender。walsender进程与普通的来自客户端的连接进程没有什么区别,只是walsender专门做流复制这件事,同时它接受处理的是来自walreceiver的请求。
全局变量
/* global state */
extern PGDLLIMPORT bool am_walsender; // 是否是walsender进程
extern PGDLLIMPORT bool am_cascading_walsender; // 是否是级联walsender
extern PGDLLIMPORT bool am_db_walsender; // 是否连接到数据库
extern PGDLLIMPORT bool wake_wal_senders;
/* user-settable parameters */
extern PGDLLIMPORT int max_wal_senders; // 最大walsender进程数
extern PGDLLIMPORT int wal_sender_timeout; // wal消息发送超时时间
extern PGDLLIMPORT bool log_replication_commands;
-
am_walsender和am_db_walsender
解析启动参数replication的值进行赋值:如果replication的值是database,则将am_walsender和am_db_walsender都设置为true;否则按布尔值解析(例如true),只把解析结果赋给am_walsender。
if (strcmp(valptr, "database") == 0)
{
am_walsender = true;
am_db_walsender = true;
}
else if (!parse_bool(valptr, &am_walsender)) {
}
-
am_cascading_walsender
am_cascading_walsender在初始化walSnd的时候赋值。
am_cascading_walsender = RecoveryInProgress();
该值来自RecoveryInProgress()的返回值:如果本进程缓存的LocalRecoveryInProgress已经为false,则直接返回false;否则根据共享内存中的xlogctl->SharedRecoveryState重新计算。
LocalRecoveryInProgress = (xlogctl->SharedRecoveryState != RECOVERY_STATE_DONE);
对外接口
extern void InitWalSender(void);
extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
extern void WalSndRqstFileReload(void);
-
InitWalSender
初始化一个walSnd。当am_walsender为true的时候,postgres启动该服务进程时就会初始化一个walSnd。
初始化slot的时候会将全局的WalSndCtl的walsnds初始化,walsnds是一个变长数组,会根据max_wal_senders进行内存分配和初始化。每个创建的walSnd都会保存到全局的WalSndCtl的数组中。
根据walSnd的pid是否为0来判断是否需要初始化,每个初始化的walSnd的状态为WALSNDSTATE_STARTUP。
对内接口
extern void WalSndSetState(WalSndState state);
/*
* Internal functions for parsing the replication grammar, in repl_gram.y and
* repl_scanner.l
*/
extern int replication_yyparse(void);
extern int replication_yylex(void);
extern void replication_yyerror(const char *str) pg_attribute_noreturn();
extern void replication_scanner_init(const char *query_string);
extern void replication_scanner_finish(void);
extern bool replication_scanner_is_replication_command(void);
-
WalSndSetState
用来更改walSnd的状态。
数据模型
- walsender状态
typedef enum WalSndState
{
WALSNDSTATE_STARTUP = 0,
WALSNDSTATE_BACKUP,
WALSNDSTATE_CATCHUP,
WALSNDSTATE_STREAMING,
WALSNDSTATE_STOPPING
} WalSndState;
- walsender 结构
一个进程对应一个walSnd结构。
typedef struct WalSnd
{
pid_t pid; /* this walsender's PID, or 0 if not active */
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be
* reloaded? */
/*
* The xlog locations that have been written, flushed, and applied by
* standby-side. These may be invalid if the standby-side has not offered
* values yet.
*/
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
/* Measured lag times, or -1 for unknown/none. */
TimeOffset writeLag;
TimeOffset flushLag;
TimeOffset applyLag;
/*
* The priority order of the standby managed by this WALSender, as listed
* in synchronous_standby_names, or 0 if not-listed.
*/
int sync_standby_priority;
/* Protects shared variables shown above. */
slock_t mutex;
/*
* Pointer to the walsender's latch. Used by backends to wake up this
* walsender when it has work to do. NULL if the walsender isn't active.
*/
Latch *latch;
/*
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
} WalSnd;
- WalSndCtlData
typedef struct
{
/*
* Synchronous replication queue with one queue per request type.
* Protected by SyncRepLock.
*/
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
/*
* Current location of the head of the queue. All waiters should have a
* waitLSN that follows this value. Protected by SyncRepLock.
*/
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
/*
* Are any sync standbys defined? Waiting backends can't reload the
* config file safely, so checkpointer updates this value as needed.
* Protected by SyncRepLock.
*/
bool sync_standbys_defined;
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
} WalSndCtlData;
- NodeTag
typedef enum NodeTag {
.....
/*
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
T_ReadReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
......
} NodeTag;
-
XLogReaderState
typedef uint64 XLogRecPtr;
struct XLogReaderState
{
XLogReaderRoutine routine;
XLogRecPtr ReadRecPtr; /* start of last record read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
}
-
XLogReaderRoutine
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
XLogSegNo nextSegNo,
TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine
{
XLogPageReadCB page_read;
WALSegmentOpenCB segment_open;
WALSegmentCloseCB segment_close;
} XLogReaderRoutine;
-
-
XLogRecoveryCtlData
typedef struct XLogRecoveryCtlData
{
bool SharedHotStandbyActive;
bool SharedPromoteIsTriggered;
Latch recoveryWakeupLatch;
/*
* Last record successfully replayed.
*/
XLogRecPtr lastReplayedReadRecPtr; /* start position */
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
TimeLineID lastReplayedTLI; /* timeline */
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
TimestampTz currentChunkStartTime;
/* Recovery pause state */
RecoveryPauseState recoveryPauseState;
ConditionVariable recoveryNotPausedCV;
slock_t info_lck; /* locks shared variables shown above */
} XLogRecoveryCtlData;
请求接收
postgresmain会监听socket,并接受对端的请求。
数据发送
数据通过socket接口进行发送,最终数据出口为操作系统提供的socket接口的send函数。
typedef struct
{
void (*comm_reset) (void);
int (*flush) (void);
int (*flush_if_writable) (void);
bool (*is_send_pending) (void);
int (*putmessage) (char msgtype, const char *s, size_t len);
void (*putmessage_noblock) (char msgtype, const char *s, size_t len);
} PQcommMethods;
static const PQcommMethods PqCommSocketMethods = {
socket_comm_reset,
socket_flush,
socket_flush_if_writable,
socket_is_send_pending,
socket_putmessage,
socket_putmessage_noblock
};
其执行流程如下:
建连后
基本流程
建立连接后,walsender会进入一个循环中,循环判断是否需要发送数据,是否需要启动心跳机制。
其流程如下:
其中walsender收包的流程如下:
需要特别注意的是recv分为阻塞IO和非阻塞IO,pg使用的是非阻塞IO,收包不会在这里卡住。
心跳
心跳主要由以下两个时间来控制:
/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0;
/*
* Timestamp of last ProcessRepliesIfAny() that saw a reply from the
* standby. Set to 0 if wal_sender_timeout doesn't need to be active.
*/
static TimestampTz last_reply_timestamp = 0;
其中last_reply_timestamp会在进入循环的时候获取当前时间戳,
last_reply_timestamp = GetCurrentTimestamp();
在每次sender循环中,都会先检查sender有没有收到receiver发过来的报文,此时在收包前会记下当前的时间戳,并赋值给last_processing。
last_processing = GetCurrentTimestamp();
此时会使用recv进行收包,若收到类型为'd'或'c'的报文,会将表示是否收到报文的状态量received设置为true。若received为true,则在收包完成后更新last_reply_timestamp。
/*
* Save the last reply timestamp if we've received at least one reply.
*/
if (received)
{
last_reply_timestamp = last_processing;
waiting_for_ping_response = false;
}
当获取到last_processing和last_reply_timestamp这两个时间后,再结合配置项wal_sender_timeout,即可判断sender是否需要关闭,并计算sender发送keepalive的时机。
-
是否关闭sender
通过WalSndCheckTimeOut来检查sender是否超时,其主要判断依据是
timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout);
if (wal_sender_timeout > 0 && last_processing >= timeout)
{
/*
* Since typically expiration of replication timeout means
* communication problem, we don't send the error message to the
* standby.
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
WalSndShutdown();
}
用上一次收到回复的时刻,加上wal_sender_timeout,计算得到一个时间戳timeout,然后看last_processing是否已经达到或超过timeout。换句话说,就是判断从上次收到回复到本次处理之间的时间差是否已经达到wal_sender_timeout。
-
是否需要发送keepalive
通过WalSndKeepaliveIfNecessary来检查是否需要发送keepalive报文。其判断逻辑如下:
/*
* If half of wal_sender_timeout has lapsed without receiving any reply
* from the standby, send a keep-alive message to the standby requesting
* an immediate reply.
*/
ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2);
if (last_processing >= ping_time)
{
WalSndKeepalive(true, InvalidXLogRecPtr);
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
}
可以看到,与超时关闭不同的是,发送心跳报文的判断时间是wal_sender_timeout的一半。
sender消息处理
walsender收到receiver的消息后,通过消息的第一个字符来处理对应的消息。sender仅处理如下三种类型的报文:
-
X
X(大写)表示对端即将关闭流复制的socket(Terminate报文)。
-
d
d表示的是数据报文。
-
c
c表示的是CopyDone,表示备机已经结束本次流式复制;若sender自己还没有发送过CopyDone,也需要回复一个同类型('c')的报文。
sender消息发送
-
心跳报文
心跳报文在WalSndKeepalive中发送,其消息格式如下:
pq_sendbyte(&output_message, 'k');
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
第一个字符是'k',表示心跳报文,然后跟着一个8字节的WAL位置指针和一个8字节的时间戳,最后是一个字节的"是否需要回复"标志。
-
数据报文
pq_sendbyte(&output_message, 'w');
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
逻辑复制
物理复制
物理复制使用XLogSendPhysical进行数据发送。
先找到未同步的起始指针
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
*tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
result = receivePtr;