diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c
index b7f506b9ce..d0959119db 100644
--- a/src/backend/cdb/cdbparquetrowgroup.c
+++ b/src/backend/cdb/cdbparquetrowgroup.c
@@ -215,26 +215,33 @@ ParquetRowGroupReader_ScanNextTuple(
 	int natts = slot->tts_tupleDescriptor->natts;
 	Assert(natts <= tupDesc->natts);
 
-	Datum *values = slot_get_values(slot);
-	bool *nulls = slot_get_isnull(slot);
-
 	bool useBloomFilter = false;
-	int joinKeyCount = 0;
-	int *joinKeySet = NULL;
+	List *joinKeyAtts = NIL;
+	List *nonJoinKeyAtts = NIL;
+	List *allAtts = NIL;
+
+	/* prepare data structures to separate join keys from other attributes */
 	if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter)
 	{
 		useBloomFilter = true;
 
-		joinKeyCount = list_length(rfState->joinkeys);
-		Assert(joinKeyCount <= natts);
-		joinKeySet = palloc(sizeof(int) * joinKeyCount);
-
+		/* collect the attributes that are hash join keys */
 		ListCell *hk;
-		int i = 0;
 		foreach(hk, rfState->joinkeys)
 		{
 			AttrNumber attrno = (AttrNumber) lfirst(hk);
-			joinKeySet[i++] = attrno -1;
+			joinKeyAtts = lappend_int(joinKeyAtts, attrno - 1);
+		}
+	}
+
+	/* collect all attributes, and those that are not hash join keys */
+	for (int i = 0; i < natts; i++)
+	{
+		allAtts = lappend_int(allAtts, i);
+
+		if (joinKeyAtts != NIL && list_find_int(joinKeyAtts, i) < 0)
+		{
+			nonJoinKeyAtts = lappend_int(nonJoinKeyAtts, i);
 		}
 	}
 
@@ -247,59 +254,32 @@ ParquetRowGroupReader_ScanNextTuple(
 	rowGroupReader->rowRead++;
 
 	/*
-	 * Step 1: fetch those columns as hash join keys
-	 */
-	int colReaderIndex = 0;
-	for (int i = 0; i < natts; i++)
-	{
-		if (projs[i] == false)
-		{
-			nulls[i] = true;
-			continue;
-		}
-
-		bool isJoinKeyColumn = false;
-		for (int j = 0; j < joinKeyCount; j++)
-		{
-			if (joinKeySet[j] == i)
-			{
-				isJoinKeyColumn = true;
-				break;
-			}
-		}
-
-		if (isJoinKeyColumn)
-		{
-			ParquetColumnReader *nextReader =
-				&rowGroupReader->columnReaders[colReaderIndex];
-			int hawqTypeID = tupDesc->attrs[i]->atttypid;
-
-			ParquetRowGroupReader_ScanOneAttribute(
-				rowGroupReader, hawqAttrToParquetColNum[i],
-				nextReader, &values[i], &nulls[i], hawqTypeID);
-		}
-
-		colReaderIndex += hawqAttrToParquetColNum[i];
-	}
-
-	/*
-	 * Step 2: skip following columns decoding if bloomfilter is mismatched
+	 * When a Bloom filter is available, first fetch only the hash join key
+	 * columns and check whether their combined hash value is in the filter.
+	 * On a miss, skip reading and decoding the remaining columns.
 	 */
 	if (useBloomFilter)
 	{
+		ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+				hawqAttrToParquetColNum, projs, slot, joinKeyAtts);
+
+		Datum *values = slot_get_values(slot);
 		uint32_t hashkey = 0;
-		for (int i = 0; i < joinKeyCount; i++)
+
+		ListCell *hk;
+		int i = 0;
+		foreach(hk, joinKeyAtts)
 		{
 			Datum keyval;
 			uint32 hkey;
 
 			/* rotate hashkey left 1 bit at each step */
 			hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
-			keyval = values[joinKeySet[i]];
+			keyval = values[lfirst_int(hk)];
 
 			/* Evaluate expression */
 			hkey = DatumGetUInt32(
-					FunctionCall1(&rfState->hashfunctions[i], keyval));
+					FunctionCall1(&rfState->hashfunctions[i++], keyval));
 			hashkey ^= hkey;
 		}
 
@@ -307,43 +287,14 @@ ParquetRowGroupReader_ScanNextTuple(
 		{
 			continue;
 		}
-	}
-
-	/*
-	 * Step 3: fetch those columns not in hash join keys
-	 */
-	colReaderIndex = 0;
-	for (int i = 0; i < natts; i++)
+
+		ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+				hawqAttrToParquetColNum, projs, slot, nonJoinKeyAtts);
+	}
+	else
 	{
-		// it is not expensive to do twice
-		if (projs[i] == false)
-		{
-			nulls[i] = true;
-			continue;
-		}
-
-		bool isJoinKeyColumn = false;
-		for (int j = 0; j < joinKeyCount; j++)
-		{
-			if (joinKeySet[j] == i)
-			{
-				isJoinKeyColumn = true;
-				break;
-			}
-		}
-
-		if (!isJoinKeyColumn)
-		{
-			ParquetColumnReader *nextReader =
-				&rowGroupReader->columnReaders[colReaderIndex];
-			int hawqTypeID = tupDesc->attrs[i]->atttypid;
-
-			ParquetRowGroupReader_ScanOneAttribute(
-				rowGroupReader, hawqAttrToParquetColNum[i],
-				nextReader, &values[i], &nulls[i], hawqTypeID);
-		}
-
-		colReaderIndex += hawqAttrToParquetColNum[i];
+		ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+				hawqAttrToParquetColNum, projs, slot, allAtts);
 	}
 
 	/*construct tuple, and return back*/
@@ -356,59 +307,87 @@ ParquetRowGroupReader_ScanNextTuple(
 }
 
 /*
- * Get one attribute of a tuple from current row group into slot.
- *
- * Similar to ParquetColumnReader_readValue() but consider more hawq types.
+ * Get specified attributes of a tuple from current row group into slot.
  */
 void
-ParquetRowGroupReader_ScanOneAttribute(
-	ParquetRowGroupReader *rowGroupReader,
-	int colChildNum, // hawqAttrToParquetColNum
-	ParquetColumnReader *columnReader,
-	Datum *value,
-	bool *null,
-	int hawqTypeID)
+ParquetRowGroupReader_ScanNextTupleColumns(
+	TupleDesc tupDesc,
+	ParquetRowGroupReader *rowGroupReader,
+	int *hawqAttrToParquetColNum,
+	bool *projs,
+	TupleTableSlot *slot,
+	List *attsList)
 {
-	if (colChildNum == 1)
-	{
-		ParquetColumnReader_readValue(columnReader, value, null, hawqTypeID);
-	}
-	else
+	int natts = slot->tts_tupleDescriptor->natts;
+	Assert(natts <= tupDesc->natts);
+
+	Datum *values = slot_get_values(slot);
+	bool *nulls = slot_get_isnull(slot);
+
+	int colReaderIndex = 0;
+	for (int i = 0; i < natts; i++)
 	{
-		/*
-		 * Because there are some memory reused inside the whole column reader, so need
-		 * to switch the context from PerTupleContext to rowgroup->context
-		 */
-		MemoryContext oldContext = MemoryContextSwitchTo(
-				rowGroupReader->memoryContext);
+		/* cheap enough to recheck when called twice in the Bloom filter case */
+		if (projs[i] == false)
+		{
+			nulls[i] = true;
+			continue;
+		}
 
-		switch (hawqTypeID) {
-			case HAWQ_TYPE_POINT:
-				ParquetColumnReader_readPoint(columnReader, value, null);
-				break;
-			case HAWQ_TYPE_PATH:
-				ParquetColumnReader_readPATH(columnReader, value, null);
-				break;
-			case HAWQ_TYPE_LSEG:
-				ParquetColumnReader_readLSEG(columnReader, value, null);
-				break;
-			case HAWQ_TYPE_BOX:
-				ParquetColumnReader_readBOX(columnReader, value, null);
-				break;
-			case HAWQ_TYPE_CIRCLE:
-				ParquetColumnReader_readCIRCLE(columnReader, value, null);
-				break;
-			case HAWQ_TYPE_POLYGON:
-				ParquetColumnReader_readPOLYGON(columnReader, value, null);
-				break;
-			default:
-				/* TODO array type */
-				/* TODO UDT */
-				Insist(false);
-				break;
+		/* skip attributes that are not in the given list */
+		if (attsList != NIL && list_find_int(attsList, i) < 0)
+		{
+			colReaderIndex += hawqAttrToParquetColNum[i];
+			continue;
+		}
+
+		ParquetColumnReader *nextReader =
+			&rowGroupReader->columnReaders[colReaderIndex];
+		int hawqTypeID = tupDesc->attrs[i]->atttypid;
+
+		if (hawqAttrToParquetColNum[i] == 1)
+		{
+			ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], hawqTypeID);
+		}
+		else
+		{
+			/*
+			 * The column reader reuses memory across calls, so switch from the
+			 * per-tuple context to the row group's context.
+			 */
+			MemoryContext oldContext = MemoryContextSwitchTo(
+					rowGroupReader->memoryContext);
+
+			switch (hawqTypeID) {
+				case HAWQ_TYPE_POINT:
+					ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]);
+					break;
+				case HAWQ_TYPE_PATH:
+					ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]);
+					break;
+				case HAWQ_TYPE_LSEG:
+					ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]);
+					break;
+				case HAWQ_TYPE_BOX:
+					ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]);
+					break;
+				case HAWQ_TYPE_CIRCLE:
+					ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]);
+					break;
+				case HAWQ_TYPE_POLYGON:
+					ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]);
+					break;
+				default:
+					/* TODO array type */
+					/* TODO UDT */
+					Insist(false);
+					break;
+			}
+
+			MemoryContextSwitchTo(oldContext);
 		}
 
-		MemoryContextSwitchTo(oldContext);
+		colReaderIndex += hawqAttrToParquetColNum[i];
 	}
 }
 
diff --git a/src/include/cdb/cdbparquetrowgroup.h b/src/include/cdb/cdbparquetrowgroup.h
index f245880a57..1d94bc1484 100644
--- a/src/include/cdb/cdbparquetrowgroup.h
+++ b/src/include/cdb/cdbparquetrowgroup.h
@@ -75,15 +75,15 @@ ParquetRowGroupReader_ScanNextTuple(
 	RuntimeFilterState *rfState,
 	TupleTableSlot *slot);
 
-/* Get one attribute of a tuple from current row group*/
+/* Get specified attributes of a tuple into slot */
 void
-ParquetRowGroupReader_ScanOneAttribute(
-	ParquetRowGroupReader *rowGroupReader,
-	int colChildNum,
-	ParquetColumnReader *columnReader,
-	Datum *value,
-	bool *null,
-	int hawqTypeID);
+ParquetRowGroupReader_ScanNextTupleColumns(
+	TupleDesc tupDesc,
+	ParquetRowGroupReader *rowGroupReader,
+	int *hawqAttrToParquetColNum,
+	bool *projs,
+	TupleTableSlot *slot,
+	List *attsList);
 
 /* Finish scanning current row group*/
 void
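Reviewer note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of how the Bloom filter probe key is built from the join key columns in the patched ParquetRowGroupReader_ScanNextTuple: rotate the accumulator left by one bit, then XOR in the next key's hash. In the patch each per-key hash comes from FunctionCall1 on rfState->hashfunctions; here the hashes are passed in precomputed, and combine_join_key_hashes/main are hypothetical names used only for this example.

#include <stdint.h>
#include <stdio.h>

/*
 * Combine per-column 32-bit hashes the same way the patched code does before
 * probing the Bloom filter: rotate the running value left by one bit, then
 * XOR in the next hash.
 */
static uint32_t
combine_join_key_hashes(const uint32_t *hkeys, int nkeys)
{
	uint32_t hashkey = 0;

	for (int i = 0; i < nkeys; i++)
	{
		/* rotate hashkey left 1 bit at each step */
		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
		hashkey ^= hkeys[i];
	}

	return hashkey;
}

int
main(void)
{
	/* stand-ins for the hashes of two join key values */
	uint32_t hkeys[] = { 0xdeadbeefu, 0x12345678u };

	printf("bloom filter probe key = 0x%08x\n",
		   (unsigned) combine_join_key_hashes(hkeys, 2));
	return 0;
}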