Skip to content

Commit

Permalink
[feat]: CGO packed writer api (#160)
Browse files Browse the repository at this point in the history
related: #158
1. Put parquet writer properties into storage config
2. Packed writer Close() API returns the column offset mapping.
3. Add CGO and Go API for packed writer

---------

Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang authored Jan 9, 2025
1 parent 51b7244 commit f51fd09
Show file tree
Hide file tree
Showing 33 changed files with 695 additions and 275 deletions.
13 changes: 8 additions & 5 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st

auto paths = std::vector<std::string>{path + "/0", path + "/1"};

// after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
// after writing, the pk and the ts are in the first file, and the large str is in the second file
std::vector<std::shared_ptr<arrow::Field>> fields = {
arrow::field("int", arrow::utf8()),
arrow::field("int64", arrow::int32()),
Expand All @@ -93,7 +93,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
auto schema = arrow::schema(fields);

for (auto _ : st) {
PackedRecordBatchReader pr(*fs, paths, schema, column_offsets, needed_columns, buffer_size);
PackedRecordBatchReader pr(*fs, path, schema, needed_columns, buffer_size);
auto r = arrow::RecordBatch::MakeEmpty(schema);
SKIP_IF_NOT_OK(r.status(), st)
auto rb = r.ValueOrDie();
Expand All @@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
}
}

static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
static void PackedWrite(benchmark::State& st,
std::shared_ptr<arrow::fs::FileSystem> fs,
std::string& path,
size_t buffer_size) {
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())});
arrow::Int32Builder int_builder;
Expand All @@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
Expand All @@ -153,7 +156,7 @@ std::string PATH = "/tmp/bench/foo";

BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
}
BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();

Expand Down
26 changes: 13 additions & 13 deletions cpp/include/milvus-storage/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@
#pragma once

#include <sstream>
#include <parquet/properties.h>

using namespace std;

namespace milvus_storage {

static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

struct StorageConfig {
std::string uri = "";
std::string bucket_name = "";
Expand All @@ -31,6 +43,7 @@ struct StorageConfig {
std::string region = "";
bool use_custom_part_upload_size = false;
int64_t part_size = 0;
parquet::WriterProperties writer_props = *parquet::default_writer_properties();

std::string ToString() const {
std::stringstream ss;
Expand All @@ -42,17 +55,4 @@ struct StorageConfig {
}
};

static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

} // namespace milvus_storage
12 changes: 8 additions & 4 deletions cpp/include/milvus-storage/common/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ namespace milvus_storage {

constexpr char kSep = '/';

// Builds an arrow::Status::IOError describing that `path` is not a regular file.
// Header-defined, so marked static inline to avoid multiple-definition errors.
static inline arrow::Status NotAFile(std::string_view path) {
  return arrow::Status::IOError("Not a regular file: " + std::string(path));
}

// Returns true when `s` is non-empty and ends with the path separator kSep ('/').
static inline bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }

std::string EnsureTrailingSlash(std::string_view v) {
static inline std::string EnsureTrailingSlash(std::string_view v) {
if (!v.empty() && !HasTrailingSlash(v)) {
// XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
// Unless the local filesystem always uses absolute paths
Expand All @@ -37,7 +37,7 @@ std::string EnsureTrailingSlash(std::string_view v) {
}
}

std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
static inline std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
// XXX should strip trailing slash?

auto pos = s.find_last_of(kSep);
Expand All @@ -48,4 +48,8 @@ std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s)
return {s.substr(0, pos), s.substr(pos + 1)};
}

// Joins `parent` and `child` with a single kSep separator.
// Note: performs no normalization -- a trailing slash on `parent` is kept as-is.
static inline std::string ConcatenateFilePath(const std::string& parent, const std::string& child) {
  std::string joined;
  joined.reserve(parent.size() + 1 + child.size());
  joined.append(parent);
  joined.push_back(kSep);
  joined.append(child);
  return joined;
}

} // namespace milvus_storage
60 changes: 58 additions & 2 deletions cpp/include/milvus-storage/common/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,78 @@

namespace milvus_storage {

static const std::string GROUP_DELIMITER = ";";
static const std::string COLUMN_DELIMITER = ",";
static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
static const std::string COLUMN_OFFSETS_META_KEY = "column_offsets";

// Serialization helpers for packed-file metadata stored as parquet key/value
// metadata: per-file row-group byte sizes and the column-offset mapping
// produced by the packed writer.
class PackedMetaSerde {
  public:
  // Serialize a vector of size_t to a raw byte array carried in a string.
  // NOTE(review): the memcpy encoding depends on the writer's endianness and
  // sizeof(size_t) -- reader and writer must share both; confirm this holds
  // for all deployment targets.
  static std::string SerializeRowGroupSizes(const std::vector<size_t>& sizes) {
    std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
    std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
    return std::string(byteArray.begin(), byteArray.end());
  }

  // Deserialize a string produced by SerializeRowGroupSizes back to sizes.
  // Trailing bytes that do not fill a whole size_t are ignored.
  static std::vector<size_t> DeserializeRowGroupSizes(const std::string& input) {
    std::vector<uint8_t> byteArray(input.begin(), input.end());
    std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
    std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
    return sizes;
  }

