压缩相关修改

This commit is contained in:
王锦龙 2022-11-28 19:27:17 +08:00 committed by w30031892
parent 6ed2bf7f00
commit 1c5654cd23
12 changed files with 45 additions and 38 deletions

View File

@ -3135,7 +3135,7 @@ static void MarkBufferDirty(char *buffer, size_t len)
static int parse_page_file(const char *filename, SegmentType type, const uint32 start_point, const uint32 number_read)
{
if (type != SEG_HEAP && type != SEG_INDEX_BTREE) {
if (!IsCompressedFile(filename, strlen(filename))) {
return parse_uncompressed_page_file(filename, type, start_point, number_read);
}

View File

@ -460,13 +460,13 @@ get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
*/
static int32
prepare_page(ConnectionArgs *conn_arg,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, FILE *in,
BackupMode backup_mode,
Page page, bool strict,
uint32 checksum_version,
const char *from_fullpath,
PageState *page_st, PageCompression *pageCompression, int &read_len)
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, FILE *in,
BackupMode backup_mode,
Page page, bool strict,
uint32 checksum_version,
const char *from_fullpath,
PageState *page_st, PageCompression *pageCompression, int &read_len)
{
int try_again = PAGE_READ_ATTEMPTS;
bool page_is_valid = false;
@ -2221,9 +2221,9 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
PageState page_st;
int read_len = -1;
int rc = prepare_page(conn_arg, file, prev_backup_start_lsn,
blknum, in, backup_mode, curr_page,
true, checksum_version,
from_fullpath, &page_st, pageCompression, read_len);
blknum, in, backup_mode, curr_page,
true, checksum_version,
from_fullpath, &page_st, pageCompression, read_len);
if (rc == PageIsTruncated)
break;

View File

@ -1427,7 +1427,7 @@ static void fio_send_pages_impl(int out, char* buf)
* Optimize stdio buffer usage, fseek only when current position
* does not match the position of requested block.
*/
if (current_pos != (int)(blknum*BLCKSZ))
if (current_pos != (int)(blknum * BLCKSZ))
{
current_pos = blknum*BLCKSZ;
if (fseek(in, current_pos, SEEK_SET) != 0)

View File

@ -501,8 +501,11 @@ Partition PartitionBuildLocalPartition(const char *relname, Oid partid, Oid part
PartitionInitPhysicalAddr(part);
/* compressed option was set by PartitionInitPhysicalAddr if part->rd_options != NULL */
if (part->rd_options == NULL && reloptions) {
(void)MemoryContextSwitchTo(oldcxt);
StdRdOptions* options = (StdRdOptions*)(void *)default_reloptions(reloptions, false, RELOPT_KIND_HEAP);
SetupPageCompressForRelation(&part->pd_node, &options->compress, PartitionGetPartitionName(part));
(void)MemoryContextSwitchTo(LocalMyDBCacheMemCxt());
pfree(options);
}
}

View File

@ -4604,8 +4604,11 @@ Relation RelationBuildLocalRelation(const char* relname, Oid relnamespace, Tuple
/* compressed option was set by RelationInitPhysicalAddr if rel->rd_options != NULL */
if (rel->rd_options == NULL && reloptions && SUPPORT_COMPRESSED(relkind, rel->rd_rel->relam)) {
(void)MemoryContextSwitchTo(oldcxt);
StdRdOptions *options = (StdRdOptions *)(void*)default_reloptions(reloptions, false, RELOPT_KIND_HEAP);
SetupPageCompressForRelation(&rel->rd_node, &options->compress, RelationGetRelationName(rel));
(void)MemoryContextSwitchTo(LocalMyDBCacheMemCxt());
pfree(options);
}

View File

@ -566,6 +566,16 @@ static void WaitForOlderSnapshots(TransactionId limitXmin)
}
}
inline bool get_rel_segment(Relation rel)
{
if (rel == NULL || rel->rd_options == NULL) {
return false;
}
StdRdOptions *opt = (StdRdOptions*)(rel->rd_options);
return opt->segment;
}
/*
* DefineIndex
* Creates a new index.
@ -672,6 +682,7 @@ Oid DefineIndex(Oid relationId, IndexStmt* stmt, Oid indexRelationId, bool is_al
lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
rel = heap_open(relationId, lockmode);
bool segment = get_rel_segment(rel);
TableCreateSupport indexCreateSupport{(int)COMPRESS_TYPE_NONE, false, false, false, false, false, true, false};
ListCell *cell = NULL;
foreach (cell, stmt->options) {
@ -683,9 +694,9 @@ Oid DefineIndex(Oid relationId, IndexStmt* stmt, Oid indexRelationId, bool is_al
/* do not suppport to create compressed index for temp table. */
if ((indexCreateSupport.compressType != (int)COMPRESS_TYPE_NONE) &&
(relPersistence == RELPERSISTENCE_TEMP || relPersistence == RELPERSISTENCE_GLOBAL_TEMP ||
relPersistence == RELPERSISTENCE_UNLOGGED)) {
relPersistence == RELPERSISTENCE_UNLOGGED || segment)) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("compressed index \"%s\" is not supported for temporary table and unlogged table,"
errmsg("compressed index \"%s\" is not supported for temporary table, unlogged table and segment table,"
" please use uncompressed one instead", stmt->idxname)));
}

