Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Support for COPY TO/FROM Google Cloud Storage #61

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8553677
Adds support for COPY TO/FROM Azure Blob Storage
aykut-bozkurt Oct 23, 2024
f187bc4
Adds support for COPY TO/FROM Google Cloud Storage
aykut-bozkurt Oct 23, 2024
7664102
Merge branch 'main' into aykut/azure-blob-storage
aykut-bozkurt Dec 16, 2024
f1b7114
configure test endpoint via env var
aykut-bozkurt Dec 16, 2024
31f6701
merge
aykut-bozkurt Dec 16, 2024
4cb8eb1
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Dec 16, 2024
f3a31d0
no curl in fake-gcs
aykut-bozkurt Dec 16, 2024
0c65a3b
ci uri fix
aykut-bozkurt Dec 16, 2024
d76bf55
ci uri fix
aykut-bozkurt Dec 16, 2024
74df5b2
update readme
aykut-bozkurt Dec 16, 2024
841f5ec
Merge branch 'main' into aykut/azure-blob-storage
aykut-bozkurt Dec 24, 2024
a8df29d
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Dec 24, 2024
6102ba9
improve coverage
aykut-bozkurt Jan 3, 2025
3af0a78
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 3, 2025
24ff523
support connection string
aykut-bozkurt Jan 6, 2025
5f19d59
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
e0df256
support connection string
aykut-bozkurt Jan 6, 2025
dc181dd
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
e9d0cdd
support connection string
aykut-bozkurt Jan 6, 2025
88080dd
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
d5f5ef2
- support azure bearer token via client secret
aykut-bozkurt Jan 10, 2025
0016a3f
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 10, 2025
5955d57
google config to pass to object store
aykut-bozkurt Jan 10, 2025
60103e7
Merge branch 'main' into aykut/google-cloud-storage
aykut-bozkurt Jan 10, 2025
dd3d157
fix ci for fake google storage server
aykut-bozkurt Jan 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .devcontainer/.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket
GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oauth": true,"client_email": "","private_key_id": "","private_key": ""}'
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
RUST_TEST_THREADS=1
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
- ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
- ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
- ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
- ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw
- ./entrypoint.sh:/entrypoint.sh
env_file:
- .env
Expand All @@ -20,6 +21,7 @@ services:
depends_on:
- minio
- azurite
- fake-gcs-server

minio:
image: minio/minio
Expand Down Expand Up @@ -47,3 +49,16 @@ services:
interval: 6s
timeout: 2s
retries: 3

fake-gcs-server:
image: tustvold/fake-gcs-server
env_file:
- .env
network_mode: host
command: -scheme http -public-host localhost:4443
restart: unless-stopped
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "4443"]
interval: 6s
timeout: 2s
retries: 3
3 changes: 3 additions & 0 deletions .devcontainer/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM
# create azurite container
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

# create fake-gcs bucket
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

sleep infinity
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ jobs:
# create container
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

- name: Start fake-gcs-server for Google Cloud Storage emulator tests
run: |
docker run -d \
--env-file .devcontainer/.env \
-p 4443:4443 \
tustvold/fake-gcs-server -scheme http -public-host localhost:4443

while ! curl $GOOGLE_SERVICE_ENDPOINT; do
echo "Waiting for $GOOGLE_SERVICE_ENDPOINT..."
sleep 1
done

curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

- name: Run tests
run: |
# Run tests with coverage tool
Expand Down
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"rust-analyzer.checkOnSave": true,
"editor.inlayHints.enabled": "offUnlessPressed",
"files.watcherExclude": {
"**/target/**": true
}
"**/target/**": true
}
}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
azure_storage = {version = "0.21", default-features = false}
futures = "0.3"
home = "0.5"
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
```

## Object Store Support
`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores.
`pg_parquet` supports reading and writing Parquet files from/to `S3`, `Azure Blob Storage` and `Google Cloud Service` object stores.

> [!NOTE]
> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user.
Expand Down Expand Up @@ -239,6 +239,28 @@ Supported authorization methods' priority order is shown below:
2. Sas token,
3. Storage key.

#### Google Cloud Storage

The simplest way to configure object storage is by creating a json config file like [`/tmp/gcs.json`]:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key **(only via environment variables)**
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file **(only via environment variables)**

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
Expand Down
12 changes: 9 additions & 3 deletions src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ use crate::{
arrow_parquet::uri_utils::uri_as_string,
object_store::{
aws::create_s3_object_store, azure::create_azure_object_store,
local_file::create_local_file_object_store,
gcs::create_gcs_object_store, local_file::create_local_file_object_store,
},
PG_BACKEND_TOKIO_RUNTIME,
};

pub(crate) mod aws;
pub(crate) mod azure;
pub(crate) mod gcs;
pub(crate) mod local_file;

pub(crate) fn create_object_store(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
panic!(
"unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
"unrecognized uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
uri
)
});
Expand All @@ -37,6 +38,11 @@ pub(crate) fn create_object_store(uri: &Url, copy_from: bool) -> (Arc<dyn Object

(storage_container, path)
}
ObjectStoreScheme::GoogleCloudStorage => {
let storage_container = Arc::new(create_gcs_object_store(uri));

(storage_container, path)
}
ObjectStoreScheme::Local => {
let storage_container = Arc::new(create_local_file_object_store(uri, copy_from));

Expand All @@ -47,7 +53,7 @@ pub(crate) fn create_object_store(uri: &Url, copy_from: bool) -> (Arc<dyn Object
}
_ => {
panic!(
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
uri.scheme(),
uri
);
Expand Down
57 changes: 57 additions & 0 deletions src/object_store/gcs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use url::Url;

// create_gcs_object_store a GoogleCloudStorage object store from given uri.
// It is configured by environment variables. Currently, we only support
// following environment variables:
// - GOOGLE_SERVICE_ACCOUNT_KEY
// - GOOGLE_SERVICE_ACCOUNT_PATH
pub(crate) fn create_gcs_object_store(uri: &Url) -> GoogleCloudStorage {
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
panic!("unsupported gcs uri: {}", uri);

Check warning on line 11 in src/object_store/gcs.rs

View check run for this annotation

Codecov / codecov/patch

src/object_store/gcs.rs#L11

Added line #L11 was not covered by tests
});

let mut gcs_builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket_name);

let gcs_config = GoogleStorageConfig::load();

// service account key
if let Some(service_account_key) = gcs_config.service_account_key {
gcs_builder = gcs_builder.with_service_account_key(&service_account_key);
}

// service account path
if let Some(service_account_path) = gcs_config.service_account_path {
gcs_builder = gcs_builder.with_service_account_path(&service_account_path);

Check warning on line 25 in src/object_store/gcs.rs

View check run for this annotation

Codecov / codecov/patch

src/object_store/gcs.rs#L25

Added line #L25 was not covered by tests
}

gcs_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

fn parse_gcs_bucket(uri: &Url) -> Option<String> {
let host = uri.host_str()?;

// gs://{bucket}/key
if uri.scheme() == "gs" {
return Some(host.to_string());
}

None

Check warning on line 39 in src/object_store/gcs.rs

View check run for this annotation

Codecov / codecov/patch

src/object_store/gcs.rs#L37-L39

Added lines #L37 - L39 were not covered by tests
}

// GoogleStorageConfig is a struct that holds the configuration that is
// used to configure the Google Storage object store.
struct GoogleStorageConfig {
service_account_key: Option<String>,
service_account_path: Option<String>,
}

impl GoogleStorageConfig {
// load loads the Google Storage configuration from the environment.
fn load() -> Self {
Self {
service_account_key: std::env::var("GOOGLE_SERVICE_ACCOUNT_KEY").ok(),
service_account_path: std::env::var("GOOGLE_SERVICE_ACCOUNT_PATH").ok(),
}
}
}
45 changes: 41 additions & 4 deletions src/pgrx_tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
Expand Down Expand Up @@ -500,7 +500,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
Expand Down Expand Up @@ -547,10 +547,47 @@ mod tests {
}

#[pg_test]
#[should_panic(expected = "unsupported scheme gs in uri gs://testbucket")]
fn test_gcs_from_env() {
let test_bucket_name: String =
std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found");

let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name);

let test_table = TestTable::<i32>::new("int4".into()).with_uri(gcs_uri);

test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_write_wrong_bucket() {
let s3_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}';",
s3_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_read_wrong_bucket() {
let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

let create_table_command = "CREATE TABLE test_table (a int);";
Spi::run(create_table_command).unwrap();

let copy_from_command = format!("COPY test_table FROM '{}';", gcs_uri);
Spi::run(copy_from_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "unsupported scheme http in uri http://testbucket")]
fn test_unsupported_uri() {
let test_table =
TestTable::<i32>::new("int4".into()).with_uri("gs://testbucket".to_string());
TestTable::<i32>::new("int4".into()).with_uri("http://testbucket".to_string());
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}
Expand Down
Loading