diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 378c3af31745..aa4e2343a38a 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -52,6 +52,8 @@ use crate::statement::StatementExecutor; const DEFAULT_BATCH_SIZE: usize = 8192; const DEFAULT_READ_BUFFER: usize = 256 * 1024; +const MAX_INSERT_ROWS: &str = "max_insert_rows"; +const DEFAULT_MAX_INSERT_ROWS: usize = 1000; enum FileMetadata { Parquet { @@ -377,6 +379,11 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; + let max_insert_rows = req + .with + .get(MAX_INSERT_ROWS) + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_MAX_INSERT_ROWS); for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self @@ -427,6 +434,10 @@ impl StatementExecutor { rows_inserted += rows; insert_cost += cost; } + + if rows_inserted >= max_insert_rows { + return Ok(gen_insert_output(rows_inserted, insert_cost)); + } } if !pending.is_empty() { @@ -436,13 +447,17 @@ impl StatementExecutor { } } - Ok(Output::new( - OutputData::AffectedRows(rows_inserted), - OutputMeta::new_with_cost(insert_cost), - )) + Ok(gen_insert_output(rows_inserted, insert_cost)) } } +fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output { + Output::new( + OutputData::AffectedRows(rows_inserted), + OutputMeta::new_with_cost(insert_cost), + ) +} + /// Executes all pending inserts all at once, drain pending requests and reset pending bytes. async fn batch_insert( pending: &mut Vec>>, diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index 7bcabbbc2bd1..3a2eaed6174c 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -6,7 +6,19 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570 Affected Rows: 2 -Copy demo TO '/tmp/demo/export/parquet/demo.parquet'; +Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet'; + +Affected Rows: 2 + +CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index); + +Affected Rows: 0 + +insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000); + +Affected Rows: 2 + +Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet'; Affected Rows: 2 @@ -14,7 +26,7 @@ CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp Affected Rows: 0 -Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet'; +Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet'; Affected Rows: 2 @@ -31,15 +43,17 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time Affected Rows: 0 -Copy with_path FROM '/tmp/demo/export/parquet/'; +Copy with_path FROM '/tmp/demo/export/parquet_files/'; -Affected Rows: 2 +Affected Rows: 4 select * from with_path order by ts; +-------+------+--------+---------------------+ | host | cpu | memory | ts | +-------+------+--------+---------------------+ +| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 | +| host4 | 99.9 | 444.4 | 2022-06-15T07:02:36 | | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | | host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | +-------+------+--------+---------------------+ @@ -48,23 +62,61 @@ CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp t Affected Rows: 0 -Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*'); +Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*'); -Affected Rows: 2 +Affected Rows: 4 select * from with_pattern order by ts; +-------+------+--------+---------------------+ | host | cpu | memory | ts | +-------+------+--------+---------------------+ +| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 | +| host4 | 99.9 | 444.4 | 2022-06-15T07:02:36 | | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | | host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | +-------+------+--------+---------------------+ +CREATE TABLE without_limit_rows(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/'; + +Affected Rows: 4 + +select count(*) from without_limit_rows; + ++----------+ +| COUNT(*) | ++----------+ +| 4 | ++----------+ + +CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2); + +Affected Rows: 2 + +select count(*) from with_limit_rows; + ++----------+ +| COUNT(*) | ++----------+ +| 2 | ++----------+ + drop table demo; Affected Rows: 0 +drop table demo_2; + +Affected Rows: 0 + drop table with_filename; Affected Rows: 0 @@ -77,3 +129,11 @@ drop table with_pattern; Affected Rows: 0 +drop table without_limit_rows; + +Affected Rows: 0 + +drop table with_limit_rows; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index 6de053f190c9..d2916e4b9322 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -2,30 +2,54 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); -Copy demo TO '/tmp/demo/export/parquet/demo.parquet'; +Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet'; + +CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index); + +insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000); + +Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet'; CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); -Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet'; +Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet'; select * from with_filename order by ts; CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); -Copy with_path FROM '/tmp/demo/export/parquet/'; +Copy with_path FROM '/tmp/demo/export/parquet_files/'; select * from with_path order by ts; CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index); -Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*'); +Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*'); select * from with_pattern order by ts; +CREATE TABLE without_limit_rows(host string, cpu double, memory double, ts timestamp time index); + +Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/'; + +select count(*) from without_limit_rows; + +CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index); + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2); + +select count(*) from with_limit_rows; + drop table demo; +drop table demo_2; + drop table with_filename; drop table with_path; drop table with_pattern; + +drop table without_limit_rows; + +drop table with_limit_rows;