PostgreSQL数据库WAL——日志合成XLogRecordAssemble

PostgreSQL数据库WAL——日志合成XLogRecordAssemble,第1张

PostgreSQL数据库WAL——日志合成XLogRecordAssemble

XLogRecordAssemble把已注册的数据和缓冲区合成为一条WAL记录,组织成XLogRecData链,后续可以调用XLogInsertRecord写入日志。Assemble a WAL record from the registered data and buffers into an XLogRecData chain, ready for insertion with XLogInsertRecord(). 除了xl_prev,其他记录头域都会被填入;此时计算出的CRC尚未覆盖记录头部。The record header fields are filled in, except for the xl_prev field. The calculated CRC does not include the record header yet. 如果存在已注册的buffer,且并非所有buffer都生成了full-page image,则*fpw_lsn被设置为这些(未生成full-page image的)页中的最低LSN。If there are any registered buffers, and a full-page image was not taken of all of them, *fpw_lsn is set to the lowest LSN among such pages. 这表示合成出的记录只有在RedoRecPtr和doPageWrites的值是最新的前提下才能用于插入。This signals that the assembled record is only good for insertion on the assumption that the RedoRecPtr and doPageWrites values were up-to-date.

处理日志数据链头部

hdr_rdt用于在构造记录时保存记录头部,hdr_scratch会在初始化时使用palloc分配好空间。

/*
 * One element of a singly linked chain of data fragments that together make
 * up a WAL record.  The typedef is placed before its first use so that the
 * declarations below compile in this order.
 */
typedef struct XLogRecData {
	struct XLogRecData *next;	/* next element of the chain; NULL ends it */
	char	   *data;			/* start of the bytes this element contributes */
	uint32		len;			/* number of bytes available at data */
} XLogRecData;

/*
 * Head of the chain returned by XLogRecordAssemble(); its data pointer is set
 * to hdr_scratch, where the record header and block headers are built.
 */
static XLogRecData hdr_rdt;

/*
 * Scratch buffer in which the headers are constructed.  According to the
 * accompanying text it is allocated with palloc during initialization; that
 * allocation is not shown here.
 */
static char *hdr_scratch = NULL;

/*
 * Assemble a WAL record from the registered buffers and main data into an
 * XLogRecData chain headed by the file-level hdr_rdt.
 *
 * rmid         - resource manager id, stored in the header and used to look
 *                up wal_consistency_checking[].
 * info         - info flag bits for the header; XLR_CHECK_CONSISTENCY is
 *                OR-ed in when consistency checking is on for rmid.
 * RedoRecPtr   - pages whose LSN is <= this value get a full-page image
 *                (unless flags or doPageWrites decide otherwise).
 * doPageWrites - when false, no image is taken based on the page LSN.
 * fpw_lsn      - output: InvalidXLogRecPtr, or the lowest page LSN among the
 *                buffers for which the LSN comparison decided that no image
 *                was needed.
 *
 * Returns &hdr_rdt.  All header fields are filled in except xl_prev, which is
 * set to InvalidXLogRecPtr here.  xl_crc covers everything after the
 * fixed-size record header; the header itself is not included yet.
 *
 * Side effects: overwrites hdr_scratch and hdr_rdt, and rewires the next
 * pointers of the registered rdata chains.
 */
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, XLogRecPtr *fpw_lsn) {
	XLogRecData *rdt;
	uint32		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;	/* tail of the chain built so far */
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;	/* write cursor into the header scratch buffer */

	/*
	 * The record starts with the fixed-size header; reserve room for it and
	 * fill it in at the very end.
	 */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	/*
	 * hdr_rdt is the head of the chain and points at the scratch buffer;
	 * rdt_datas_last always points at the current tail of the chain.
	 */
	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Article section "page compression": block references, full-page images
	 * and optional compression of those images.
	 */

	/* Turn on consistency checking for this record if enabled for rmid. */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Walk all registered buffers: link their image/data fragments into the
	 * chain and append one block header per buffer to the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Decide whether this block needs a full-page image. */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/* Decision depends on the page LSN relative to RedoRecPtr. */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				/* Remember the lowest LSN among pages that skipped the image. */
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/*
		 * Decide whether the buffer's registered data is included: never if
		 * there is none, always with REGBUF_KEEP_DATA, otherwise only when no
		 * image is taken.
		 */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * An image is attached when a backup is needed, and also when
		 * consistency checking is requested for this record.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			Page		page = regbuf->page;
			uint16		compressed_len = 0;

			/*
			 * For REGBUF_STANDARD pages the span between pd_lower and
			 * pd_upper is treated as a "hole" and left out of the image.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* Bounds look implausible: do not remove any hole. */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Non-standard page layout: no hole is removed. */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress the image when wal_compression is set.
			 * NOTE(review): XLogCompressBackupBlock is not shown here; it
			 * appears to return whether compression was applied and to
			 * report the resulting size through compressed_len.
			 */
			if (wal_compression)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/* Record in the block header that an image follows. */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/* Append the buffer's first image fragment to the chain. */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * BKPIMAGE_APPLY is set only when a backup was actually needed,
			 * not when the image is attached solely for consistency checking.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				bimg.length = compressed_len;
				bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					/* No hole: the whole page is a single fragment. */
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* Two fragments: the bytes before and after the hole. */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/* Splice the buffer's registered rdata chain onto the tail. */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		/*
		 * If this buffer has the same rnode as the previously processed one,
		 * flag it so the RelFileNode is not written again below.
		 */
		if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/*
		 * Copy the headers into the scratch buffer in order: block header,
		 * image header (if any), compress header (only for a compressed
		 * image with a hole), RelFileNode (unless same as previous), and the
		 * block number.
		 */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* Next comes the replication origin, if requested and valid. */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/*
	 * Finally the main data, if any: a one-byte id followed by a uint32
	 * length when it exceeds 255 bytes, otherwise by a one-byte length.
	 */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	/* Terminate the chain. */
	rdt_datas_last->next = NULL;

	/* The head element covers everything written to the scratch buffer. */
	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Compute the CRC over the headers that follow the fixed-size record
	 * header, then over every fragment in the chain.  The fixed-size header
	 * itself is skipped here.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Fill in the fixed-size header.  xl_prev is left invalid, and xl_crc
	 * holds the partial CRC computed above.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/4667014.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存