  // Encode column offsets as text: columns within a group are joined by
  // COLUMN_DELIMITER (','), groups are joined by GROUP_DELIMITER (';'),
  // e.g. {{0,1},{2}} -> "0,1;2".
  static std::string SerializeColumnOffsets(const std::vector<std::vector<int>>& column_offsets) {
    std::stringstream ss;
    for (size_t i = 0; i < column_offsets.size(); ++i) {
      if (i > 0) {
        ss << GROUP_DELIMITER;
      }
      for (size_t j = 0; j < column_offsets[i].size(); ++j) {
        if (j > 0) {
          ss << COLUMN_DELIMITER;
        }
        ss << column_offsets[i][j];
      }
    }
    return ss.str();
  }

  // Parse the string produced by SerializeColumnOffsets. Empty column tokens
  // and empty groups are skipped, so an empty input yields an empty result.
  static std::vector<std::vector<int>> DeserializeColumnOffsets(const std::string& input) {
    std::vector<std::vector<int>> column_offsets;

    size_t group_start = 0;
    size_t group_end = input.find(GROUP_DELIMITER);

    while (group_start != std::string::npos) {
      std::string group = input.substr(group_start, group_end - group_start);
      std::vector<int> group_indices;

      size_t column_start = 0;
      size_t column_end = group.find(COLUMN_DELIMITER);
      while (column_start != std::string::npos) {
        std::string column = group.substr(column_start, column_end - column_start);
        if (!column.empty()) {
          group_indices.push_back(std::stoi(column));
        }
        column_start = (column_end == std::string::npos) ? std::string::npos : column_end + COLUMN_DELIMITER.size();
        column_end = group.find(COLUMN_DELIMITER, column_start);
      }

      if (!group_indices.empty()) {
        column_offsets.push_back(group_indices);
      }

      group_start = (group_end == std::string::npos) ? std::string::npos : group_end + GROUP_DELIMITER.size();
      group_end = input.find(GROUP_DELIMITER, group_start);
    }

    return column_offsets;
  }
};

} // namespace milvus_storage
5 changes: 5 additions & 0 deletions cpp/include/milvus-storage/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Status {

static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); }

static Status ReaderError(const std::string& msg) { return Status(kReaderError, msg); }

static Status IOError(const std::string& msg) { return Status(kIOError, msg); }

bool ok() const { return code_ == kOk; }
Expand All @@ -51,6 +53,8 @@ class Status {

bool IsWriterError() const { return code_ == kWriterError; }

bool IsReaderError() const { return code_ == kReaderError; }

bool IsIOError() const { return code_ == kIOError; }

std::string ToString() const;
Expand All @@ -64,6 +68,7 @@ class Status {
kFileNotFound = 4,
kWriterError = 5,
kIOError = 6,
kReaderError = 7
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
12 changes: 2 additions & 10 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <parquet/properties.h>
#include <memory>
#include "arrow/filesystem/filesystem.h"
#include "format/writer.h"
Expand All @@ -27,19 +26,11 @@ namespace milvus_storage {

class ParquetFileWriter : public FileWriter {
public:
// with default WriterProperties
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config);

// with custom WriterProperties
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props);

Status Init() override;

Status Write(const arrow::RecordBatch& record) override;
Expand All @@ -49,6 +40,8 @@ class ParquetFileWriter : public FileWriter {
Status WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::vector<size_t>& batch_memory_sizes);

void AppendKVMetadata(const std::string& key, const std::string& value);

int64_t count() override;

Status Close() override;
Expand All @@ -61,7 +54,6 @@ class ParquetFileWriter : public FileWriter {

std::unique_ptr<parquet::arrow::FileWriter> writer_;
std::shared_ptr<arrow::KeyValueMetadata> kv_metadata_;
parquet::WriterProperties props_;
int64_t count_ = 0;
int row_group_num_ = 0;
std::vector<size_t> row_group_sizes_;
Expand Down
7 changes: 7 additions & 0 deletions cpp/include/milvus-storage/packed/chunk_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "packed/column_group.h"
#include <parquet/arrow/reader.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
Expand All @@ -28,7 +29,13 @@ struct ColumnOffset {
int path_index;
int col_index;

ColumnOffset() = default;

ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}

// Human-readable form for logging, e.g. "path_index: 0, col_index: 2".
// NOTE(review): only reads members -- could be declared const; confirm no ABI concern.
std::string ToString() {
  return "path_index: " + std::to_string(path_index) + ", col_index: " + std::to_string(col_index);
}
};

// record which chunk is in use and its offset in the file
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/milvus-storage/packed/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#pragma once

#include <arrow/type.h>
#include <arrow/record_batch.h>
#include <queue>
#include "common/status.h"
#include <map>
#include <string>

namespace milvus_storage {

Expand Down
10 changes: 2 additions & 8 deletions cpp/include/milvus-storage/packed/column_group_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/status.h"
#include "packed/column_group.h"
#include "common/config.h"
#include "common/serde.h"

namespace milvus_storage {

Expand All @@ -34,16 +35,9 @@ class ColumnGroupWriter {
const StorageConfig& storage_config,
const std::vector<int>& origin_column_indices);

ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props,
const std::vector<int>& origin_column_indices);

Status Init();
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
Status WriteColumnOffsetsMeta(const std::vector<std::vector<int>>& column_offsets);
Status Flush();
Status Close();
GroupId Group_id() const;
Expand Down
12 changes: 4 additions & 8 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,9 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
// Test only
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

Expand All @@ -60,6 +53,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
arrow::Status Close() override;

private:
Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();
std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;
Expand All @@ -77,7 +71,9 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::set<int> needed_paths_;
std::vector<std::vector<size_t>> row_group_sizes_;
const std::string file_path_;
int read_count_;
};

Expand Down
Loading

0 comments on commit f51fd09

Please sign in to comment.