cgo packed writer
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jan 4, 2025
1 parent 51b7244 commit 7d83835
Showing 15 changed files with 330 additions and 126 deletions.
16 changes: 10 additions & 6 deletions cpp/benchmark/benchmark_packed.cpp
@@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
}
}

-static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
+static void PackedWrite(benchmark::State& st,
+                        std::shared_ptr<arrow::fs::FileSystem> fs,
+                        std::string& path,
+                        size_t buffer_size) {
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())});
arrow::Int32Builder int_builder;
@@ -134,17 +137,18 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
-PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
+PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf, *parquet::default_writer_properties());
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
st.SkipWithError(r.ToString());
break;
}
}
-auto r = writer.Close();
-if (!r.ok()) {
-st.SkipWithError(r.ToString());
+auto column_index_groups = writer.Close();
+if (column_index_groups->Size() == 0) {
+st.SkipWithError("Failed to close writer");
+break;
}
}
}
@@ -153,7 +157,7 @@ std::string PATH = "/tmp/bench/foo";

BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
-PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
+PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
}
BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();

8 changes: 1 addition & 7 deletions cpp/include/milvus-storage/packed/chunk_manager.h
@@ -14,6 +14,7 @@

#pragma once

#include "packed/column_group.h"
#include <parquet/arrow/reader.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
@@ -24,13 +25,6 @@

namespace milvus_storage {

-struct ColumnOffset {
-  int path_index;
-  int col_index;
-
-  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
-};

// record which chunk is in use and its offset in the file
struct ChunkState {
int chunk;
47 changes: 47 additions & 0 deletions cpp/include/milvus-storage/packed/column_group.h
@@ -14,6 +14,7 @@

#pragma once

+#include <arrow/type.h>
#include <arrow/record_batch.h>
#include <queue>
#include "common/status.h"
@@ -87,4 +88,50 @@ struct ColumnGroupState {
void resetMemorySize() { this->memory_size = 0; }
};

+struct ColumnOffset {
+  int path_index;
+  int col_index;
+
+  ColumnOffset() = default;
+
+  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
+};
+
+// ColumnOffsetMapping is a map of original field name to ColumnOffset.
+// The column offset is a pair of path index and a column index in the path.
+struct ColumnOffsetMapping {
+  ColumnOffsetMapping() = default;
+
+  ColumnOffsetMapping(const std::vector<std::vector<int>>& group_indices, const std::shared_ptr<arrow::Schema> schema) {
+    for (int path_index = 0; path_index < group_indices.size(); path_index++) {
+      for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
+        int original_col_index = group_indices[path_index][col_index];
+        std::string field_name = schema->field(original_col_index)->name();
+        mapping_[field_name] = ColumnOffset(path_index, col_index);
+      }
+    }
+  }
+
+  ColumnOffset GetColumnOffset(std::string field_name) {
+    if (mapping_.find(field_name) == mapping_.end()) {
+      return ColumnOffset(-1, -1);
+    }
+    return mapping_[field_name];
+  }
+
+  std::string ToString() {
+    std::string str;
+    for (auto& pair : mapping_) {
+      str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" + std::to_string(pair.second.col_index) +
+             ";";
+    }
+    return str;
+  }
+
+  size_t Size() { return mapping_.size(); }
+
+ private:
+  std::unordered_map<std::string, ColumnOffset> mapping_;
+};

} // namespace milvus_storage
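
To make the new mapping concrete: a minimal sketch (not part of the commit) of how `ColumnOffsetMapping` resolves a field name once the splitter has assigned original columns to column-group files. The `group_indices` values, schema, and include path are assumptions for illustration.

```cpp
// Sketch only: assumes packed/column_group.h is on the include path.
#include <cassert>
#include <vector>
#include <arrow/type.h>
#include "packed/column_group.h"

int main() {
  // Schema from the benchmark above: (int32, int64, str).
  auto schema = arrow::schema({arrow::field("int32", arrow::int32()),
                               arrow::field("int64", arrow::int64()),
                               arrow::field("str", arrow::utf8())});
  // Illustrative split: original column 2 went to file 0; columns 0 and 1 to file 1.
  std::vector<std::vector<int>> group_indices = {{2}, {0, 1}};

  milvus_storage::ColumnOffsetMapping mapping(group_indices, schema);
  assert(mapping.Size() == 3);

  // "str" is the only column routed to path 0, so its offset there is (0, 0).
  auto offset = mapping.GetColumnOffset("str");
  assert(offset.path_index == 0 && offset.col_index == 0);
  // Unknown field names come back as the sentinel (-1, -1).
  assert(mapping.GetColumnOffset("missing").path_index == -1);
  return 0;
}
```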
1 change: 0 additions & 1 deletion cpp/include/milvus-storage/packed/reader_c.h
@@ -21,7 +21,6 @@ extern "C" {
#include <arrow/c/abi.h>

typedef void* CReader;
-typedef void* CStatus;
typedef void* CRecordBatch;
typedef void* CFileSystem;

14 changes: 9 additions & 5 deletions cpp/include/milvus-storage/packed/writer.h
@@ -34,15 +34,16 @@ class PackedRecordBatchWriter {
public:
PackedRecordBatchWriter(size_t memory_limit,
std::shared_ptr<arrow::Schema> schema,
-arrow::fs::FileSystem& fs,
+std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
StorageConfig& storage_config,
-parquet::WriterProperties& props);
+parquet::WriterProperties& props = *parquet::default_writer_properties());

// Put the record batch into the corresponding column group,
// and write the maximum buffer of column group to the file.
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
-Status Close();
+// Close the writer and return the mapping of field name written to column offset.
+std::unique_ptr<ColumnOffsetMapping> Close();

private:
// split first buffer into column groups based on column size
Expand All @@ -51,17 +52,20 @@ class PackedRecordBatchWriter {

Status writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record, size_t batch_size);
Status balanceMaxHeap();
+Status flushRemainingBuffer();
+Status flushUnsplittedBuffer();

std::vector<std::shared_ptr<arrow::RecordBatch>> buffered_batches_;
bool size_split_done_;
size_t memory_limit_;
std::shared_ptr<arrow::Schema> schema_;
-arrow::fs::FileSystem& fs_;
-const std::string& file_path_;
+std::shared_ptr<arrow::fs::FileSystem> fs_;
+const std::string file_path_;
const StorageConfig& storage_config_;
parquet::WriterProperties& props_;
size_t current_memory_usage_;
std::vector<std::unique_ptr<ColumnGroupWriter>> group_writers_;
+std::vector<std::vector<int>> group_indices_;
IndicesBasedSplitter splitter_;
MemoryMaxHeap max_heap_;
};
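
Putting the header change together with the benchmark above, here is a caller-side sketch of the revised API (not code from this commit; the local filesystem, helper name, and buffer size are assumptions):

```cpp
// Sketch only: assumes a local filesystem and a pre-built RecordBatch.
#include <memory>
#include <string>
#include <arrow/filesystem/localfs.h>
#include <arrow/record_batch.h>
#include "packed/writer.h"

// Hypothetical helper, not part of the commit.
std::unique_ptr<milvus_storage::ColumnOffsetMapping> WritePacked(
    const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& path) {
  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
  auto conf = milvus_storage::StorageConfig();
  // props can now be omitted: it defaults to *parquet::default_writer_properties().
  milvus_storage::PackedRecordBatchWriter writer(22 * 1024 * 1024, batch->schema(),
                                                 fs, path, conf);
  if (!writer.Write(batch).ok()) {
    return std::make_unique<milvus_storage::ColumnOffsetMapping>();  // empty on failure
  }
  // Close() now returns the field-name -> (path, column) mapping instead of a
  // Status; as in the benchmark, an empty mapping (Size() == 0) signals failure.
  return writer.Close();
}
```

Returning the mapping from Close() lets the cgo layer hand column offsets back across the C boundary in a single call, at the cost of folding write errors into an empty mapping.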
39 changes: 39 additions & 0 deletions cpp/include/milvus-storage/packed/writer_c.h
@@ -0,0 +1,39 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#include <arrow/c/abi.h>

typedef void* CPackedWriter;
typedef void* CColumnOffsetMapping;

int NewPackedWriter(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedWriter* c_packed_writer);

int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema);

int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping);

void DeletePackedWriter(CPackedWriter c_packed_writer);

#ifdef __cplusplus
}
#endif
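
The Go half of the cgo binding is not shown in this excerpt, so here is a hedged C++ sketch of driving the new C API through Arrow's C data interface; the helper name and buffer size are assumptions, and error cleanup is trimmed:

```cpp
// Sketch only: exercises the new C API from C++ via Arrow's C data interface.
#include <memory>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include "packed/writer_c.h"

int WritePackedViaC(const std::shared_ptr<arrow::RecordBatch>& batch, const char* path) {
  struct ArrowSchema c_schema;
  if (!arrow::ExportSchema(*batch->schema(), &c_schema).ok()) return -1;

  CPackedWriter writer = nullptr;
  if (NewPackedWriter(path, &c_schema, 22 * 1024 * 1024, &writer) != 0) return -1;

  struct ArrowArray c_array;
  struct ArrowSchema c_array_schema;
  if (!arrow::ExportRecordBatch(*batch, &c_array, &c_array_schema).ok()) {
    DeletePackedWriter(writer);
    return -1;
  }
  if (WriteRecordBatch(writer, &c_array, &c_array_schema) != 0) {
    DeletePackedWriter(writer);
    return -1;
  }

  // On success, Close() frees the writer and hands back the column-offset
  // mapping as an opaque handle; every function reports failure as -1.
  CColumnOffsetMapping mapping = nullptr;
  return Close(writer, &mapping);
}
```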
57 changes: 38 additions & 19 deletions cpp/src/packed/writer.cpp
@@ -30,13 +30,13 @@ namespace milvus_storage {

PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
std::shared_ptr<arrow::Schema> schema,
-arrow::fs::FileSystem& fs,
+std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
StorageConfig& storage_config,
parquet::WriterProperties& props)
: memory_limit_(memory_limit),
schema_(std::move(schema)),
-fs_(fs),
+fs_(std::move(fs)),
file_path_(file_path),
storage_config_(storage_config),
props_(props),
@@ -65,22 +65,21 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>&
Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
std::vector<ColumnGroup> groups =
SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_);
-std::vector<std::vector<int>> group_indices;
for (GroupId i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
std::string group_path = file_path_ + "/" + std::to_string(i);
-auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, storage_config_, props_,
+auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), *fs_, group_path, storage_config_, props_,
group.GetOriginColumnIndices());
RETURN_NOT_OK(writer->Init());
for (auto& batch : group.GetRecordBatches()) {
RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0)));
}

max_heap_.emplace(i, group.GetMemoryUsage());
-group_indices.emplace_back(group.GetOriginColumnIndices());
+group_indices_.emplace_back(group.GetOriginColumnIndices());
group_writers_.emplace_back(std::move(writer));
}
-splitter_ = IndicesBasedSplitter(group_indices);
+splitter_ = IndicesBasedSplitter(group_indices_);

// check memory usage limit
size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE;
@@ -119,22 +118,26 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
return balanceMaxHeap();
}

-Status PackedRecordBatchWriter::Close() {
+std::unique_ptr<ColumnOffsetMapping> PackedRecordBatchWriter::Close() {
// write unsplitted record batch to one file if the buffer record batches are not splitted
if (!size_split_done_ && !buffered_batches_.empty()) {
-std::string group_path = file_path_ + "/" + std::to_string(0);
-std::vector<int> indices(buffered_batches_[0]->num_columns());
-std::iota(std::begin(indices), std::end(indices), 0);
-auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), fs_, group_path,
-storage_config_, props_, indices);
-RETURN_NOT_OK(writer->Init());
-for (auto& batch : buffered_batches_) {
-RETURN_NOT_OK(writer->Write(batch));
+auto status = flushUnsplittedBuffer();
+if (!status.ok()) {
+LOG_STORAGE_ERROR_ << "Failed to write unsplitted record batch while closing writer.";
+return std::make_unique<ColumnOffsetMapping>();
}
-RETURN_NOT_OK(writer->Flush());
-RETURN_NOT_OK(writer->Close());
-return Status::OK();
+return std::make_unique<ColumnOffsetMapping>(std::move(group_indices_), schema_);
}
-// flush all remaining column groups before closing'
+// flush all remaining column groups before closing
+auto status = flushRemainingBuffer();
+if (!status.ok()) {
+LOG_STORAGE_ERROR_ << "Failed to flush remaining column groups while closing writer.";
+return std::make_unique<ColumnOffsetMapping>();
+}
+return std::make_unique<ColumnOffsetMapping>(std::move(group_indices_), schema_);
}

+Status PackedRecordBatchWriter::flushRemainingBuffer() {
while (!max_heap_.empty()) {
auto max_group = max_heap_.top();
max_heap_.pop();
@@ -150,6 +153,22 @@ Status PackedRecordBatchWriter::Close() {
return Status::OK();
}

+Status PackedRecordBatchWriter::flushUnsplittedBuffer() {
+std::string group_path = file_path_ + "/" + std::to_string(0);
+std::vector<int> indices(buffered_batches_[0]->num_columns());
+std::iota(std::begin(indices), std::end(indices), 0);
+auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), *fs_, group_path,
+storage_config_, props_, indices);
+RETURN_NOT_OK(writer->Init());
+for (auto& batch : buffered_batches_) {
+RETURN_NOT_OK(writer->Write(batch));
+}
+RETURN_NOT_OK(writer->Flush());
+RETURN_NOT_OK(writer->Close());
+group_indices_.emplace_back(indices);
+return Status::OK();
+}

Status PackedRecordBatchWriter::balanceMaxHeap() {
std::map<GroupId, size_t> group_map;
while (!max_heap_.empty()) {
69 changes: 69 additions & 0 deletions cpp/src/packed/writer_c.cpp
@@ -0,0 +1,69 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "packed/writer_c.h"
#include "packed/writer.h"
#include "common/log.h"
#include "common/config.h"
#include "filesystem/fs.h"

#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <iostream>

int NewPackedWriter(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedWriter* c_packed_writer) {
try {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto writer =
std::make_unique<milvus_storage::PackedRecordBatchWriter>(buffer_size, trueSchema, trueFs, truePath, conf);

*c_packed_writer = writer.release();
return 0;
} catch (std::exception& e) {
return -1;
}
}

int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema) {
try {
auto packed_writer = static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
auto record_batch = arrow::ImportRecordBatch(array, schema).ValueOrDie();
auto status = packed_writer->Write(record_batch);
if (!status.ok()) {
return -1;
}
return 0;
} catch (std::exception& e) {
return -1;
}
}

int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping) {
try {
auto packed_writer = static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
*c_column_offset_mapping = packed_writer->Close().release();
delete packed_writer;
return 0;
} catch (std::exception& e) {
return -1;
}
}