>(
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
- let values = get_buffer::<P>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
+ let bytes = get_bytes(data_ref, block_offset, buffers)?;
+ let is_aligned = check_bytes_len_and_is_aligned::<P>(bytes, num_rows)?;
- Ok(unsafe {
- create_array(
- data,
- num_rows,
- null_count,
- [validity, Some(values)].into_iter(),
- [].into_iter(),
- None,
- None,
- )
- })
+ let out = if is_aligned || std::mem::size_of::<P>() <= 8 {
+ assert!(
+ is_aligned,
+ "primitive type with size <= 8 bytes should have been aligned"
+ );
+ let bytes_ptr = bytes.as_ptr();
+
+ unsafe {
+ create_array(
+ data,
+ num_rows,
+ null_count,
+ [validity, Some(bytes_ptr)].into_iter(),
+ [].into_iter(),
+ None,
+ None,
+ )
+ }
+ } else {
+ let mut values = vec![P::default(); num_rows];
+ unsafe {
+ std::ptr::copy_nonoverlapping(
+ bytes.as_ptr(),
+ values.as_mut_ptr() as *mut u8,
+ bytes.len(),
+ )
+ };
+ // Now we need to keep the new buffer alive
+ let owned_data = Arc::new((
+ // We can drop the original ref if we don't have a validity
+ validity.and(Some(data)),
+ values,
+ ));
+ let bytes_ptr = owned_data.1.as_ptr() as *mut u8;
+
+ unsafe {
+ create_array(
+ owned_data,
+ num_rows,
+ null_count,
+ [validity, Some(bytes_ptr)].into_iter(),
+ [].into_iter(),
+ None,
+ None,
+ )
+ }
+ };
+
+ Ok(out)
}
#[allow(clippy::too_many_arguments)]
@@ -482,7 +529,7 @@ fn get_array<T: AsRef<[u8]>>(
match data_type.to_physical_type() {
Null => mmap_null(data, &node, block_offset, buffers),
Boolean => mmap_boolean(data, &node, block_offset, buffers),
- Primitive(p) => with_match_primitive_type!(p, |$T| {
+ Primitive(p) => with_match_primitive_type_full!(p, |$T| {
mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
}),
Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
diff --git a/crates/polars-core/src/chunked_array/from_iterator_par.rs b/crates/polars-core/src/chunked_array/from_iterator_par.rs
index 88b135ad0405..12263053e368 100644
--- a/crates/polars-core/src/chunked_array/from_iterator_par.rs
+++ b/crates/polars-core/src/chunked_array/from_iterator_par.rs
@@ -1,5 +1,6 @@
//! Implementations of upstream traits for [`ChunkedArray`]
use std::collections::LinkedList;
+use std::sync::Mutex;
use arrow::pushable::{NoOption, Pushable};
use rayon::prelude::*;
@@ -139,81 +140,159 @@ where
}
}
-/// From trait
+pub trait FromParIterWithDtype<K> {
+ fn from_par_iter_with_dtype<I>(iter: I, name: &str, dtype: DataType) -> Self
+ where
+ I: IntoParallelIterator<Item = K>,
+ Self: Sized;
+}
+
+fn get_value_cap(vectors: &LinkedList<Vec<Option<Series>>>) -> usize {
+ vectors
+ .iter()
+ .map(|list| {
+ list.iter()
+ .map(|opt_s| opt_s.as_ref().map(|s| s.len()).unwrap_or(0))
+ .sum::<usize>()
+ })
+ .sum::<usize>()
+}
+
+fn get_dtype(vectors: &LinkedList<Vec<Option<Series>>>) -> DataType {
+ for v in vectors {
+ for s in v.iter().flatten() {
+ let dtype = s.dtype();
+ if !matches!(dtype, DataType::Null) {
+ return dtype.clone();
+ }
+ }
+ }
+ DataType::Null
+}
+
+fn materialize_list(
+ name: &str,
+ vectors: &LinkedList<Vec<Option<Series>>>,
+ dtype: DataType,
+ value_capacity: usize,
+ list_capacity: usize,
+) -> ListChunked {
+ match &dtype {
+ #[cfg(feature = "object")]
+ DataType::Object(_, _) => {
+ let s = vectors
+ .iter()
+ .flatten()
+ .find_map(|opt_s| opt_s.as_ref())
+ .unwrap();
+ let mut builder = s.get_list_builder(name, value_capacity, list_capacity);
+
+ for v in vectors {
+ for val in v {
+ builder.append_opt_series(val.as_ref()).unwrap();
+ }
+ }
+ builder.finish()
+ },
+ dtype => {
+ let mut builder = get_list_builder(dtype, value_capacity, list_capacity, name).unwrap();
+ for v in vectors {
+ for val in v {
+ builder.append_opt_series(val.as_ref()).unwrap();
+ }
+ }
+ builder.finish()
+ },
+ }
+}
+
impl FromParallelIterator