-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Implement COPY TO functionality for exporting data to CSV.
Extended the COPY TO command to support exporting table data to CSV files. Enhanced the planning, execution, and test modules to handle the file writing mechanics and schema handling. Updated README to mark VIEW support as implemented and added relevant tests.
- Loading branch information
Showing
7 changed files
with
257 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,215 @@ | ||
use crate::binder::copy::FileFormat; | ||
use crate::errors::DatabaseError; | ||
use crate::execution::{Executor, ReadExecutor}; | ||
use crate::planner::operator::copy_to_file::CopyToFileOperator; | ||
use crate::storage::{Iter, StatisticsMetaCache, TableCache, Transaction, ViewCache}; | ||
use crate::throw; | ||
use crate::types::tuple_builder::TupleBuilder; | ||
use std::sync::Arc; | ||
|
||
#[allow(dead_code)] | ||
pub struct CopyToFile { | ||
op: CopyToFileOperator, | ||
pub op: CopyToFileOperator, | ||
} | ||
|
||
impl From<CopyToFileOperator> for CopyToFile { | ||
fn from(op: CopyToFileOperator) -> Self { | ||
CopyToFile { op } | ||
} | ||
} | ||
|
||
impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for CopyToFile { | ||
fn execute( | ||
self, | ||
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), | ||
transaction: &'a T, | ||
) -> Executor<'a> { | ||
Box::new( | ||
#[coroutine] | ||
move || { | ||
let mut writer = throw!(self.create_writer()); | ||
|
||
let mut iter = throw!(transaction.read( | ||
cache.0, | ||
Arc::new(self.op.table.clone()), | ||
(None, None), | ||
self.op | ||
.schema_ref | ||
.iter() | ||
.enumerate() | ||
.map(|(index, column_ref)| (index, column_ref.clone())) | ||
.collect() | ||
)); | ||
|
||
while let Some(tuple) = throw!(iter.next_tuple()) { | ||
throw!(writer | ||
.write_record( | ||
tuple | ||
.values | ||
.iter() | ||
.map(|v| v.to_string()) | ||
.collect::<Vec<_>>() | ||
) | ||
.map_err(|e| DatabaseError::from(e))); | ||
} | ||
|
||
throw!(writer.flush().map_err(|e| DatabaseError::from(e))); | ||
|
||
yield Ok(TupleBuilder::build_result(format!("{}", self.op))); | ||
}, | ||
) | ||
} | ||
} | ||
|
||
impl CopyToFile { | ||
fn create_writer(&self) -> Result<csv::Writer<std::fs::File>, DatabaseError> { | ||
let mut writer = match self.op.target.format { | ||
FileFormat::Csv { | ||
delimiter, | ||
quote, | ||
header, | ||
.. | ||
} => csv::WriterBuilder::new() | ||
.delimiter(delimiter as u8) | ||
.quote(quote as u8) | ||
.has_headers(header) | ||
.from_path(self.op.target.path.clone())?, | ||
}; | ||
|
||
if let FileFormat::Csv { header: true, .. } = self.op.target.format { | ||
let headers = self | ||
.op | ||
.schema_ref | ||
.iter() | ||
.map(|c| c.name()) | ||
.collect::<Vec<_>>(); | ||
writer.write_record(headers)?; | ||
} | ||
|
||
Ok(writer) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::binder::copy::ExtSource; | ||
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation, ColumnSummary}; | ||
use crate::db::DataBaseBuilder; | ||
use crate::errors::DatabaseError; | ||
use crate::storage::Storage; | ||
use crate::types::LogicalType; | ||
use sqlparser::ast::CharLengthUnits; | ||
use std::ops::{Coroutine, CoroutineState}; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use tempfile::TempDir; | ||
use ulid::Ulid; | ||
|
||
#[test] | ||
fn read_csv() -> Result<(), DatabaseError> { | ||
let columns = vec![ | ||
ColumnRef::from(ColumnCatalog::direct_new( | ||
ColumnSummary { | ||
name: "a".to_string(), | ||
relation: ColumnRelation::Table { | ||
column_id: Ulid::new(), | ||
table_name: Arc::new("t1".to_string()), | ||
is_temp: false, | ||
}, | ||
}, | ||
false, | ||
ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?, | ||
false, | ||
)), | ||
ColumnRef::from(ColumnCatalog::direct_new( | ||
ColumnSummary { | ||
name: "b".to_string(), | ||
relation: ColumnRelation::Table { | ||
column_id: Ulid::new(), | ||
table_name: Arc::new("t1".to_string()), | ||
is_temp: false, | ||
}, | ||
}, | ||
false, | ||
ColumnDesc::new(LogicalType::Float, None, false, None)?, | ||
false, | ||
)), | ||
ColumnRef::from(ColumnCatalog::direct_new( | ||
ColumnSummary { | ||
name: "c".to_string(), | ||
relation: ColumnRelation::Table { | ||
column_id: Ulid::new(), | ||
table_name: Arc::new("t1".to_string()), | ||
is_temp: false, | ||
}, | ||
}, | ||
false, | ||
ColumnDesc::new( | ||
LogicalType::Varchar(Some(10), CharLengthUnits::Characters), | ||
None, | ||
false, | ||
None, | ||
)?, | ||
false, | ||
)), | ||
]; | ||
|
||
let tmp_dir = TempDir::new()?; | ||
let file_path = tmp_dir.path().join("test.csv"); | ||
|
||
let op = CopyToFileOperator { | ||
table: "t1".to_string(), | ||
target: ExtSource { | ||
path: file_path.clone(), | ||
format: FileFormat::Csv { | ||
delimiter: ',', | ||
quote: '"', | ||
escape: None, | ||
header: true, | ||
}, | ||
}, | ||
schema_ref: Arc::new(columns), | ||
}; | ||
|
||
let temp_dir = TempDir::new().unwrap(); | ||
let db = DataBaseBuilder::path(temp_dir.path()).build()?; | ||
let _ = db.run("create table t1 (a int primary key, b float, c varchar(10))"); | ||
let _ = db.run("insert into t1 values (1, 1.1, 'foo')"); | ||
let _ = db.run("insert into t1 values (2, 2.0, 'fooo')"); | ||
let _ = db.run("insert into t1 values (3, 2.1, 'fnck')"); | ||
|
||
let storage = db.storage; | ||
let mut transaction = storage.transaction()?; | ||
|
||
let executor = CopyToFile { op: op.clone() }; | ||
let mut coroutine = executor.execute( | ||
(&db.table_cache, &db.view_cache, &db.meta_cache), | ||
&mut transaction, | ||
); | ||
|
||
let tuple = match Pin::new(&mut coroutine).resume(()) { | ||
CoroutineState::Yielded(tuple) => tuple, | ||
CoroutineState::Complete(()) => unreachable!(), | ||
}?; | ||
|
||
let mut rdr = csv::Reader::from_path(file_path)?; | ||
let headers = rdr.headers()?.clone(); | ||
assert_eq!(headers, vec!["a", "b", "c"]); | ||
|
||
let mut records = rdr.records(); | ||
let record1 = records.next().unwrap()?; | ||
assert_eq!(record1, vec!["1", "1.1", "foo"]); | ||
|
||
let record2 = records.next().unwrap()?; | ||
assert_eq!(record2, vec!["2", "2.0", "fooo"]); | ||
|
||
let record3 = records.next().unwrap()?; | ||
assert_eq!(record3, vec!["3", "2.1", "fnck"]); | ||
|
||
assert!(records.next().is_none()); | ||
|
||
assert_eq!(tuple, TupleBuilder::build_result(format!("{}", op))); | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,32 @@ | ||
use crate::binder::copy::ExtSource; | ||
use crate::types::tuple::SchemaRef; | ||
use fnck_sql_serde_macros::ReferenceSerialization; | ||
use itertools::Itertools; | ||
use std::fmt; | ||
use std::fmt::Formatter; | ||
|
||
#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] | ||
pub struct CopyToFileOperator { | ||
pub source: ExtSource, | ||
pub table: String, | ||
pub target: ExtSource, | ||
pub schema_ref: SchemaRef, | ||
} | ||
|
||
impl fmt::Display for CopyToFileOperator { | ||
fn fmt(&self, f: &mut Formatter) -> fmt::Result { | ||
let columns = self | ||
.schema_ref | ||
.iter() | ||
.map(|column| column.name().to_string()) | ||
.join(", "); | ||
write!( | ||
f, | ||
"Copy {} -> {} [{}]", | ||
self.table, | ||
self.target.path.display(), | ||
columns | ||
)?; | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters