Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading bloom filters from Parquet files and filter row groups using them #17289

Merged
Merged
Show file tree
Hide file tree
Changes from 104 commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
95fe8e8
Initial stuff for reading bloom filter from PQ files
mhaseeb123 Nov 9, 2024
4f0e7ab
Minor bug fix
mhaseeb123 Nov 9, 2024
48a50c4
Apply style fix
mhaseeb123 Nov 9, 2024
9a85d08
Merge branch 'branch-24.12' into fea/extract-pq-bloom-filter-data
mhaseeb123 Nov 14, 2024
b71cf9b
Merge branch 'branch-24.12' into fea/extract-pq-bloom-filter-data
mhaseeb123 Nov 15, 2024
68be24f
Some updates
mhaseeb123 Nov 16, 2024
f848251
Move contents to a separate file
mhaseeb123 Nov 16, 2024
0b65233
Revert erroneous changes
mhaseeb123 Nov 16, 2024
cf7d762
Style and doc fix
mhaseeb123 Nov 16, 2024
81efad2
Get equality predicate col indices
mhaseeb123 Nov 19, 2024
088377b
Enable `arrow_filter_policy` and `span` types in bloom filter.
mhaseeb123 Nov 20, 2024
0435bff
Merge branch 'branch-24.12' into fea/extract-pq-bloom-filter-data
mhaseeb123 Nov 20, 2024
3dff590
Successfully search bloom filter
mhaseeb123 Nov 21, 2024
71e1d33
style fix
mhaseeb123 Nov 21, 2024
aa65a2b
Code cleanup
mhaseeb123 Nov 22, 2024
c52821b
add tests
mhaseeb123 Nov 25, 2024
3a20a98
Initial stuff for reading bloom filter from PQ files
mhaseeb123 Nov 9, 2024
d67e4b5
Minor bug fix
mhaseeb123 Nov 9, 2024
10471d4
Apply style fix
mhaseeb123 Nov 9, 2024
1e12662
Some updates
mhaseeb123 Nov 16, 2024
ee7217c
Move contents to a separate file
mhaseeb123 Nov 16, 2024
f8e6159
Revert erroneous changes
mhaseeb123 Nov 16, 2024
1886cab
Style and doc fix
mhaseeb123 Nov 16, 2024
be228b3
Get equality predicate col indices
mhaseeb123 Nov 19, 2024
aaf355e
Enable `arrow_filter_policy` and `span` types in bloom filter.
mhaseeb123 Nov 20, 2024
e92324e
Successfully search bloom filter
mhaseeb123 Nov 21, 2024
0b1719d
style fix
mhaseeb123 Nov 21, 2024
ef3a262
Code cleanup
mhaseeb123 Nov 22, 2024
051be2d
add tests
mhaseeb123 Nov 25, 2024
a12c90e
Merge branch 'fea/extract-pq-bloom-filter-data' of https://github.com…
mhaseeb123 Nov 25, 2024
fb55c3f
Major cleanups
mhaseeb123 Nov 26, 2024
b477d2d
Significant code refactoring
mhaseeb123 Nov 26, 2024
f9f1746
minor style fix
mhaseeb123 Nov 26, 2024
bad484f
refactoring
mhaseeb123 Nov 26, 2024
ce09d43
Minor refactoring
mhaseeb123 Nov 26, 2024
dddee6c
Minor improvements
mhaseeb123 Nov 26, 2024
0cfeb80
Add gtest
mhaseeb123 Nov 26, 2024
9137585
Improvements
mhaseeb123 Nov 26, 2024
77152b4
Support int96 in bloom filter
mhaseeb123 Nov 27, 2024
3984291
Cleanup
mhaseeb123 Nov 27, 2024
9a39aa4
Minor improvements
mhaseeb123 Nov 27, 2024
1def801
Fix minor bug
mhaseeb123 Nov 27, 2024
6edc248
MInor bug fixing
mhaseeb123 Nov 28, 2024
2925f1e
Add python tests
mhaseeb123 Nov 28, 2024
efc6ec0
Correct parquet files
mhaseeb123 Nov 28, 2024
df84aca
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Nov 28, 2024
a2fa784
minor spelling fix
mhaseeb123 Dec 2, 2024
1f5da37
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 2, 2024
fa0cec8
Apply suggestions from code review
mhaseeb123 Dec 2, 2024
7a309c6
Minor bug fix
mhaseeb123 Dec 2, 2024
bcc68c0
Convert to enum class
mhaseeb123 Dec 2, 2024
2dce9b1
Apply suggestion from code review
mhaseeb123 Dec 3, 2024
e03bea0
Suggestions from code reviews
mhaseeb123 Dec 3, 2024
059a9d8
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 3, 2024
4b0b5ed
Apply suggestions from code reviews
mhaseeb123 Dec 4, 2024
c1256b1
Refactor into single table for cudf::compute_column
mhaseeb123 Dec 4, 2024
88bf491
Minor, add const
mhaseeb123 Dec 4, 2024
9ca42c6
Move bloom filter test to parquet test
mhaseeb123 Dec 4, 2024
84c24c1
Minor updates
mhaseeb123 Dec 4, 2024
0c05031
Minor
mhaseeb123 Dec 4, 2024
09560c5
Logical and between bloom filter and stats
mhaseeb123 Dec 4, 2024
21f4412
Revert merging converted AST tables.
mhaseeb123 Dec 4, 2024
442de80
Revert an extra eol
mhaseeb123 Dec 4, 2024
f7952d4
Revert extra eol
mhaseeb123 Dec 4, 2024
4d0c570
Read bloom filter data sync
mhaseeb123 Dec 4, 2024
67c6247
Update cpp/src/io/parquet/bloom_filter_reader.cu
mhaseeb123 Dec 4, 2024
40c80b7
strong type for int96 timestamp
mhaseeb123 Dec 4, 2024
690c165
Merge branch 'fea/extract-pq-bloom-filter-data' of https://github.com…
mhaseeb123 Dec 4, 2024
c5f8150
Remove unused header
mhaseeb123 Dec 4, 2024
7a21a6e
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 6, 2024
4465277
Apply suggestions from code review
mhaseeb123 Dec 9, 2024
3888732
Apply suggestions
mhaseeb123 Dec 9, 2024
8bc8927
Update cpp/src/io/parquet/reader_impl_helpers.hpp
mhaseeb123 Dec 9, 2024
d719e65
Update cpp/src/io/parquet/reader_impl_helpers.hpp
mhaseeb123 Dec 9, 2024
03cf07f
Move equality_literals instead of copying
mhaseeb123 Dec 9, 2024
de94168
Merge branch 'fea/extract-pq-bloom-filter-data' of https://github.com…
mhaseeb123 Dec 9, 2024
c92d326
Minor
mhaseeb123 Dec 9, 2024
82083f9
Use spans instead of passing around vectors
mhaseeb123 Dec 10, 2024
6918a40
Minor
mhaseeb123 Dec 10, 2024
85cdc00
Make `get_equality_literals()` safe again
mhaseeb123 Dec 10, 2024
aa1a909
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 10, 2024
fdf8fc8
Update counting_iterator
mhaseeb123 Dec 10, 2024
10a8f5a
Minor changes
mhaseeb123 Dec 10, 2024
d46504f
Minor
mhaseeb123 Dec 10, 2024
c94ce86
Sync arrow filter policy with cuco
mhaseeb123 Dec 10, 2024
69aa685
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 11, 2024
d95a178
Address partial reviewer comments and fix new logger header
mhaseeb123 Dec 12, 2024
840c6e7
Revert to direct dtype check until I find a way to get scalar from li…
mhaseeb123 Dec 12, 2024
9d8c071
Create a dummy scalar of type T and compare with dtype
mhaseeb123 Dec 12, 2024
3b8aea0
Use a temporary scalar
mhaseeb123 Dec 12, 2024
0c859db
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 12, 2024
c385537
Recalculate `total_row_groups` in apply_bloom_filter
mhaseeb123 Dec 13, 2024
3693ad1
Simplify bloom filter expression with ast::tree and handle non-equali…
mhaseeb123 Dec 13, 2024
c2de9fb
Apply suggestions from code review
mhaseeb123 Dec 13, 2024
344851c
Minor optimization: Set `have_bloom_filters` while populating `bloom_…
mhaseeb123 Dec 14, 2024
96fb7c2
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 14, 2024
4522afa
Add pytest to test logical or with non == expr
mhaseeb123 Dec 16, 2024
ed66593
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 17, 2024
f509148
Remove temporary arrow_filter_policy.cuh and use cuco directly.
mhaseeb123 Dec 17, 2024
c8cd646
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 17, 2024
4194d30
MInor style fix
mhaseeb123 Dec 17, 2024
8b7baff
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Dec 18, 2024
2fce902
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Jan 6, 2025
7b0735d
Update copyright year to 2025
mhaseeb123 Jan 6, 2025
4807d1a
Apply suggestions from code review
mhaseeb123 Jan 13, 2025
39b3412
Minor refactor, early exit if no bloom filters
mhaseeb123 Jan 13, 2025
a2ede06
Merge branch 'fea/extract-pq-bloom-filter-data' of https://github.com…
mhaseeb123 Jan 13, 2025
86f0c12
Remove use of unnecessary mr parameter
mhaseeb123 Jan 13, 2025
bd9aa04
Disable boolean types in bloom filters.
mhaseeb123 Jan 13, 2025
cb0b844
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Jan 13, 2025
fa67675
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Jan 13, 2025
478a66f
Merge branch 'branch-25.02' into fea/extract-pq-bloom-filter-data
mhaseeb123 Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ add_library(
src/datetime/timezone.cpp
src/io/orc/writer_impl.cu
src/io/parquet/arrow_schema_writer.cpp
src/io/parquet/bloom_filter_reader.cu
src/io/parquet/compact_protocol_reader.cpp
src/io/parquet/compact_protocol_writer.cpp
src/io/parquet/decode_preprocess.cu
Expand Down
688 changes: 688 additions & 0 deletions cpp/src/io/parquet/bloom_filter_reader.cu

Large diffs are not rendered by default.

35 changes: 33 additions & 2 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -658,14 +658,43 @@ void CompactProtocolReader::read(ColumnChunk* c)
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterAlgorithm* alg)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, alg->algorithm));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterHash* hash)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, hash->hash));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterCompression* comp)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, comp->compression));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterHeader* bf)
{
auto op = std::make_tuple(parquet_field_int32(1, bf->num_bytes),
parquet_field_struct(2, bf->algorithm),
parquet_field_struct(3, bf->hash),
parquet_field_struct(4, bf->compression));
function_builder(this, op);
}

void CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
using optional_list_enc_stats =
parquet_field_optional<std::vector<PageEncodingStats>,
parquet_field_struct_list<PageEncodingStats>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
using optional_i64 = parquet_field_optional<int64_t, parquet_field_int64>;
using optional_i32 = parquet_field_optional<int32_t, parquet_field_int32>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
parquet_field_enum<Compression>(4, c->codec),
Expand All @@ -677,6 +706,8 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics),
optional_list_enc_stats(13, c->encoding_stats),
optional_i64(14, c->bloom_filter_offset),
optional_i32(15, c->bloom_filter_length),
optional_size_statistics(16, c->size_statistics));
function_builder(this, op);
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,10 @@ class CompactProtocolReader {
void read(IntType* t);
void read(RowGroup* r);
void read(ColumnChunk* c);
void read(BloomFilterAlgorithm* bf);
void read(BloomFilterHash* bf);
void read(BloomFilterCompression* bf);
void read(BloomFilterHeader* bf);
void read(ColumnChunkMetaData* c);
void read(PageHeader* p);
void read(DataPageHeader* d);
Expand Down
52 changes: 51 additions & 1 deletion cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -382,12 +382,62 @@ struct ColumnChunkMetaData {
// Set of all encodings used for pages in this column chunk. This information can be used to
// determine if all data pages are dictionary encoded for example.
std::optional<std::vector<PageEncodingStats>> encoding_stats;
// Byte offset from beginning of file to Bloom filter data.
std::optional<int64_t> bloom_filter_offset;
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
// Size of Bloom filter data including the serialized header, in bytes. Added in 2.10 so readers
// may not read this field from old files and it can be obtained after the BloomFilterHeader has
// been deserialized. Writers should write this field so readers can read the bloom filter in a
// single I/O.
std::optional<int32_t> bloom_filter_length;
// Optional statistics to help estimate total memory when converted to in-memory representations.
// The histograms contained in these statistics can also be useful in some cases for more
// fine-grained nullability/list length filter pushdown.
std::optional<SizeStatistics> size_statistics;
};

/**
* @brief The algorithm used in bloom filter
*/
struct BloomFilterAlgorithm {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
// Block-based Bloom filter.
enum class Algorithm { UNDEFINED, SPLIT_BLOCK };
Algorithm algorithm{Algorithm::SPLIT_BLOCK};
};

/**
* @brief The hash function used in Bloom filter
*/
struct BloomFilterHash {
// xxHash_64
enum class Hash { UNDEFINED, XXHASH };
Hash hash{Hash::XXHASH};
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* @brief The compression used in the bloom filter
*/
struct BloomFilterCompression {
enum class Compression { UNDEFINED, UNCOMPRESSED };
Compression compression{Compression::UNCOMPRESSED};
};

/**
* @brief Bloom filter header struct
*
* The bloom filter data of a column chunk stores this header at the beginning
* following by the filter bitset.
*/
struct BloomFilterHeader {
// The size of bitset in bytes
int32_t num_bytes;
// The algorithm for setting bits
BloomFilterAlgorithm algorithm;
// The hash function used for bloom filter
BloomFilterHash hash;
// The compression used in the bloom filter
BloomFilterCompression compression;
};

/**
* @brief Thrift-derived struct describing a chunk of data for a particular
* column
Expand Down
136 changes: 88 additions & 48 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <limits>
#include <numeric>
#include <optional>
#include <unordered_set>
Expand Down Expand Up @@ -388,6 +389,7 @@ class stats_expression_converter : public ast::detail::expression_transformer {
} // namespace

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
Expand All @@ -396,7 +398,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
{
auto mr = cudf::get_current_device_resource_ref();
// Create row group indices.
std::vector<std::vector<size_type>> filtered_row_group_indices;
std::vector<std::vector<size_type>> all_row_group_indices;
host_span<std::vector<size_type> const> input_row_group_indices;
if (row_group_indices.empty()) {
Expand All @@ -412,18 +413,22 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
} else {
input_row_group_indices = row_group_indices;
}
auto const total_row_groups = std::accumulate(input_row_group_indices.begin(),
input_row_group_indices.end(),
0,
[](size_type sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
});
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Converts Column chunk statistics to a table
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1]
// For each column, it contains #sources * #column_chunks_per_src rows.
std::vector<std::unique_ptr<column>> columns;
stats_caster const stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
stats_caster const stats_col{
static_cast<size_type>(total_row_groups), per_file_metadata, input_row_group_indices};
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = output_dtypes[col_idx];
Expand Down Expand Up @@ -452,44 +457,23 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
CUDF_EXPECTS(predicate.type().id() == cudf::type_id::BOOL8,
"Filter expression must return a boolean column");

auto const host_bitmask = [&] {
auto const num_bitmasks = num_bitmask_words(predicate.size());
if (predicate.nullable()) {
return cudf::detail::make_host_vector_sync(
device_span<bitmask_type const>(predicate.null_mask(), num_bitmasks), stream);
} else {
auto bitmask = cudf::detail::make_host_vector<bitmask_type>(num_bitmasks, stream);
std::fill(bitmask.begin(), bitmask.end(), ~bitmask_type{0});
return bitmask;
}
}();
// Filter stats table with StatsAST expression and collect filtered row group indices
auto const filtered_row_group_indices = collect_filtered_row_group_indices(
stats_table, stats_expr.get_stats_expr(), input_row_group_indices, stream, mr);

auto validity_it = cudf::detail::make_counting_transform_iterator(
0, [bitmask = host_bitmask.data()](auto bit_index) { return bit_is_set(bitmask, bit_index); });
// Span of row groups to apply bloom filtering on.
auto const bloom_filter_input_row_groups =
filtered_row_group_indices.has_value()
? host_span<std::vector<size_type> const>(filtered_row_group_indices.value())
: input_row_group_indices;
vuule marked this conversation as resolved.
Show resolved Hide resolved

auto const is_row_group_required = cudf::detail::make_host_vector_sync(
device_span<uint8_t const>(predicate.data<uint8_t>(), predicate.size()), stream);
// Apply bloom filtering on the bloom filter input row groups
auto const bloom_filtered_row_groups = apply_bloom_filters(
sources, bloom_filter_input_row_groups, output_dtypes, output_column_schemas, filter, stream);

// Return only filtered row groups based on predicate
// if all are required or all are nulls, return.
if (std::all_of(is_row_group_required.cbegin(),
is_row_group_required.cend(),
[](auto i) { return bool(i); }) or
predicate.null_count() == predicate.size()) {
return std::nullopt;
}
size_type is_required_idx = 0;
for (auto const& input_row_group_index : input_row_group_indices) {
std::vector<size_type> filtered_row_groups;
for (auto const rg_idx : input_row_group_index) {
if ((!validity_it[is_required_idx]) || is_row_group_required[is_required_idx]) {
filtered_row_groups.push_back(rg_idx);
}
++is_required_idx;
}
filtered_row_group_indices.push_back(std::move(filtered_row_groups));
}
return {std::move(filtered_row_group_indices)};
// Return bloom filtered row group indices iff collected
return bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups
: filtered_row_group_indices;
}

// convert column named expression to column index reference expression
Expand All @@ -510,14 +494,14 @@ named_to_reference_converter::named_to_reference_converter(
std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::literal const& expr)
{
_stats_expr = std::reference_wrapper<ast::expression const>(expr);
_converted_expr = std::reference_wrapper<ast::expression const>(expr);
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
return expr;
}

std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::column_reference const& expr)
{
_stats_expr = std::reference_wrapper<ast::expression const>(expr);
_converted_expr = std::reference_wrapper<ast::expression const>(expr);
return expr;
}

Expand All @@ -531,7 +515,7 @@ std::reference_wrapper<ast::expression const> named_to_reference_converter::visi
}
auto col_index = col_index_it->second;
_col_ref.emplace_back(col_index);
_stats_expr = std::reference_wrapper<ast::expression const>(_col_ref.back());
_converted_expr = std::reference_wrapper<ast::expression const>(_col_ref.back());
return std::reference_wrapper<ast::expression const>(_col_ref.back());
}

Expand All @@ -546,7 +530,7 @@ std::reference_wrapper<ast::expression const> named_to_reference_converter::visi
} else if (cudf::ast::detail::ast_operator_arity(op) == 1) {
_operators.emplace_back(op, new_operands.front());
}
_stats_expr = std::reference_wrapper<ast::expression const>(_operators.back());
_converted_expr = std::reference_wrapper<ast::expression const>(_operators.back());
return std::reference_wrapper<ast::expression const>(_operators.back());
}

Expand Down Expand Up @@ -640,4 +624,60 @@ class names_from_expression : public ast::detail::expression_transformer {
return names_from_expression(expr, skip_names).to_vector();
}

std::optional<std::vector<std::vector<size_type>>> collect_filtered_row_group_indices(
cudf::table_view table,
std::reference_wrapper<ast::expression const> ast_expr,
host_span<std::vector<size_type> const> input_row_group_indices,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// Filter the input table using AST expression
auto predicate_col = cudf::detail::compute_column(table, ast_expr.get(), stream, mr);
auto predicate = predicate_col->view();
CUDF_EXPECTS(predicate.type().id() == cudf::type_id::BOOL8,
"Filter expression must return a boolean column");

auto const host_bitmask = [&] {
auto const num_bitmasks = num_bitmask_words(predicate.size());
if (predicate.nullable()) {
return cudf::detail::make_host_vector_sync(
device_span<bitmask_type const>(predicate.null_mask(), num_bitmasks), stream);
} else {
auto bitmask = cudf::detail::make_host_vector<bitmask_type>(num_bitmasks, stream);
std::fill(bitmask.begin(), bitmask.end(), ~bitmask_type{0});
return bitmask;
}
}();

auto validity_it = cudf::detail::make_counting_transform_iterator(
0, [bitmask = host_bitmask.data()](auto bit_index) { return bit_is_set(bitmask, bit_index); });

// Return only filtered row groups based on predicate
auto const is_row_group_required = cudf::detail::make_host_vector_sync(
device_span<uint8_t const>(predicate.data<uint8_t>(), predicate.size()), stream);

// Return if all are required, or all are nulls.
if (predicate.null_count() == predicate.size() or std::all_of(is_row_group_required.cbegin(),
is_row_group_required.cend(),
[](auto i) { return bool(i); })) {
return std::nullopt;
}

// Collect indices of the filtered row groups
size_type is_required_idx = 0;
std::vector<std::vector<size_type>> filtered_row_group_indices;
for (auto const& input_row_group_index : input_row_group_indices) {
std::vector<size_type> filtered_row_groups;
for (auto const rg_idx : input_row_group_index) {
if ((!validity_it[is_required_idx]) || is_row_group_required[is_required_idx]) {
filtered_row_groups.push_back(rg_idx);
}
++is_required_idx;
}
filtered_row_group_indices.push_back(std::move(filtered_row_groups));
}

return {filtered_row_group_indices};
}

} // namespace cudf::io::parquet::detail
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1030,6 +1030,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con

std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
aggregate_reader_metadata::select_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
Expand All @@ -1042,7 +1043,7 @@ aggregate_reader_metadata::select_row_groups(
// if filter is not empty, then gather row groups to read after predicate pushdown
if (filter.has_value()) {
filtered_row_group_indices = filter_row_groups(
row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
sources, row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
Expand Down
Loading
Loading