Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix writing of compressed ORC files with large stripe footers #17700

Open
wants to merge 11 commits into
base: branch-25.02
Choose a base branch
from
30 changes: 22 additions & 8 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -2595,16 +2595,30 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data,
: 0;
if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
}
ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0);

ProtobufWriter pbw;
pbw.write(sf);
stripe.footerLength = pbw.size();
if (_compression != compression_type::NONE) {
uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
pbw.buffer()[0] = static_cast<uint8_t>(uncomp_sf_len >> 0);
pbw.buffer()[1] = static_cast<uint8_t>(uncomp_sf_len >> 8);
pbw.buffer()[2] = static_cast<uint8_t>(uncomp_sf_len >> 16);
if (_compression == compression_type::NONE) {
_out_sink->host_write(pbw.data(), pbw.size());
stripe.footerLength = pbw.size();
} else {
std::size_t bytes_written = 0;
std::size_t written_sf_len = 0;
while (written_sf_len < pbw.size()) {
auto const block_size = std::min(_compression_blocksize, pbw.size() - written_sf_len);
auto const header_val = block_size * 2 + 1; // 1 means uncompressed
CUDF_EXPECTS(header_val >> 24 == 0, "Block length exceeds maximum size");
std::array const header{static_cast<uint8_t>(header_val >> 0),
static_cast<uint8_t>(header_val >> 8),
static_cast<uint8_t>(header_val >> 16)};

_out_sink->host_write(header.data(), header.size());
_out_sink->host_write(pbw.data() + written_sf_len, block_size);
written_sf_len += block_size;
bytes_written += header.size() + block_size;
}
stripe.footerLength = bytes_written;
vuule marked this conversation as resolved.
Show resolved Hide resolved
}
_out_sink->host_write(pbw.data(), pbw.size());
}
for (auto const& task : write_tasks) {
task.wait();
Expand Down
19 changes: 19 additions & 0 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2196,4 +2196,23 @@ TEST_F(OrcChunkedWriterTest, NoDataInSinkWhenNoWrite)
EXPECT_EQ(out_buffer.size(), 0);
}

// Round-trip check for a very wide table: 6400 views of one 8-row string column are written to an
// in-memory ORC sink with the writer's default options and then read back. Judging by the test
// name, the column count is meant to make the stripe footer large enough to span several blocks.
TEST_F(OrcWriterTest, MultipleBlocksInStripeFooter)
{
  // A single small string column; every column of the table is a view of this data.
  std::vector<std::string> row_values(8, "a");
  str_col col{row_values.begin(), row_values.end()};
  cudf::column_view col_view = col;

  // Parentheses pick the (count, value) constructor: 6400 copies of the same view.
  std::vector<cudf::column_view> const wide_columns(6400, col_view);
  table_view expected(wide_columns);

  // Write with the default writer options (compression is on by default).
  std::vector<char> out_buffer;
  cudf::io::sink_info const sink{&out_buffer};
  cudf::io::orc_writer_options out_opts = cudf::io::orc_writer_options::builder(sink, expected);
  cudf::io::write_orc(out_opts);

  // Read the bytes back from host memory and compare against the input table.
  cudf::io::source_info const source{out_buffer.data(), out_buffer.size()};
  cudf::io::orc_reader_options in_opts = cudf::io::orc_reader_options::builder(source);
  auto result = cudf::io::read_orc(in_opts);
  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

// NOTE(review): presumably expands to the entry point (main) of this test executable — confirm
// against the macro's definition in the cudf test utilities.
CUDF_TEST_PROGRAM_MAIN()
Loading