View File

@ -1613,8 +1613,8 @@ static bool PrimaryRepairSegFile_NonSegment(const RelFileNode &rd_node, RemoteRe
int64 segpathlen = strlen(path) + SEGLEN + strlen(COMPRESS_STR);
char *segpath = (char *)palloc0((Size)segpathlen);
BlockNumber relSegSize = IS_COMPRESSED_RNODE(rd_node, MAIN_FORKNUM) ? CFS_LOGIC_BLOCKS_PER_FILE: RELSEG_SIZE;
uint32 seg_size = (uint32)((seg_no < maxSegno || ((uint32)size % (relSegSize * BLCKSZ)) == 0) ?
(relSegSize * BLCKSZ) : ((uint32)size % (relSegSize * BLCKSZ)));
uint32 seg_size = (uint32)((seg_no < maxSegno || (size % (relSegSize * BLCKSZ)) == 0) ?
(relSegSize * BLCKSZ) : (size % (relSegSize * BLCKSZ)));
if (seg_no == 0) {
rc = sprintf_s(segpath, (uint64)segpathlen, "%s%s", path,
@ -1831,9 +1831,9 @@ bool gsRepairFile(Oid tableOid, char* path, int timeout)
} else {
BlockNumber relSegSize = IS_COMPRESSED_RNODE(relation->rd_node,
MAIN_FORKNUM) ? CFS_LOGIC_BLOCKS_PER_FILE: RELSEG_SIZE;
int32 maxSegno = ((int32)size % ((int64)relSegSize * BLCKSZ)) != 0
? (int32)size / ((int64)relSegSize * BLCKSZ)
: ((int32)size / ((int64)relSegSize * BLCKSZ)) - 1;
int32 maxSegno = (int32)((size % ((int64)relSegSize * BLCKSZ)) != 0
? (size / ((int64)relSegSize * BLCKSZ))
: (size / ((int64)relSegSize * BLCKSZ)) - 1);
for (int32 i = 0; i <= maxSegno; i++) {
bool repair = PrimaryRepairSegFile_NonSegment(relation->rd_node, &repairFileKey, firstPath, i, maxSegno,

View File

@ -2342,7 +2342,7 @@ PUSH_DIRTY:
item->bucketNode = buf_desc->tag.rnode.bucketNode;
item->forkNum = buf_desc->tag.forkNum;
item->blockNum = buf_desc->tag.blockNum;
if (IsSegmentFileNode(buf_desc->tag.rnode)) {
if (IsSegmentFileNode(buf_desc->tag.rnode) || IS_COMPRESSED_RNODE(buf_desc->tag.rnode, buf_desc->tag.forkNum)) {
*contain_hashbucket = true;
}

View File

@ -1854,7 +1854,7 @@ void StartPrepare(GlobalTransaction gxact)
uint32 size = (uint32)(hdr_new.hdr.nabortrels * sizeof(ColFileNode));
if (unlikely((long)(t_thrd.proc->workingVersionNum < PAGE_COMPRESSION_VERSION))) {
/* commitrels will be free in ConvertToOldColFileNode */
registerData = (void *)ConvertToOldColFileNode(abortrels, hdr_new.hdr.ncommitrels);
registerData = (void *)ConvertToOldColFileNode(abortrels, hdr_new.hdr.nabortrels);
size = hdr_new.hdr.nabortrels * sizeof(ColFileNodeRel);
}
save_state_data(registerData, size);

View File

@ -6498,7 +6498,7 @@ void shared_buffer_write_error_callback(void *arg)
if (buf_desc != NULL) {
char *path = relpathperm(((BufferDesc *)buf_desc)->tag.rnode, ((BufferDesc *)buf_desc)->tag.forkNum);
if (buf_desc->tag.rnode.opt) {
(void)errcontext("writing block %u of relation %s_pcd", buf_desc->tag.blockNum, path);
(void)errcontext("writing block %u of relation %s_compress", buf_desc->tag.blockNum, path);
} else {
(void)errcontext("writing block %u of relation %s", buf_desc->tag.blockNum, path);
}

View File

@ -63,12 +63,10 @@ size_t CfsReadCompressedPage(char *dst, size_t destLen, BlockNumber extent_offse
}
decltype(CfsExtentHeader::chunk_size) chunkSize = cfsExtentHeader->chunk_size;
auto extentStart = (cfsReadStruct->extentCount * CFS_EXTENT_SIZE) * BLCKSZ;
uint8 allocatedChunks;
uint8 nchunks;
size_t tryCount = 0;
do {
CfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)extent_offset_blkno);
allocatedChunks = cfsExtentAddress->allocated_chunks;
nchunks = cfsExtentAddress->nchunks;
for (uint8 i = 0; i < nchunks; i++) {
@ -107,15 +105,7 @@ size_t CfsReadCompressedPage(char *dst, size_t destLen, BlockNumber extent_offse
}
} while (true);
if (allocatedChunks > nchunks) {
auto currentWriteSize = nchunks * chunkSize;
errno_t rc = memset_s(dst + (uint32)currentWriteSize,
destLen - (uint32)currentWriteSize,
0,
(uint32)(allocatedChunks - nchunks) * (uint32)chunkSize);
securec_check(rc, "", "");
}
return allocatedChunks * chunkSize;
return nchunks * chunkSize;
}
const size_t CUR_PAGE_SIZE = (uint32)getpagesize();

View File

@ -158,11 +158,11 @@ void Transpose8x16U(uint8 **src, uint8 **dst)
matLine4x2.val[3] = vld1q_u8(src[7]);
vst4q_u8(sptr4x2, matLine4x2);
matLine4x2.val[0] = vld1q_u8(src[4]);
matLine4x2.val[1] = vld1q_u8(src[5]);
matLine4x2.val[2] = vld1q_u8(src[6]);
matLine4x2.val[3] = vld1q_u8(src[7]);
vst4q_u8(sptr4x2, matLine4x2);
uint32x4x2_t dstLine04;
uint32x4x2_t dstLine15;
uint32x4x2_t dstLine26;
uint32x4x2_t dstLine37;
uint8 dstPtr[128];
dstLine04.val[0] = vld1q_u32((uint32_t *)sptr4x1);
dstLine04.val[1] = vld1q_u32((uint32_t *)sptr4x2);