From 2320cf08eadb52921427e07ed678abf787d41385 Mon Sep 17 00:00:00 2001 From: Kuien Liu Date: Thu, 20 Sep 2018 17:15:27 +0800 Subject: [PATCH 1/2] HAWQ-1660. Optimize parquet scan when bloom filter enabled. --- src/backend/cdb/cdbparquetrowgroup.c | 200 +++++++++++++++++++-------- src/include/cdb/cdbparquetrowgroup.h | 10 ++ 2 files changed, 155 insertions(+), 55 deletions(-) diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c index 743815d831..b7f506b9ce 100644 --- a/src/backend/cdb/cdbparquetrowgroup.c +++ b/src/backend/cdb/cdbparquetrowgroup.c @@ -211,6 +211,33 @@ ParquetRowGroupReader_ScanNextTuple( TupleTableSlot *slot) { Assert(slot); + + int natts = slot->tts_tupleDescriptor->natts; + Assert(natts <= tupDesc->natts); + + Datum *values = slot_get_values(slot); + bool *nulls = slot_get_isnull(slot); + + bool useBloomFilter = false; + int joinKeyCount = 0; + int *joinKeySet = NULL; + if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter) + { + useBloomFilter = true; + + joinKeyCount = list_length(rfState->joinkeys); + Assert(joinKeyCount <= natts); + joinKeySet = palloc(sizeof(int) * joinKeyCount); + + ListCell *hk; + int i = 0; + foreach(hk, rfState->joinkeys) + { + AttrNumber attrno = (AttrNumber) lfirst(hk); + joinKeySet[i++] = attrno -1; + } + } + while (rowGroupReader->rowRead < rowGroupReader->rowCount) { @@ -219,12 +246,9 @@ ParquetRowGroupReader_ScanNextTuple( */ rowGroupReader->rowRead++; - int natts = slot->tts_tupleDescriptor->natts; - Assert(natts <= tupDesc->natts); - - Datum *values = slot_get_values(slot); - bool *nulls = slot_get_isnull(slot); - + /* + * Step 1: fetch those columns as hash join keys + */ int colReaderIndex = 0; for (int i = 0; i < natts; i++) { @@ -233,78 +257,50 @@ ParquetRowGroupReader_ScanNextTuple( nulls[i] = true; continue; } - ParquetColumnReader *nextReader = - &rowGroupReader->columnReaders[colReaderIndex]; - int hawqTypeID = tupDesc->attrs[i]->atttypid; - if 
(hawqAttrToParquetColNum[i] == 1) + bool isJoinKeyColumn = false; + for (int j = 0; j < joinKeyCount; j++) { - ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], - hawqTypeID); - } - else - { - /* - * Because there are some memory reused inside the whole column reader, so need - * to switch the context from PerTupleContext to rowgroup->context - */ - MemoryContext oldContext = MemoryContextSwitchTo( - rowGroupReader->memoryContext); - - switch (hawqTypeID) { - case HAWQ_TYPE_POINT: - ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_PATH: - ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_LSEG: - ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_BOX: - ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_CIRCLE: - ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_POLYGON: - ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]); - break; - default: - /* TODO array type */ - /* TODO UDT */ - Insist(false); + if (joinKeySet[j] == i) + { + isJoinKeyColumn = true; break; } + } - MemoryContextSwitchTo(oldContext); + if (isJoinKeyColumn) + { + ParquetColumnReader *nextReader = + &rowGroupReader->columnReaders[colReaderIndex]; + int hawqTypeID = tupDesc->attrs[i]->atttypid; + + ParquetRowGroupReader_ScanOneAttribute( + rowGroupReader, hawqAttrToParquetColNum[i], + nextReader, &values[i], &nulls[i], hawqTypeID); } colReaderIndex += hawqAttrToParquetColNum[i]; } - if (rfState != NULL && rfState->hasRuntimeFilter - && !rfState->stopRuntimeFilter) + /* + * Step 2: skip following columns decoding if bloomfilter is mismatched + */ + if (useBloomFilter) { - Assert(rfState->bloomfilter != NULL); uint32_t hashkey = 0; - ListCell *hk; - int i = 0; - foreach(hk, rfState->joinkeys) + for (int i = 0; i < joinKeyCount; i++) { - AttrNumber attrno = 
(AttrNumber) lfirst(hk); Datum keyval; uint32 hkey; /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); - keyval = values[attrno - 1]; + keyval = values[joinKeySet[i]]; /* Evaluate expression */ hkey = DatumGetUInt32( FunctionCall1(&rfState->hashfunctions[i], keyval)); hashkey ^= hkey; - i++; } if (!FindBloomFilter(rfState->bloomfilter, hashkey)) @@ -313,6 +309,43 @@ ParquetRowGroupReader_ScanNextTuple( } } + /* + * Step 3: fetch those columns not in hash join keys + */ + colReaderIndex = 0; + for (int i = 0; i < natts; i++) + { + // it is not expensive to do twice + if (projs[i] == false) + { + nulls[i] = true; + continue; + } + + bool isJoinKeyColumn = false; + for (int j = 0; j < joinKeyCount; j++) + { + if (joinKeySet[j] == i) + { + isJoinKeyColumn = true; + break; + } + } + + if (!isJoinKeyColumn) + { + ParquetColumnReader *nextReader = + &rowGroupReader->columnReaders[colReaderIndex]; + int hawqTypeID = tupDesc->attrs[i]->atttypid; + + ParquetRowGroupReader_ScanOneAttribute( + rowGroupReader, hawqAttrToParquetColNum[i], + nextReader, &values[i], &nulls[i], hawqTypeID); + } + + colReaderIndex += hawqAttrToParquetColNum[i]; + } + /*construct tuple, and return back*/ TupSetVirtualTupleNValid(slot, natts); return true; @@ -322,6 +355,63 @@ ParquetRowGroupReader_ScanNextTuple( return false; } +/* + * Get one attribute of a tuple from current row group into slot. + * + * Similar to ParquetColumnReader_readValue() but consider more hawq types. 
+ */ +void +ParquetRowGroupReader_ScanOneAttribute( + ParquetRowGroupReader *rowGroupReader, + int colChildNum, // hawqAttrToParquetColNum + ParquetColumnReader *columnReader, + Datum *value, + bool *null, + int hawqTypeID) +{ + if (colChildNum == 1) + { + ParquetColumnReader_readValue(columnReader, value, null, hawqTypeID); + } + else + { + /* + * Because there are some memory reused inside the whole column reader, so need + * to switch the context from PerTupleContext to rowgroup->context + */ + MemoryContext oldContext = MemoryContextSwitchTo( + rowGroupReader->memoryContext); + + switch (hawqTypeID) { + case HAWQ_TYPE_POINT: + ParquetColumnReader_readPoint(columnReader, value, null); + break; + case HAWQ_TYPE_PATH: + ParquetColumnReader_readPATH(columnReader, value, null); + break; + case HAWQ_TYPE_LSEG: + ParquetColumnReader_readLSEG(columnReader, value, null); + break; + case HAWQ_TYPE_BOX: + ParquetColumnReader_readBOX(columnReader, value, null); + break; + case HAWQ_TYPE_CIRCLE: + ParquetColumnReader_readCIRCLE(columnReader, value, null); + break; + case HAWQ_TYPE_POLYGON: + ParquetColumnReader_readPOLYGON(columnReader, value, null); + break; + default: + /* TODO array type */ + /* TODO UDT */ + Insist(false); + break; + } + + MemoryContextSwitchTo(oldContext); + } +} + /** * finish scanning row group, but keeping the structure palloced */ diff --git a/src/include/cdb/cdbparquetrowgroup.h b/src/include/cdb/cdbparquetrowgroup.h index 4f5ab7a4b7..f245880a57 100644 --- a/src/include/cdb/cdbparquetrowgroup.h +++ b/src/include/cdb/cdbparquetrowgroup.h @@ -75,6 +75,16 @@ ParquetRowGroupReader_ScanNextTuple( RuntimeFilterState *rfState, TupleTableSlot *slot); +/* Get one attribute of a tuple from current row group*/ +void +ParquetRowGroupReader_ScanOneAttribute( + ParquetRowGroupReader *rowGroupReader, + int colChildNum, + ParquetColumnReader *columnReader, + Datum *value, + bool *null, + int hawqTypeID); + /* Finish scanning current row group*/ void 
ParquetRowGroupReader_FinishedScanRowGroup( From 0eaf8508b8d1209e41f4e740f4bc1e7535135dd1 Mon Sep 17 00:00:00 2001 From: Kuien Liu Date: Fri, 21 Sep 2018 16:31:12 +0800 Subject: [PATCH 2/2] HAWQ-1660. refactor according to reviews --- src/backend/cdb/cdbparquetrowgroup.c | 243 ++++++++++++--------------- src/include/cdb/cdbparquetrowgroup.h | 16 +- 2 files changed, 119 insertions(+), 140 deletions(-) diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c index b7f506b9ce..d0959119db 100644 --- a/src/backend/cdb/cdbparquetrowgroup.c +++ b/src/backend/cdb/cdbparquetrowgroup.c @@ -215,26 +215,33 @@ ParquetRowGroupReader_ScanNextTuple( int natts = slot->tts_tupleDescriptor->natts; Assert(natts <= tupDesc->natts); - Datum *values = slot_get_values(slot); - bool *nulls = slot_get_isnull(slot); - bool useBloomFilter = false; - int joinKeyCount = 0; - int *joinKeySet = NULL; + List *joinKeyAtts = NIL; + List *nonJoinKeyAtts = NIL; + List *allAtts = NIL; + + /* prepare data structure to separate join keys from other attributes */ if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter) { useBloomFilter = true; - joinKeyCount = list_length(rfState->joinkeys); - Assert(joinKeyCount <= natts); - joinKeySet = palloc(sizeof(int) * joinKeyCount); - + /* find out attributes in hash join key */ ListCell *hk; - int i = 0; foreach(hk, rfState->joinkeys) { AttrNumber attrno = (AttrNumber) lfirst(hk); - joinKeySet[i++] = attrno -1; + joinKeyAtts = lappend_int(joinKeyAtts, attrno - 1); + } + } + + /* find out attributes not in hash join keys */ + for (int i = 0; i < natts; i++) + { + allAtts = lappend_int(allAtts, i); + + if(joinKeyAtts != NIL && list_find_int(joinKeyAtts, i) < 0) + { + nonJoinKeyAtts = lappend_int(nonJoinKeyAtts, i); } } @@ -247,59 +254,32 @@ ParquetRowGroupReader_ScanNextTuple( rowGroupReader->rowRead++; /* - * Step 1: fetch those columns as hash join keys - */ - int colReaderIndex = 0; - for (int i = 0; i < natts; i++) - { - if (projs[i] == false) - { 
- nulls[i] = true; - continue; - } - - bool isJoinKeyColumn = false; - for (int j = 0; j < joinKeyCount; j++) - { - if (joinKeySet[j] == i) - { - isJoinKeyColumn = true; - break; - } - } - - if (isJoinKeyColumn) - { - ParquetColumnReader *nextReader = - &rowGroupReader->columnReaders[colReaderIndex]; - int hawqTypeID = tupDesc->attrs[i]->atttypid; - - ParquetRowGroupReader_ScanOneAttribute( - rowGroupReader, hawqAttrToParquetColNum[i], - nextReader, &values[i], &nulls[i], hawqTypeID); - } - - colReaderIndex += hawqAttrToParquetColNum[i]; - } - - /* - * Step 2: skip following columns decoding if bloomfilter is mismatched + * In case using BloomFilter, we first fetch those columns in hash join keys, + * then check whether their hash values contained by bloomfilter. If negative, + * we skip following columns reading and decoding to speed up. */ if (useBloomFilter) { + ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader, + hawqAttrToParquetColNum, projs, slot, joinKeyAtts); + + Datum *values = slot_get_values(slot); uint32_t hashkey = 0; - for (int i = 0; i < joinKeyCount; i++) + + ListCell *hk; + int i = 0; + foreach(hk, joinKeyAtts) { Datum keyval; uint32 hkey; /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 
1 : 0); - keyval = values[joinKeySet[i]]; + keyval = values[lfirst_int(hk)]; /* Evaluate expression */ hkey = DatumGetUInt32( - FunctionCall1(&rfState->hashfunctions[i], keyval)); + FunctionCall1(&rfState->hashfunctions[i++], keyval)); hashkey ^= hkey; } @@ -307,43 +287,14 @@ ParquetRowGroupReader_ScanNextTuple( { continue; } - } - /* - * Step 3: fetch those columns not in hash join keys - */ - colReaderIndex = 0; - for (int i = 0; i < natts; i++) + ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader, + hawqAttrToParquetColNum, projs, slot, nonJoinKeyAtts); + } + else { - // it is not expensive to do twice - if (projs[i] == false) - { - nulls[i] = true; - continue; - } - - bool isJoinKeyColumn = false; - for (int j = 0; j < joinKeyCount; j++) - { - if (joinKeySet[j] == i) - { - isJoinKeyColumn = true; - break; - } - } - - if (!isJoinKeyColumn) - { - ParquetColumnReader *nextReader = - &rowGroupReader->columnReaders[colReaderIndex]; - int hawqTypeID = tupDesc->attrs[i]->atttypid; - - ParquetRowGroupReader_ScanOneAttribute( - rowGroupReader, hawqAttrToParquetColNum[i], - nextReader, &values[i], &nulls[i], hawqTypeID); - } - - colReaderIndex += hawqAttrToParquetColNum[i]; + ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader, + hawqAttrToParquetColNum, projs, slot, allAtts); } /*construct tuple, and return back*/ @@ -356,59 +307,87 @@ ParquetRowGroupReader_ScanNextTuple( } /* - * Get one attribute of a tuple from current row group into slot. - * - * Similar to ParquetColumnReader_readValue() but consider more hawq types. + * Get specified attributes of a tuple from current row group into slot. 
*/ void -ParquetRowGroupReader_ScanOneAttribute( - ParquetRowGroupReader *rowGroupReader, - int colChildNum, // hawqAttrToParquetColNum - ParquetColumnReader *columnReader, - Datum *value, - bool *null, - int hawqTypeID) +ParquetRowGroupReader_ScanNextTupleColumns( + TupleDesc tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot, + List *attsList) { - if (colChildNum == 1) - { - ParquetColumnReader_readValue(columnReader, value, null, hawqTypeID); - } - else + int natts = slot->tts_tupleDescriptor->natts; + Assert(natts <= tupDesc->natts); + + Datum *values = slot_get_values(slot); + bool *nulls = slot_get_isnull(slot); + + int colReaderIndex = 0; + for(int i = 0; i < natts; i++) { - /* - * Because there are some memory reused inside the whole column reader, so need - * to switch the context from PerTupleContext to rowgroup->context - */ - MemoryContext oldContext = MemoryContextSwitchTo( - rowGroupReader->memoryContext); + /* it is not expensive to do twice in case of bloomfilter */ + if(projs[i] == false) + { + nulls[i] = true; + continue; + } - switch (hawqTypeID) { - case HAWQ_TYPE_POINT: - ParquetColumnReader_readPoint(columnReader, value, null); - break; - case HAWQ_TYPE_PATH: - ParquetColumnReader_readPATH(columnReader, value, null); - break; - case HAWQ_TYPE_LSEG: - ParquetColumnReader_readLSEG(columnReader, value, null); - break; - case HAWQ_TYPE_BOX: - ParquetColumnReader_readBOX(columnReader, value, null); - break; - case HAWQ_TYPE_CIRCLE: - ParquetColumnReader_readCIRCLE(columnReader, value, null); - break; - case HAWQ_TYPE_POLYGON: - ParquetColumnReader_readPOLYGON(columnReader, value, null); - break; - default: - /* TODO array type */ - /* TODO UDT */ - Insist(false); - break; + /* skip those attributes not in given list */ + if (attsList != NIL && list_find_int(attsList, i) < 0) + { + colReaderIndex += hawqAttrToParquetColNum[i]; + continue; + } + + ParquetColumnReader *nextReader = + 
&rowGroupReader->columnReaders[colReaderIndex]; + int hawqTypeID = tupDesc->attrs[i]->atttypid; + + if (hawqAttrToParquetColNum[i] == 1) + { + ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], hawqTypeID); + } + else + { + /* + * Because there are some memory reused inside the whole column reader, so need + * to switch the context from PerTupleContext to rowgroup->context + */ + MemoryContext oldContext = MemoryContextSwitchTo( + rowGroupReader->memoryContext); + + switch (hawqTypeID) { + case HAWQ_TYPE_POINT: + ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]); + break; + case HAWQ_TYPE_PATH: + ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]); + break; + case HAWQ_TYPE_LSEG: + ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]); + break; + case HAWQ_TYPE_BOX: + ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]); + break; + case HAWQ_TYPE_CIRCLE: + ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]); + break; + case HAWQ_TYPE_POLYGON: + ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]); + break; + default: + /* TODO array type */ + /* TODO UDT */ + Insist(false); + break; + } + + MemoryContextSwitchTo(oldContext); } - MemoryContextSwitchTo(oldContext); + colReaderIndex += hawqAttrToParquetColNum[i]; } } diff --git a/src/include/cdb/cdbparquetrowgroup.h b/src/include/cdb/cdbparquetrowgroup.h index f245880a57..1d94bc1484 100644 --- a/src/include/cdb/cdbparquetrowgroup.h +++ b/src/include/cdb/cdbparquetrowgroup.h @@ -75,15 +75,15 @@ ParquetRowGroupReader_ScanNextTuple( RuntimeFilterState *rfState, TupleTableSlot *slot); -/* Get one attribute of a tuple from current row group*/ +/* Get specified attributes of a tuple into slot*/ void -ParquetRowGroupReader_ScanOneAttribute( - ParquetRowGroupReader *rowGroupReader, - int colChildNum, - ParquetColumnReader *columnReader, - Datum *value, - bool *null, - int hawqTypeID); +ParquetRowGroupReader_ScanNextTupleColumns( + 
TupleDesc pqs_tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot, + List *attsList); /* Finish scanning current row group*/ void