From 42b75270020b823606b6ae71bc5d379a9afe3b24 Mon Sep 17 00:00:00 2001
From: Andrew Duffy <a10y@users.noreply.github.com>
Date: Thu, 14 Nov 2024 11:42:30 -0500
Subject: [PATCH] feat: run VortexFileArrayStream on dedicated IoDispatcher
 (#1232)

This PR separates IO for the `LayoutBatchStream` from the calling
thread's runtime. This is beneficial for a few reasons

1. Now decoding into Arrow will not block the executor where IO occurs,
which previously could lead to timeouts
2. It allows us to use non-Tokio runtimes such as compio which uses
io_uring

To enable (2), we revamp the `VortexReadAt` trait to return `!Send`
futures that can be launched on thread-per-core runtimes. We also
enforce that `VortexReadAt` implementers are clonable, again so they can
be shared across several concurrent futures. We implement a clonable
`TokioFile` type, and un-implement for things like `Vec<u8>` and `&[u8]`
that don't have cheap cloning.


Although read operations are now `!Send`, it is preferable
for `LayoutBatchStream` to be `Send` so that it can be polled from Tokio
or any other runtime. To achieve this, we add an `IoDispatcher`, which
sits in front of either a tokio or compio executor to dispatch read
requests.

A couple of things to consider:

* **Cancellation safety**: On Drop, the dispatcher will join its worker
threads, gracefully awaiting any already-submitted tasks before it
completes. It would probably be preferable to find a way to force an
immediate shutdown, so we may want to ferry a `shutdown_now` channel and
`select!` over both of them inside the inner loop of the dispatcher
* **Async runtime safety**: The way you can implement cross-runtime IO
is that you provide both a reader (`VortexReadAt`) and an `IoDispatcher`
to the `LayoutBatchStreamBuilder`. There is nothing at compile time that
enforces that the reader and the dispatcher are compatible. Perhaps we
should parameterize `VortexReadAt` on a runtime and then we can do
type-level checks to ensure that all IO operations can only be paired
with the right runtime. That would likely inflate the PR a good bit
---
 .github/workflows/ci.yml                      |  13 +-
 .zed/tasks.json                               |  10 +
 Cargo.lock                                    | 474 +++++++++++-------
 Cargo.toml                                    |   9 +-
 bench-vortex/benches/bytes_at.rs              |   3 +
 bench-vortex/benches/compress_noci.rs         |   9 +-
 bench-vortex/src/data_downloads.rs            |   3 +-
 bench-vortex/src/reader.rs                    |   6 +-
 pyvortex/Cargo.toml                           |   2 +
 pyvortex/src/dataset.rs                       |  22 +-
 vortex-array/src/lib.rs                       |   2 +-
 vortex-serde/Cargo.toml                       |  17 +-
 vortex-serde/src/chunked_reader/mod.rs        |   4 +-
 vortex-serde/src/file/read/buffered.rs        |   5 +-
 vortex-serde/src/file/read/builder/mod.rs     |  15 +-
 .../src/file/read/layouts/columnar.rs         |   5 +-
 vortex-serde/src/file/read/layouts/flat.rs    |   6 +-
 .../src/file/read/layouts/inline_dtype.rs     |  10 +-
 .../src/file/read/layouts/test_read.rs        |   6 +-
 vortex-serde/src/file/read/mod.rs             |   8 +-
 vortex-serde/src/file/read/stream.rs          | 184 ++++---
 vortex-serde/src/file/tests.rs                |  25 +-
 vortex-serde/src/io/compio.rs                 |  73 +++
 vortex-serde/src/io/dispatcher/compio.rs      | 100 ++++
 vortex-serde/src/io/dispatcher/mod.rs         |  93 ++++
 vortex-serde/src/io/dispatcher/tokio.rs       | 142 ++++++
 vortex-serde/src/io/mod.rs                    |   9 +-
 vortex-serde/src/io/monoio.rs                 |  55 --
 vortex-serde/src/io/object_store.rs           |  51 +-
 vortex-serde/src/io/offset.rs                 |  20 +-
 vortex-serde/src/io/read.rs                   | 109 ++--
 vortex-serde/src/io/tokio.rs                  | 162 +++++-
 vortex-serde/src/lib.rs                       |  42 +-
 vortex-serde/src/messages/reader.rs           |   6 +-
 vortex-serde/src/messages/writer.rs           |   2 +-
 vortex-serde/src/stream_writer/tests.rs       |   2 +
 36 files changed, 1174 insertions(+), 530 deletions(-)
 create mode 100644 .zed/tasks.json
 create mode 100644 vortex-serde/src/io/compio.rs
 create mode 100644 vortex-serde/src/io/dispatcher/compio.rs
 create mode 100644 vortex-serde/src/io/dispatcher/mod.rs
 create mode 100644 vortex-serde/src/io/dispatcher/tokio.rs
 delete mode 100644 vortex-serde/src/io/monoio.rs

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9974c8916d..83d0074c57 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -2,15 +2,15 @@ name: CI
 
 on:
   push:
-    branches: [ "develop" ]
-  pull_request: { }
-  workflow_dispatch: { }
+    branches: ["develop"]
+  pull_request: {}
+  workflow_dispatch: {}
 
 permissions:
   actions: read
   contents: read
-  checks: write  # audit-check creates checks
-  issues: write  # audit-check creates issues
+  checks: write # audit-check creates checks
+  issues: write # audit-check creates issues
 
 env:
   CARGO_TERM_COLOR: always
@@ -140,7 +140,7 @@ jobs:
     name: "miri"
     runs-on: ubuntu-latest
     env:
-      MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-backtrace=full
+      MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-backtrace=full -Zmiri-disable-isolation
     steps:
       - uses: rui314/setup-mold@v1
       - uses: actions/checkout@v4
@@ -182,4 +182,3 @@ jobs:
       - name: "Make sure no files changed after regenerating"
         run: |
           test -z "$(git status --porcelain)"
-
diff --git a/.zed/tasks.json b/.zed/tasks.json
new file mode 100644
index 0000000000..f6c6d94929
--- /dev/null
+++ b/.zed/tasks.json
@@ -0,0 +1,10 @@
+// Static tasks configuration.
+//
+// Example:
+[
+  {
+    "label": "clippy (workspace)",
+    "command": "cargo clippy --all-targets --all-features",
+    "shell": "system"
+  }
+]
diff --git a/Cargo.lock b/Cargo.lock
index 10b5fe034a..9e569fb5f9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -40,6 +40,15 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "aligned-array"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+dependencies = [
+ "generic-array",
+]
+
 [[package]]
 name = "alloc-no-stdlib"
 version = "2.0.4"
@@ -155,6 +164,12 @@ version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
 
+[[package]]
+name = "arrayvec"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
+
 [[package]]
 name = "arrow"
 version = "53.2.0"
@@ -376,6 +391,12 @@ dependencies = [
  "regex-syntax",
 ]
 
+[[package]]
+name = "async-task"
+version = "4.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
+
 [[package]]
 name = "async-trait"
 version = "0.1.83"
@@ -402,17 +423,6 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
 
-[[package]]
-name = "auto-const-array"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.87",
-]
-
 [[package]]
 name = "autocfg"
 version = "1.4.0"
@@ -431,7 +441,7 @@ dependencies = [
  "miniz_oxide",
  "object",
  "rustc-demangle",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -570,9 +580,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
 
 [[package]]
 name = "cc"
-version = "1.1.37"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf"
+checksum = "1aeb932158bd710538c73702db6945cb68a8fb08c519e6e12706b94263b36db8"
 dependencies = [
  "jobserver",
  "libc",
@@ -603,7 +613,7 @@ dependencies = [
  "num-traits",
  "serde",
  "wasm-bindgen",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -712,6 +722,171 @@ dependencies = [
  "unicode-width 0.1.14",
 ]
 
+[[package]]
+name = "compio"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de9895d3b1b383334e6dd889618d555ecca48988cfd2be47c7ac8a98b0195c90"
+dependencies = [
+ "compio-buf",
+ "compio-driver",
+ "compio-fs",
+ "compio-io",
+ "compio-log",
+ "compio-macros",
+ "compio-net",
+ "compio-runtime",
+ "compio-signal",
+]
+
+[[package]]
+name = "compio-buf"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efe0e1ba3af4fcf7ee139f145c3d085b5051e10c5fba16dec8ecbb4fd632a676"
+dependencies = [
+ "arrayvec",
+ "bytes",
+ "libc",
+]
+
+[[package]]
+name = "compio-driver"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6be49fe37cd203d925e3850522a47f453b4cb98960846be5e4ebae42e26a64c"
+dependencies = [
+ "aligned-array",
+ "cfg-if",
+ "compio-buf",
+ "compio-log",
+ "crossbeam-channel",
+ "crossbeam-queue",
+ "futures-util",
+ "io-uring",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "paste",
+ "polling",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-fs"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "36f645c7bd9c1e1ce5b0ca6aa9a77ec3908d2ed9200c6708a72bccd1c3f875c8"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "libc",
+ "os_pipe",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-io"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db908087365769933042c157adf860e19bff5a8cdb846ec2b5dd03d0dacf7a35"
+dependencies = [
+ "compio-buf",
+ "futures-util",
+ "paste",
+]
+
+[[package]]
+name = "compio-log"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f"
+dependencies = [
+ "tracing",
+]
+
+[[package]]
+name = "compio-macros"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f05ed201484967dc70de77a8f7a02b29aaa8e6c81cbea2e75492ee0c8d97766b"
+dependencies = [
+ "proc-macro-crate 3.2.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.87",
+]
+
+[[package]]
+name = "compio-net"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b75d9bb79502ac1abb73df8a34e83e51efcb805038cf30c1c48827203a4c6b49"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "either",
+ "libc",
+ "socket2",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-runtime"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b2d856e9017fdde73918cb1a2f15b6e47fe0aeb93d547201a457b12bb2da74a"
+dependencies = [
+ "async-task",
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-log",
+ "crossbeam-queue",
+ "futures-util",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "scoped-tls",
+ "smallvec",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-signal"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0cc8476edd2311b8d34cef15eddd0f81a3a9d2dc622dbefd154a39171fc6dba8"
+dependencies = [
+ "compio-buf",
+ "compio-driver",
+ "compio-runtime",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "slab",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "concurrent-queue"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "condtype"
 version = "1.3.0"
@@ -838,6 +1013,15 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.5"
@@ -857,6 +1041,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-queue"
+version = "0.3.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.20"
@@ -1494,6 +1687,18 @@ dependencies = [
  "serde_derive",
 ]
 
+[[package]]
+name = "flume"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+ "nanorand",
+ "spin",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -1631,15 +1836,6 @@ dependencies = [
  "slab",
 ]
 
-[[package]]
-name = "fxhash"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
-dependencies = [
- "byteorder",
-]
-
 [[package]]
 name = "generic-array"
 version = "0.14.7"
@@ -1757,7 +1953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5bdbbd5bc8c5749697ccaa352fa45aff8730cf21c68029c0eef1ffed7c3d6ba2"
 dependencies = [
  "cfg-if",
- "nix 0.29.0",
+ "nix",
  "widestring",
  "windows",
 ]
@@ -2101,11 +2297,12 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
 
 [[package]]
 name = "io-uring"
-version = "0.6.4"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+checksum = "8c9c844e08c94e8558389fb9b8944cb99fc697e231c975e4274b42bc99e0625b"
 dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.6.0",
+ "cfg-if",
  "libc",
 ]
 
@@ -2371,15 +2568,6 @@ version = "2.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
 
-[[package]]
-name = "memoffset"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
-dependencies = [
- "autocfg",
-]
-
 [[package]]
 name = "memoffset"
 version = "0.9.1"
@@ -2413,18 +2601,6 @@ dependencies = [
  "adler2",
 ]
 
-[[package]]
-name = "mio"
-version = "0.8.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
-dependencies = [
- "libc",
- "log",
- "wasi",
- "windows-sys 0.48.0",
-]
-
 [[package]]
 name = "mio"
 version = "1.0.2"
@@ -2438,42 +2614,20 @@ dependencies = [
 ]
 
 [[package]]
-name = "monoio"
-version = "0.2.4"
+name = "multimap"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
-dependencies = [
- "auto-const-array",
- "bytes",
- "fxhash",
- "io-uring",
- "libc",
- "memchr",
- "mio 0.8.11",
- "monoio-macros",
- "nix 0.26.4",
- "pin-project-lite",
- "socket2",
- "windows-sys 0.48.0",
-]
+checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
 
 [[package]]
-name = "monoio-macros"
-version = "0.1.0"
+name = "nanorand"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
+checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
 dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.87",
+ "getrandom",
 ]
 
-[[package]]
-name = "multimap"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
-
 [[package]]
 name = "native-tls"
 version = "0.2.12"
@@ -2491,19 +2645,6 @@ dependencies = [
  "tempfile",
 ]
 
-[[package]]
-name = "nix"
-version = "0.26.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
-dependencies = [
- "bitflags 1.3.2",
- "cfg-if",
- "libc",
- "memoffset 0.7.1",
- "pin-utils",
-]
-
 [[package]]
 name = "nix"
 version = "0.29.0"
@@ -2768,6 +2909,16 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "os_pipe"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982"
+dependencies = [
+ "libc",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "paris"
 version = "1.5.15"
@@ -2794,7 +2945,7 @@ dependencies = [
  "libc",
  "redox_syscall",
  "smallvec",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -2968,6 +3119,21 @@ dependencies = [
  "plotters-backend",
 ]
 
+[[package]]
+name = "polling"
+version = "3.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
+dependencies = [
+ "cfg-if",
+ "concurrent-queue",
+ "hermit-abi 0.4.0",
+ "pin-project-lite",
+ "rustix",
+ "tracing",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "portable-atomic"
 version = "1.9.0"
@@ -3103,7 +3269,7 @@ dependencies = [
  "cfg-if",
  "indoc",
  "libc",
- "memoffset 0.9.1",
+ "memoffset",
  "once_cell",
  "portable-atomic",
  "pyo3-build-config",
@@ -3173,6 +3339,8 @@ name = "pyvortex"
 version = "0.16.0"
 dependencies = [
  "arrow",
+ "flume",
+ "futures",
  "itertools 0.13.0",
  "log",
  "object_store",
@@ -3335,9 +3503,9 @@ dependencies = [
 
 [[package]]
 name = "regex-automata"
-version = "0.4.8"
+version = "0.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
+checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -3575,6 +3743,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "scoped-tls"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
+
 [[package]]
 name = "scopeguard"
 version = "1.2.0"
@@ -3781,6 +3955,9 @@ name = "spin"
 version = "0.9.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+dependencies = [
+ "lock_api",
+]
 
 [[package]]
 name = "sqlparser"
@@ -4109,7 +4286,7 @@ dependencies = [
  "backtrace",
  "bytes",
  "libc",
- "mio 1.0.2",
+ "mio",
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
@@ -4738,17 +4915,20 @@ dependencies = [
  "arrow-schema",
  "arrow-select",
  "bytes",
+ "compio",
  "criterion",
  "croaring",
  "flatbuffers",
+ "flume",
  "futures",
+ "futures-channel",
  "futures-executor",
  "futures-util",
  "itertools 0.13.0",
- "monoio",
  "object_store",
  "once_cell",
  "rstest",
+ "tempfile",
  "tokio",
  "vortex-array",
  "vortex-buffer",
@@ -4926,7 +5106,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -4942,7 +5122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143"
 dependencies = [
  "windows-core 0.57.0",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -4951,7 +5131,7 @@ version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
 dependencies = [
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -4963,7 +5143,7 @@ dependencies = [
  "windows-implement",
  "windows-interface",
  "windows-result 0.1.2",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -4996,7 +5176,7 @@ checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0"
 dependencies = [
  "windows-result 0.2.0",
  "windows-strings",
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5005,7 +5185,7 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8"
 dependencies = [
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5014,7 +5194,7 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e"
 dependencies = [
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5024,16 +5204,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10"
 dependencies = [
  "windows-result 0.2.0",
- "windows-targets 0.52.6",
-]
-
-[[package]]
-name = "windows-sys"
-version = "0.48.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
-dependencies = [
- "windows-targets 0.48.5",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5042,7 +5213,7 @@ version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
 dependencies = [
- "windows-targets 0.52.6",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5051,22 +5222,7 @@ version = "0.59.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
 dependencies = [
- "windows-targets 0.52.6",
-]
-
-[[package]]
-name = "windows-targets"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
-dependencies = [
- "windows_aarch64_gnullvm 0.48.5",
- "windows_aarch64_msvc 0.48.5",
- "windows_i686_gnu 0.48.5",
- "windows_i686_msvc 0.48.5",
- "windows_x86_64_gnu 0.48.5",
- "windows_x86_64_gnullvm 0.48.5",
- "windows_x86_64_msvc 0.48.5",
+ "windows-targets",
 ]
 
 [[package]]
@@ -5075,46 +5231,28 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
 dependencies = [
- "windows_aarch64_gnullvm 0.52.6",
- "windows_aarch64_msvc 0.52.6",
- "windows_i686_gnu 0.52.6",
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
  "windows_i686_gnullvm",
- "windows_i686_msvc 0.52.6",
- "windows_x86_64_gnu 0.52.6",
- "windows_x86_64_gnullvm 0.52.6",
- "windows_x86_64_msvc 0.52.6",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
 ]
 
-[[package]]
-name = "windows_aarch64_gnullvm"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
-
 [[package]]
 name = "windows_aarch64_gnullvm"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
 
-[[package]]
-name = "windows_aarch64_msvc"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
-
 [[package]]
 name = "windows_aarch64_msvc"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
 
-[[package]]
-name = "windows_i686_gnu"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
-
 [[package]]
 name = "windows_i686_gnu"
 version = "0.52.6"
@@ -5127,48 +5265,24 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
 
-[[package]]
-name = "windows_i686_msvc"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
-
 [[package]]
 name = "windows_i686_msvc"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
 
-[[package]]
-name = "windows_x86_64_gnu"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
-
 [[package]]
 name = "windows_x86_64_gnu"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
 
-[[package]]
-name = "windows_x86_64_gnullvm"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
-
 [[package]]
 name = "windows_x86_64_gnullvm"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
 
-[[package]]
-name = "windows_x86_64_msvc"
-version = "0.48.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
-
 [[package]]
 name = "windows_x86_64_msvc"
 version = "0.52.6"
diff --git a/Cargo.toml b/Cargo.toml
index 91706a88a0..e01cfdf2a2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ bzip2 = "0.4.4"
 cargo_metadata = "0.18.1"
 chrono = "0.4.38"
 clap = "4.5.13"
+compio = "0.12"
 criterion = { version = "0.5.1", features = ["html_reports"] }
 croaring = "2.1.0"
 csv = "1.3.0"
@@ -83,11 +84,13 @@ fallible-iterator = "0.3.0"
 fastlanes = "0.1.5"
 flatbuffers = "24.3.25"
 flexbuffers = "2.0.0"
+flume = "0.11"
 fs_extra = "1.3.0"
 fsst-rs = "0.4.1"
-futures = { version = "0.3.30", default-features = false }
-futures-executor = "0.3.30"
-futures-util = "0.3.30"
+futures = { version = "0.3", default-features = false }
+futures-channel = "0.3"
+futures-executor = "0.3"
+futures-util = "0.3"
 getrandom = "0.2.14"
 half = { version = "^2", features = ["std", "num-traits"] }
 hashbrown = "0.15.0"
diff --git a/bench-vortex/benches/bytes_at.rs b/bench-vortex/benches/bytes_at.rs
index f0f4515e25..134f3f5aa3 100644
--- a/bench-vortex/benches/bytes_at.rs
+++ b/bench-vortex/benches/bytes_at.rs
@@ -8,6 +8,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
 use futures::executor::block_on;
 use futures::StreamExt;
 use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
+use vortex::buffer::Buffer;
 use vortex::dtype::{DType, Nullability};
 use vortex::serde::stream_reader::StreamArrayReader;
 use vortex::serde::stream_writer::StreamArrayWriter;
@@ -31,6 +32,8 @@ fn array_view_fixture() -> VarBinViewArray {
     let writer = StreamArrayWriter::new(&mut buffer);
     block_on(writer.write_array(array_data.into_array())).unwrap();
 
+    let buffer = Buffer::from(buffer);
+
     let ctx = Arc::new(Context::default());
     let reader = block_on(StreamArrayReader::try_new(Cursor::new(buffer), ctx.clone())).unwrap();
     let reader = block_on(reader.load_dtype()).unwrap();
diff --git a/bench-vortex/benches/compress_noci.rs b/bench-vortex/benches/compress_noci.rs
index dd4a55b481..b5a748d8ec 100644
--- a/bench-vortex/benches/compress_noci.rs
+++ b/bench-vortex/benches/compress_noci.rs
@@ -23,6 +23,7 @@ use parquet::file::properties::WriterProperties;
 use regex::Regex;
 use tokio::runtime::Runtime;
 use vortex::array::{ChunkedArray, StructArray};
+use vortex::buffer::Buffer;
 use vortex::dtype::field::Field;
 use vortex::error::VortexResult;
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
@@ -121,8 +122,8 @@ fn vortex_compress_write(
     Ok(cursor.position())
 }
 
-fn vortex_decompress_read(runtime: &Runtime, buf: Arc<Vec<u8>>) -> VortexResult<ArrayRef> {
-    async fn async_read(buf: Arc<Vec<u8>>) -> VortexResult<Array> {
+fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<ArrayRef> {
+    async fn async_read(buf: Buffer) -> VortexResult<Array> {
         let builder: VortexReadBuilder<_> = VortexReadBuilder::new(
             buf,
             LayoutDeserializer::new(
@@ -214,9 +215,9 @@ fn benchmark_compress<F, U>(
         group.bench_function(bench_name, |b| {
             let mut buf = Vec::new();
             vortex_compress_write(runtime, compressor, uncompressed.as_ref(), &mut buf).unwrap();
-            let arc = Arc::new(buf);
+            let bytes = Buffer::from(buf);
             b.iter_with_large_drop(|| {
-                black_box(vortex_decompress_read(runtime, arc.clone()).unwrap());
+                black_box(vortex_decompress_read(runtime, bytes.clone()).unwrap());
             });
         });
         group.finish();
diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs
index a2775995c1..43ab50c375 100644
--- a/bench-vortex/src/data_downloads.rs
+++ b/bench-vortex/src/data_downloads.rs
@@ -13,6 +13,7 @@ use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
 use vortex::dtype::DType;
 use vortex::error::{VortexError, VortexResult};
+use vortex::serde::io::TokioAdapter;
 use vortex::serde::stream_writer::StreamArrayWriter;
 use vortex::{Array, IntoArray};
 
@@ -55,7 +56,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
         Runtime::new()
             .unwrap()
             .block_on(async move {
-                let write = tokio::fs::File::create(path).await.unwrap();
+                let write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
                 StreamArrayWriter::new(write)
                     .write_array(array)
                     .await
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index d65deaadf9..87ca33ba2d 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -29,7 +29,7 @@ use vortex::dtype::DType;
 use vortex::error::VortexResult;
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
 use vortex::serde::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
-use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt, VortexWrite};
+use vortex::serde::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
 use vortex::{Array, IntoArray, IntoCanonical};
 
 pub const BATCH_SIZE: usize = 65_536;
@@ -42,7 +42,7 @@ pub struct VortexFooter {
 }
 
 pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
-    let file = tokio::fs::File::open(path).await.unwrap();
+    let file = TokioFile::open(path).unwrap();
 
     VortexReadBuilder::new(
         file,
@@ -138,7 +138,7 @@ pub async fn take_vortex_object_store(
 }
 
 pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    take_vortex(tokio::fs::File::open(path).await?, indices).await
+    take_vortex(TokioFile::open(path)?, indices).await
 }
 
 pub async fn take_parquet_object_store(
diff --git a/pyvortex/Cargo.toml b/pyvortex/Cargo.toml
index fcbada1e34..77f5afe61d 100644
--- a/pyvortex/Cargo.toml
+++ b/pyvortex/Cargo.toml
@@ -24,6 +24,8 @@ doctest = false
 
 [dependencies]
 arrow = { workspace = true, features = ["pyarrow"] }
+flume = { workspace = true }
+futures = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] }
diff --git a/pyvortex/src/dataset.rs b/pyvortex/src/dataset.rs
index d7dd539188..4dcedf3eb6 100644
--- a/pyvortex/src/dataset.rs
+++ b/pyvortex/src/dataset.rs
@@ -1,4 +1,3 @@
-use std::path::Path;
 use std::sync::Arc;
 
 use arrow::array::RecordBatchReader;
@@ -7,7 +6,6 @@ use arrow::pyarrow::{IntoPyArrow, ToPyArrow};
 use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
 use pyo3::types::{PyLong, PyString};
-use tokio::fs::File;
 use vortex::arrow::infer_schema;
 use vortex::dtype::field::Field;
 use vortex::dtype::DType;
@@ -17,7 +15,7 @@ use vortex::serde::file::{
     read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter,
     VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader,
 };
-use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
+use vortex::serde::io::{ObjectStoreReadAt, TokioFile, VortexReadAt};
 use vortex::Array;
 
 use crate::expr::PyExpr;
@@ -105,20 +103,16 @@ fn row_filter_from_python(row_filter: Option<&Bound<PyExpr>>) -> Option<RowFilte
 
 #[pyclass(name = "TokioFileDataset", module = "io")]
 pub struct TokioFileDataset {
-    path: String,
+    file: TokioFile,
     schema: SchemaRef,
 }
 
 impl TokioFileDataset {
-    async fn file(&self) -> VortexResult<File> {
-        Ok(File::open(Path::new(&self.path)).await?)
-    }
-
     pub async fn try_new(path: String) -> VortexResult<Self> {
-        let file = File::open(Path::new(&path)).await?;
-        let schema = Arc::new(infer_schema(&read_dtype_from_reader(&file).await?)?);
+        let file = TokioFile::open(path)?;
+        let schema = Arc::new(infer_schema(&read_dtype_from_reader(file.clone()).await?)?);
 
-        Ok(Self { path, schema })
+        Ok(Self { file, schema })
     }
 
     async fn async_to_array(
@@ -128,7 +122,7 @@ impl TokioFileDataset {
         indices: Option<&PyArray>,
     ) -> PyResult<PyArray> {
         let inner = read_array_from_reader(
-            self.file().await?,
+            self.file.clone(),
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
             indices.map(PyArray::unwrap).cloned(),
@@ -144,7 +138,7 @@ impl TokioFileDataset {
         indices: Option<&PyArray>,
     ) -> PyResult<PyObject> {
         let layout_reader = layout_stream_from_reader(
-            self_.file().await?,
+            self_.file.clone(),
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
             indices.map(PyArray::unwrap).cloned(),
@@ -200,7 +194,7 @@ impl ObjectStoreUrlDataset {
 
     pub async fn try_new(url: String) -> VortexResult<Self> {
         let reader = vortex_read_at_from_url(&url).await?;
-        let schema = Arc::new(infer_schema(&read_dtype_from_reader(&reader).await?)?);
+        let schema = Arc::new(infer_schema(&read_dtype_from_reader(reader).await?)?);
 
         Ok(Self { url, schema })
     }
diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs
index 45054e5ddf..9e74a440ab 100644
--- a/vortex-array/src/lib.rs
+++ b/vortex-array/src/lib.rs
@@ -52,7 +52,7 @@ pub mod iter;
 mod metadata;
 pub mod stats;
 pub mod stream;
-mod tree;
+pub mod tree;
 mod typed;
 pub mod validity;
 pub mod variants;
diff --git a/vortex-serde/Cargo.toml b/vortex-serde/Cargo.toml
index fed8e9e3f0..f363d785a2 100644
--- a/vortex-serde/Cargo.toml
+++ b/vortex-serde/Cargo.toml
@@ -19,15 +19,21 @@ arrow-buffer = { workspace = true }
 arrow-schema = { workspace = true }
 bytes = { workspace = true }
 croaring = { workspace = true }
+compio = { workspace = true, features = ["bytes", "macros"], optional = true }
 flatbuffers = { workspace = true }
-futures = { workspace = true }
+flume = { workspace = true }
+futures = { workspace = true, features = ["std"] }
+futures-channel = { workspace = true }
 futures-executor = { workspace = true }
 futures-util = { workspace = true }
 itertools = { workspace = true }
-monoio = { workspace = true, optional = true, features = ["bytes"] }
 object_store = { workspace = true, optional = true }
 once_cell = { workspace = true }
-tokio = { workspace = true, features = ["io-util", "fs", "rt-multi-thread"], optional = true }
+tokio = { workspace = true, features = [
+    "io-util",
+    "fs",
+    "rt-multi-thread",
+], optional = true }
 vortex-array = { workspace = true }
 vortex-buffer = { workspace = true }
 vortex-dtype = { workspace = true, features = ["flatbuffers"] }
@@ -44,6 +50,7 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 criterion = { workspace = true, features = ["async_futures"] }
 rstest = { workspace = true }
+tempfile = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
 
@@ -51,9 +58,9 @@ vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
 workspace = true
 
 [features]
-default = ["futures", "monoio", "tokio"]
+default = ["futures", "tokio", "compio"]
 futures = ["futures-util/io"]
-monoio = ["dep:monoio"]
+compio = ["dep:compio"]
 tokio = ["dep:tokio"]
 object_store = ["dep:object_store", "vortex-error/object_store"]
 
diff --git a/vortex-serde/src/chunked_reader/mod.rs b/vortex-serde/src/chunked_reader/mod.rs
index 227272fabb..730535da12 100644
--- a/vortex-serde/src/chunked_reader/mod.rs
+++ b/vortex-serde/src/chunked_reader/mod.rs
@@ -52,8 +52,10 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
         Ok(())
     }
 
+    // Creating a new ArrayStream requires cloning the reader so that
+    // multiple streams can each read from it independently.
     pub async fn array_stream(&mut self) -> impl ArrayStream + '_ {
-        let mut cursor = Cursor::new(&self.read);
+        let mut cursor = Cursor::new(self.read.clone());
         let byte_offset = scalar_at(&self.byte_offsets, 0)
             .and_then(|s| u64::try_from(&s))
             .vortex_expect("Failed to convert byte_offset to u64");
diff --git a/vortex-serde/src/file/read/buffered.rs b/vortex-serde/src/file/read/buffered.rs
index a417e0a619..4c84bf3bae 100644
--- a/vortex-serde/src/file/read/buffered.rs
+++ b/vortex-serde/src/file/read/buffered.rs
@@ -6,8 +6,7 @@ use vortex_array::{Array, ArrayDType, IntoArray};
 use vortex_error::VortexResult;
 
 use crate::file::read::mask::RowMask;
-use crate::file::read::{BatchRead, LayoutReader};
-use crate::file::Message;
+use crate::file::read::{BatchRead, LayoutReader, MessageLocator};
 
 pub type RangedLayoutReader = ((usize, usize), Box<dyn LayoutReader>);
 
@@ -27,7 +26,7 @@ impl BufferedLayoutReader {
     }
 
     // TODO(robert): Support out of order reads
-    fn buffer_read(&mut self, mask: &RowMask) -> VortexResult<Option<Vec<Message>>> {
+    fn buffer_read(&mut self, mask: &RowMask) -> VortexResult<Option<Vec<MessageLocator>>> {
         while let Some(((begin, end), layout)) = self.layouts.pop_front() {
             if mask.begin() <= begin && begin < mask.end()
                 || mask.begin() < end && end <= mask.end()
diff --git a/vortex-serde/src/file/read/builder/mod.rs b/vortex-serde/src/file/read/builder/mod.rs
index e69bac39a4..0fb573945a 100644
--- a/vortex-serde/src/file/read/builder/mod.rs
+++ b/vortex-serde/src/file/read/builder/mod.rs
@@ -13,7 +13,7 @@ use crate::file::read::context::LayoutDeserializer;
 use crate::file::read::filtering::RowFilter;
 use crate::file::read::stream::VortexFileArrayStream;
 use crate::file::read::{RowMask, Scan};
-use crate::io::VortexReadAt;
+use crate::io::{IoDispatcher, VortexReadAt};
 
 pub(crate) mod initial_read;
 
@@ -70,6 +70,7 @@ pub struct VortexReadBuilder<R> {
     size: Option<u64>,
     row_mask: Option<Array>,
     row_filter: Option<RowFilter>,
+    io_dispatcher: Option<IoDispatcher>,
 }
 
 impl<R: VortexReadAt> VortexReadBuilder<R> {
@@ -81,6 +82,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
             size: None,
             row_mask: None,
             row_filter: None,
+            io_dispatcher: None,
         }
     }
 
@@ -109,6 +111,11 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
         self
     }
 
+    pub fn with_io_dispatcher(mut self, dispatcher: IoDispatcher) -> Self {
+        self.io_dispatcher = Some(dispatcher);
+        self
+    }
+
     pub async fn build(self) -> VortexResult<VortexFileArrayStream<R>> {
         // we do a large enough initial read to get footer, layout, and schema
         let initial_read = read_initial_bytes(&self.read_at, self.size().await).await?;
@@ -165,6 +172,11 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
             })
             .transpose()?;
 
+        // Default: fall back to a single-threaded tokio dispatcher.
+        let io_dispatcher = self
+            .io_dispatcher
+            .unwrap_or_else(|| IoDispatcher::new_tokio(1));
+
         Ok(VortexFileArrayStream::new(
             self.read_at,
             layout_reader,
@@ -173,6 +185,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
             projected_dtype,
             row_count,
             row_mask,
+            io_dispatcher,
         ))
     }
 
diff --git a/vortex-serde/src/file/read/layouts/columnar.rs b/vortex-serde/src/file/read/layouts/columnar.rs
index 311944a0df..a9d3558825 100644
--- a/vortex-serde/src/file/read/layouts/columnar.rs
+++ b/vortex-serde/src/file/read/layouts/columnar.rs
@@ -262,6 +262,7 @@ mod tests {
     use vortex_array::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
     use vortex_array::validity::Validity;
     use vortex_array::{ArrayDType, IntoArray, IntoArrayVariant};
+    use vortex_buffer::Buffer;
     use vortex_dtype::field::Field;
     use vortex_dtype::{DType, Nullability};
     use vortex_expr::{BinaryExpr, Column, Literal, Operator};
@@ -299,7 +300,7 @@ mod tests {
 
         let mut writer = VortexFileWriter::new(Vec::new());
         writer = writer.write_array_columns(struct_arr).await.unwrap();
-        let written = writer.finalize().await.unwrap();
+        let written = Buffer::from(writer.finalize().await.unwrap());
 
         let initial_read = read_initial_bytes(&written, written.len() as u64)
             .await
@@ -322,7 +323,7 @@ mod tests {
                 RelativeLayoutCache::new(cache.clone(), dtype),
             )
             .unwrap(),
-            Bytes::from(written),
+            Bytes::copy_from_slice(&written),
             len,
         )
     }
diff --git a/vortex-serde/src/file/read/layouts/flat.rs b/vortex-serde/src/file/read/layouts/flat.rs
index 329eaaa419..c58a22161b 100644
--- a/vortex-serde/src/file/read/layouts/flat.rs
+++ b/vortex-serde/src/file/read/layouts/flat.rs
@@ -9,7 +9,7 @@ use vortex_flatbuffers::footer;
 use crate::file::read::cache::RelativeLayoutCache;
 use crate::file::read::mask::RowMask;
 use crate::file::{
-    BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, Scan,
+    BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, MessageLocator, Scan,
     FLAT_LAYOUT_ID,
 };
 use crate::messages::reader::ArrayMessageReader;
@@ -73,8 +73,8 @@ impl FlatLayout {
         }
     }
 
-    fn own_message(&self) -> Message {
-        (self.message_cache.absolute_id(&[]), self.range)
+    fn own_message(&self) -> MessageLocator {
+        MessageLocator(self.message_cache.absolute_id(&[]), self.range)
     }
 
     fn array_from_bytes(&self, mut buf: Bytes) -> VortexResult<Array> {
diff --git a/vortex-serde/src/file/read/layouts/inline_dtype.rs b/vortex-serde/src/file/read/layouts/inline_dtype.rs
index 20fe901e96..161ec2f7ea 100644
--- a/vortex-serde/src/file/read/layouts/inline_dtype.rs
+++ b/vortex-serde/src/file/read/layouts/inline_dtype.rs
@@ -10,8 +10,8 @@ use vortex_flatbuffers::{footer, message};
 use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
 use crate::file::read::mask::RowMask;
 use crate::file::{
-    BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec, Message, Scan,
-    INLINE_SCHEMA_LAYOUT_ID,
+    BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec,
+    MessageLocator, Scan, INLINE_SCHEMA_LAYOUT_ID,
 };
 use crate::messages::reader::MESSAGE_PREFIX_LENGTH;
 use crate::stream_writer::ByteRange;
@@ -53,12 +53,12 @@ pub struct InlineDTypeLayout {
 }
 
 enum DTypeReadResult {
-    ReadMore(Vec<Message>),
+    ReadMore(Vec<MessageLocator>),
     DType(DType),
 }
 
 enum ChildReaderResult {
-    ReadMore(Vec<Message>),
+    ReadMore(Vec<MessageLocator>),
     Reader(Box<dyn LayoutReader>),
 }
 
@@ -106,7 +106,7 @@ impl InlineDTypeLayout {
                 vortex_bail!("Missing buffers for inline dtype layout")
             }
             let dtype_buf = buffers.get(0);
-            Ok(DTypeReadResult::ReadMore(vec![(
+            Ok(DTypeReadResult::ReadMore(vec![MessageLocator(
                 self.message_cache.absolute_id(&[INLINE_DTYPE_BUFFER_IDX]),
                 ByteRange::new(dtype_buf.begin(), dtype_buf.end()),
             )]))
diff --git a/vortex-serde/src/file/read/layouts/test_read.rs b/vortex-serde/src/file/read/layouts/test_read.rs
index ceb5429eab..ba4944ea19 100644
--- a/vortex-serde/src/file/read/layouts/test_read.rs
+++ b/vortex-serde/src/file/read/layouts/test_read.rs
@@ -8,7 +8,7 @@ use vortex_array::Array;
 use vortex_error::VortexUnwrap;
 
 use crate::file::read::mask::RowMask;
-use crate::file::{BatchRead, LayoutMessageCache, LayoutReader};
+use crate::file::{BatchRead, LayoutMessageCache, LayoutReader, MessageLocator};
 
 pub fn layout_splits(layout: &mut dyn LayoutReader, length: usize) -> Vec<RowMask> {
     let mut splits = BTreeSet::new();
@@ -33,7 +33,7 @@ pub fn read_layout_data(
         match rr {
             BatchRead::ReadMore(m) => {
                 let mut write_cache_guard = cache.write().unwrap();
-                for (id, range) in m {
+                for MessageLocator(id, range) in m {
                     write_cache_guard.set(id, buf.slice(range.to_range()));
                 }
             }
@@ -53,7 +53,7 @@ pub fn read_filters(
         match rr {
             BatchRead::ReadMore(m) => {
                 let mut write_cache_guard = cache.write().unwrap();
-                for (id, range) in m {
+                for MessageLocator(id, range) in m {
                     write_cache_guard.set(id, buf.slice(range.to_range()));
                 }
             }
diff --git a/vortex-serde/src/file/read/mod.rs b/vortex-serde/src/file/read/mod.rs
index 95a669ba43..a9587480da 100644
--- a/vortex-serde/src/file/read/mod.rs
+++ b/vortex-serde/src/file/read/mod.rs
@@ -49,12 +49,14 @@ impl Scan {
 pub type LayoutPartId = u16;
 /// Path through layout tree to given message
 pub type MessageId = Vec<LayoutPartId>;
-/// ID and Range of atomic element of the file
-pub type Message = (MessageId, ByteRange);
+/// A unique locator for a message, including its ID and the byte range
+/// containing the message contents.
+#[derive(Debug, Clone)]
+pub struct MessageLocator(pub MessageId, pub ByteRange);
 
 #[derive(Debug)]
 pub enum BatchRead {
-    ReadMore(Vec<Message>),
+    ReadMore(Vec<MessageLocator>),
     Batch(Array),
 }
 
diff --git a/vortex-serde/src/file/read/stream.rs b/vortex-serde/src/file/read/stream.rs
index 48fb15dd4c..23f2187fa2 100644
--- a/vortex-serde/src/file/read/stream.rs
+++ b/vortex-serde/src/file/read/stream.rs
@@ -5,8 +5,8 @@ use std::sync::{Arc, RwLock};
 use std::task::{Context, Poll};
 
 use bytes::{Bytes, BytesMut};
+use futures::future::BoxFuture;
 use futures::Stream;
-use futures_util::future::BoxFuture;
 use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use vortex_array::array::ChunkedArray;
@@ -19,15 +19,16 @@ use vortex_schema::Schema;
 
 use crate::file::read::cache::LayoutMessageCache;
 use crate::file::read::mask::RowMask;
-use crate::file::read::{BatchRead, LayoutReader, MessageId};
-use crate::io::VortexReadAt;
-use crate::stream_writer::ByteRange;
+use crate::file::read::{BatchRead, LayoutReader, MessageId, MessageLocator};
+use crate::io::{Dispatch, IoDispatcher, VortexReadAt};
 
-/// Reads a layout from some memory, on-disk or elsewhere.
+/// An asynchronous Vortex file that returns a [`Stream`] of [`Array`]s.
 ///
-/// Instead of using [`VortexFileArrayStream::new`], use a
-/// [VortexReadBuilder][crate::file::read::builder::VortexReadBuilder] to create an instance of
-/// this struct.
+/// The file may be read from any source implementing [`VortexReadAt`], such
+/// as memory, disk, and object storage.
+///
+/// Use [VortexReadBuilder][crate::file::read::builder::VortexReadBuilder] to build one
+/// from a reader.
 pub struct VortexFileArrayStream<R> {
     dtype: DType,
     row_count: u64,
@@ -35,11 +36,14 @@ pub struct VortexFileArrayStream<R> {
     messages_cache: Arc<RwLock<LayoutMessageCache>>,
     splits: VecDeque<(usize, usize)>,
     row_mask: Option<RowMask>,
-    state: Option<StreamingState<R>>,
+    state: Option<StreamingState>,
+    input: R,
+    dispatcher: IoDispatcher,
 }
 
 impl<R: VortexReadAt> VortexFileArrayStream<R> {
-    pub fn new(
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
         input: R,
         layout_reader: Box<dyn LayoutReader>,
         filter_reader: Option<Box<dyn LayoutReader>>,
@@ -47,6 +51,7 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> {
         dtype: DType,
         row_count: u64,
         row_mask: Option<RowMask>,
+        dispatcher: IoDispatcher,
     ) -> Self {
         VortexFileArrayStream {
             dtype,
@@ -55,7 +60,9 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> {
             messages_cache,
             splits: VecDeque::new(),
             row_mask,
-            state: Some(StreamingState::AddSplits(input, filter_reader)),
+            state: Some(StreamingState::AddSplits(filter_reader)),
+            input,
+            dispatcher,
         }
     }
 
@@ -67,59 +74,58 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> {
         Schema::new(self.dtype.clone())
     }
 
-    fn store_messages(&self, messages: Vec<(MessageId, Bytes)>) {
+    fn store_messages(&self, messages: Vec<Message>) {
         let mut write_cache_guard = self
             .messages_cache
             .write()
             .unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}"));
-        for (message_id, buf) in messages {
+        for Message(message_id, buf) in messages {
             write_cache_guard.set(message_id, buf);
         }
     }
 }
 
-type StreamMessages = Vec<(MessageId, Bytes)>;
-type StreamStateFuture<R> = BoxFuture<'static, VortexResult<(R, StreamMessages)>>;
+/// A message that has had its bytes materialized onto the heap.
+#[derive(Debug, Clone)]
+struct Message(pub MessageId, pub Bytes);
+
+type StreamMessages = Vec<Message>;
+type StreamStateFuture = BoxFuture<'static, VortexResult<StreamMessages>>;
 
-enum ReadingFor<R> {
-    Read(StreamStateFuture<R>, RowMask, Option<LayoutReaderRef>),
-    Filter(StreamStateFuture<R>, RowMask, Box<dyn LayoutReader>),
+enum ReadingFor {
+    Read(StreamStateFuture, RowMask, Option<LayoutReaderRef>),
+    Filter(StreamStateFuture, RowMask, Box<dyn LayoutReader>),
 }
 
-enum ReadingPoll<R> {
-    Ready(StreamingState<R>, StreamMessages),
-    Pending(ReadingFor<R>),
+enum ReadingPoll {
+    Ready(StreamingState, StreamMessages),
+    Pending(ReadingFor),
 }
 
-impl<R> ReadingFor<R> {
-    fn future(&mut self) -> &mut StreamStateFuture<R> {
+impl ReadingFor {
+    fn future(&mut self) -> &mut StreamStateFuture {
         match self {
             ReadingFor::Read(future, ..) => future,
             ReadingFor::Filter(future, ..) => future,
         }
     }
 
-    fn into_streaming_state(self, input: R) -> StreamingState<R> {
+    fn into_streaming_state(self) -> StreamingState {
         match self {
             ReadingFor::Read(.., row_mask, filter_reader) => {
-                StreamingState::Read(input, row_mask, filter_reader)
-            }
-            ReadingFor::Filter(.., row_mask, reader) => {
-                StreamingState::Filter(input, row_mask, reader)
+                StreamingState::Read(row_mask, filter_reader)
             }
+            ReadingFor::Filter(.., row_mask, reader) => StreamingState::Filter(row_mask, reader),
         }
     }
 
-    fn poll_unpin(mut self, cx: &mut Context) -> VortexResult<ReadingPoll<R>> {
-        let (input, messages) = match self.future().poll_unpin(cx) {
+    fn poll_unpin(mut self, cx: &mut Context) -> VortexResult<ReadingPoll> {
+        let messages = match self.future().poll_unpin(cx) {
             Poll::Pending => return Ok(ReadingPoll::Pending(self)),
             Poll::Ready(Err(err)) => return Err(err),
             Poll::Ready(Ok(x)) => x,
         };
-        Ok(ReadingPoll::Ready(
-            self.into_streaming_state(input),
-            messages,
-        ))
+        Ok(ReadingPoll::Ready(self.into_streaming_state(), messages))
     }
 }
 
@@ -131,47 +137,47 @@ type LayoutReaderRef = Box<dyn LayoutReader>;
 /// Main read loop goes from `NextSplit` -> `Filter` (if there's filter) -> `Read`
 /// `Filter` and `Read` states transition to `Reading` when they're blocked on an io operation which resumes back to
 /// the previous state.
-enum StreamingState<R> {
-    AddSplits(R, Option<LayoutReaderRef>),
-    NextSplit(R, Option<LayoutReaderRef>),
-    Filter(R, RowMask, LayoutReaderRef),
-    Read(R, RowMask, Option<LayoutReaderRef>),
-    Reading(ReadingFor<R>),
+enum StreamingState {
+    AddSplits(Option<LayoutReaderRef>),
+    NextSplit(Option<LayoutReaderRef>),
+    Filter(RowMask, LayoutReaderRef),
+    Read(RowMask, Option<LayoutReaderRef>),
+    Reading(ReadingFor),
     EndOfStream,
     Error,
 }
 
-enum StreamingTransition<R> {
-    GoTo(StreamingState<R>),
-    YieldTo(StreamingState<R>),
-    Produce(StreamingState<R>, Array),
+enum StreamingTransition {
+    GoTo(StreamingState),
+    YieldTo(StreamingState),
+    Produce(StreamingState, Array),
     Finished,
 }
 
-fn goto<R>(next_state: StreamingState<R>) -> VortexResult<StreamingTransition<R>> {
+fn goto(next_state: StreamingState) -> VortexResult<StreamingTransition> {
     Ok(StreamingTransition::GoTo(next_state))
 }
 
-fn yield_to<R>(next_state: StreamingState<R>) -> VortexResult<StreamingTransition<R>> {
+fn yield_to(next_state: StreamingState) -> VortexResult<StreamingTransition> {
     Ok(StreamingTransition::YieldTo(next_state))
 }
 
-fn produce<R>(next_state: StreamingState<R>, array: Array) -> VortexResult<StreamingTransition<R>> {
+fn produce(next_state: StreamingState, array: Array) -> VortexResult<StreamingTransition> {
     Ok(StreamingTransition::Produce(next_state, array))
 }
 
-fn finished<R>() -> VortexResult<StreamingTransition<R>> {
+fn finished() -> VortexResult<StreamingTransition> {
     Ok(StreamingTransition::Finished)
 }
 
-impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
+impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
     fn step(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-        current_state: StreamingState<R>,
-    ) -> VortexResult<StreamingTransition<R>> {
+        current_state: StreamingState,
+    ) -> VortexResult<StreamingTransition> {
         match current_state {
-            StreamingState::AddSplits(input, filter_reader) => {
+            StreamingState::AddSplits(filter_reader) => {
                 let mut splits = BTreeSet::new();
                 splits.insert(self.row_count as usize);
                 if let Some(filter_reader) = &filter_reader {
@@ -180,9 +186,9 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
                 self.layout_reader.as_mut().add_splits(0, &mut splits)?;
                 self.splits
                     .extend(splits.into_iter().tuple_windows::<(usize, usize)>());
-                goto(StreamingState::NextSplit(input, filter_reader))
+                goto(StreamingState::NextSplit(filter_reader))
             }
-            StreamingState::NextSplit(input, filter_reader) => {
+            StreamingState::NextSplit(filter_reader) => {
                 let Some((begin, end)) = self.splits.pop_front() else {
                     return finished();
                 };
@@ -193,35 +199,29 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
                     .map(|row_mask| row_mask.slice(begin, end).is_empty())
                     .unwrap_or(false);
                 if row_mask_removes_all_rows {
-                    return goto(StreamingState::NextSplit(input, filter_reader));
+                    return goto(StreamingState::NextSplit(filter_reader));
                 }
 
                 let mut split_mask = RowMask::new_valid_between(begin, end);
                 match filter_reader {
-                    Some(filter_reader) => {
-                        goto(StreamingState::Filter(input, split_mask, filter_reader))
-                    }
+                    Some(filter_reader) => goto(StreamingState::Filter(split_mask, filter_reader)),
                     None => {
                         if let Some(row_mask) = &self.row_mask {
                             split_mask.and_inplace(&row_mask.slice(begin, end))?;
                         };
 
-                        goto(StreamingState::Read(input, split_mask, filter_reader))
+                        goto(StreamingState::Read(split_mask, filter_reader))
                     }
                 }
             }
-            StreamingState::Filter(input, split_mask, mut filter_reader) => {
+            StreamingState::Filter(split_mask, mut filter_reader) => {
                 let sel_begin = split_mask.begin();
                 let sel_end = split_mask.end();
 
                 match filter_reader.as_mut().read_selection(&split_mask)? {
-                    Some(BatchRead::ReadMore(messages)) => {
-                        goto(StreamingState::Reading(ReadingFor::Filter(
-                            read_ranges(input, messages).boxed(),
-                            split_mask,
-                            filter_reader,
-                        )))
-                    }
+                    Some(BatchRead::ReadMore(messages)) => goto(StreamingState::Reading(
+                        ReadingFor::Filter(self.read_ranges(messages), split_mask, filter_reader),
+                    )),
                     Some(BatchRead::Batch(mut batch)) => {
                         if let Some(row_mask) = &self.row_mask {
                             // Either `and` or `and_kleene` is fine. They only differ on `false AND
@@ -238,22 +238,21 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
                             .vortex_expect("must be a bool array if it's a result of a filter")
                             == 0
                         {
-                            goto(StreamingState::NextSplit(input, Some(filter_reader)))
+                            goto(StreamingState::NextSplit(Some(filter_reader)))
                         } else {
                             goto(StreamingState::Read(
-                                input,
                                 RowMask::from_mask_array(&batch, sel_begin, sel_end)?,
                                 Some(filter_reader),
                             ))
                         }
                     }
-                    None => goto(StreamingState::NextSplit(input, Some(filter_reader))),
+                    None => goto(StreamingState::NextSplit(Some(filter_reader))),
                 }
             }
-            StreamingState::Read(input, selector, filter_reader) => {
+            StreamingState::Read(selector, filter_reader) => {
                 match self.layout_reader.read_selection(&selector)? {
-                    Some(BatchRead::ReadMore(messages)) => {
-                        let read_future = read_ranges(input, messages).boxed();
+                    Some(BatchRead::ReadMore(message_ranges)) => {
+                        let read_future = self.read_ranges(message_ranges);
                         goto(StreamingState::Reading(ReadingFor::Read(
                             read_future,
                             selector,
@@ -261,9 +260,9 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
                         )))
                     }
                     Some(BatchRead::Batch(array)) => {
-                        produce(StreamingState::NextSplit(input, filter_reader), array)
+                        produce(StreamingState::NextSplit(filter_reader), array)
                     }
-                    None => goto(StreamingState::NextSplit(input, filter_reader)),
+                    None => goto(StreamingState::NextSplit(filter_reader)),
                 }
             }
             StreamingState::Reading(reading_state) => match reading_state.poll_unpin(cx)? {
@@ -279,9 +278,31 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
             StreamingState::EndOfStream => finished(),
         }
     }
+
+    /// Schedule an asynchronous read of several byte ranges.
+    ///
+    /// IO is scheduled on the provided IO dispatcher.
+    fn read_ranges(
+        &self,
+        ranges: Vec<MessageLocator>,
+    ) -> BoxFuture<'static, VortexResult<StreamMessages>> {
+        let reader = self.input.clone();
+
+        let result_rx = self
+            .dispatcher
+            .dispatch(move || async move { read_ranges(reader, ranges).await })
+            .vortex_expect("dispatch async task");
+
+        result_rx
+            .map(|res| match res {
+                Ok(result) => result,
+                Err(e) => vortex_bail!("dispatcher channel canceled: {e}"),
+            })
+            .boxed()
+    }
 }
 
-impl<R: VortexReadAt + Unpin + 'static> Stream for VortexFileArrayStream<R> {
+impl<R: VortexReadAt + Unpin> Stream for VortexFileArrayStream<R> {
     type Item = VortexResult<Array>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -313,7 +334,7 @@ impl<R: VortexReadAt + Unpin + 'static> Stream for VortexFileArrayStream<R> {
     }
 }
 
-impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
+impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
     pub async fn read_all(self) -> VortexResult<Array> {
         let dtype = self.dtype().clone();
         let vecs: Vec<Array> = self.try_collect().await?;
@@ -331,10 +352,10 @@ impl<R: VortexReadAt + Unpin + 'static> VortexFileArrayStream<R> {
 
 async fn read_ranges<R: VortexReadAt>(
     reader: R,
-    ranges: Vec<(MessageId, ByteRange)>,
-) -> VortexResult<(R, Vec<(MessageId, Bytes)>)> {
+    ranges: Vec<MessageLocator>,
+) -> VortexResult<Vec<Message>> {
     stream::iter(ranges.into_iter())
-        .map(|(id, range)| {
+        .map(|MessageLocator(id, range)| {
             let mut buf = BytesMut::with_capacity(range.len());
             unsafe { buf.set_len(range.len()) }
 
@@ -342,12 +363,11 @@ async fn read_ranges<R: VortexReadAt>(
 
             read_ft.map(|result| {
                 result
-                    .map(|res| (id, res.freeze()))
+                    .map(|res| Message(id, res.freeze()))
                     .map_err(VortexError::from)
             })
         })
         .buffered(10)
         .try_collect()
         .await
-        .map(|b| (reader, b))
 }
diff --git a/vortex-serde/src/file/tests.rs b/vortex-serde/src/file/tests.rs
index 6abeda512e..d627c6860e 100644
--- a/vortex-serde/src/file/tests.rs
+++ b/vortex-serde/src/file/tests.rs
@@ -8,6 +8,7 @@ use vortex_array::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray
 use vortex_array::validity::Validity;
 use vortex_array::variants::{PrimitiveArrayTrait, StructArrayTrait};
 use vortex_array::{Array, ArrayDType, IntoArray, IntoArrayVariant};
+use vortex_buffer::Buffer;
 use vortex_dtype::field::Field;
 use vortex_dtype::{DType, Nullability, PType, StructDType};
 use vortex_error::vortex_panic;
@@ -47,7 +48,7 @@ async fn test_read_simple() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(st.into_array()).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .build()
@@ -88,7 +89,7 @@ async fn test_splits() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(st.into_array()).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     let initial_read = read_initial_bytes(&written, written.len() as u64)
         .await
@@ -135,7 +136,7 @@ async fn test_read_projection() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(st.into_array()).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default())
         .with_projection(Projection::new([0]))
@@ -273,7 +274,7 @@ async fn unequal_batches() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(st.into_array()).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .build()
@@ -331,7 +332,8 @@ async fn write_chunked() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(chunked_st).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+
+    let written = Buffer::from(writer.finalize().await.unwrap());
     let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .build()
         .await
@@ -364,7 +366,8 @@ async fn filter_string() {
     .into_array();
     let mut writer = VortexFileWriter::new(Vec::new());
     writer = writer.write_array_columns(st).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+
+    let written = Buffer::from(writer.finalize().await.unwrap());
     let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .with_row_filter(RowFilter::new(BinaryExpr::new_expr(
             Column::new_expr(Field::from("name")),
@@ -421,7 +424,7 @@ async fn filter_or() {
     .into_array();
     let mut writer = VortexFileWriter::new(Vec::new());
     writer = writer.write_array_columns(st).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
     let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .with_row_filter(RowFilter::new(BinaryExpr::new_expr(
             BinaryExpr::new_expr(
@@ -497,7 +500,7 @@ async fn filter_and() {
     .into_array();
     let mut writer = VortexFileWriter::new(Vec::new());
     writer = writer.write_array_columns(st).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
     let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default())
         .with_row_filter(RowFilter::new(BinaryExpr::new_expr(
             BinaryExpr::new_expr(
@@ -559,7 +562,7 @@ async fn test_with_indices_simple() {
         .write_array_columns(expected_array.into_array())
         .await
         .unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     // test no indices
     let empty_indices = Vec::<u32>::new();
@@ -640,7 +643,7 @@ async fn test_with_indices_on_two_columns() {
     let buf = Vec::new();
     let mut writer = VortexFileWriter::new(buf);
     writer = writer.write_array_columns(st.into_array()).await.unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     let kept_indices = [0_usize, 3, 7];
     let kept_indices_u8 = kept_indices.iter().map(|&x| x as u8).collect::<Vec<_>>();
@@ -701,7 +704,7 @@ async fn test_with_indices_and_with_row_filter_simple() {
         .write_array_columns(expected_array.into_array())
         .await
         .unwrap();
-    let written = writer.finalize().await.unwrap();
+    let written = Buffer::from(writer.finalize().await.unwrap());
 
     // test no indices
     let empty_indices = Vec::<u32>::new();
diff --git a/vortex-serde/src/io/compio.rs b/vortex-serde/src/io/compio.rs
new file mode 100644
index 0000000000..726e8b3d7f
--- /dev/null
+++ b/vortex-serde/src/io/compio.rs
@@ -0,0 +1,73 @@
+//! Implementations of the Vortex IO traits on top of compio's io_uring-backed `File`.
+
+use std::future::Future;
+use std::io;
+
+use bytes::BytesMut;
+use compio::fs::File;
+use compio::io::AsyncReadAtExt;
+use compio::runtime::Runtime;
+use compio::BufResult;
+use vortex_error::vortex_panic;
+
+use super::VortexReadAt;
+use crate::file::AsyncRuntime;
+
+pub struct CompioAdapter<IO>(IO);
+
+impl VortexReadAt for File {
+    fn read_at_into(
+        &self,
+        pos: u64,
+        buffer: BytesMut,
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
+        let this = self.clone();
+        async move {
+            // compio takes ownership of the buffer and hands it back inside the BufResult.
+            let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await;
+            res.map(|_| buffer)
+        }
+    }
+
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        let this = self.clone();
+        async move {
+            this.metadata()
+                .await
+                .map(|metadata| metadata.len())
+                .unwrap_or_else(|e| vortex_panic!("compio File::size: {e}"))
+        }
+    }
+}
+
+impl AsyncRuntime for Runtime {
+    fn block_on<F: Future>(&self, fut: F) -> F::Output {
+        self.block_on(fut)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Write;
+
+    use bytes::BytesMut;
+    use compio::fs::File;
+    use tempfile::NamedTempFile;
+
+    use crate::io::VortexReadAt;
+
+    #[cfg_attr(miri, ignore)]
+    #[compio::test]
+    async fn test_read_at_compio_file() {
+        let mut tmpfile = NamedTempFile::new().unwrap();
+        write!(tmpfile, "0123456789").unwrap();
+
+        // Open up a file handle in compio land
+        let file = File::open(tmpfile.path()).await.unwrap();
+
+        // Use the file as a VortexReadAt instance.
+        let four_bytes = BytesMut::zeroed(4);
+        let read = file.read_at_into(2, four_bytes).await.unwrap();
+        assert_eq!(&read, "2345".as_bytes());
+    }
+}
diff --git a/vortex-serde/src/io/dispatcher/compio.rs b/vortex-serde/src/io/dispatcher/compio.rs
new file mode 100644
index 0000000000..64bce91fef
--- /dev/null
+++ b/vortex-serde/src/io/dispatcher/compio.rs
@@ -0,0 +1,100 @@
+use std::future::Future;
+use std::panic::resume_unwind;
+use std::thread::JoinHandle;
+
+use compio::runtime::{JoinHandle as CompioJoinHandle, Runtime, RuntimeBuilder};
+use futures_channel::oneshot;
+use vortex_error::{vortex_bail, vortex_panic, VortexResult};
+
+use super::Dispatch;
+
+trait CompioSpawn {
+    fn spawn(self: Box<Self>) -> CompioJoinHandle<()>;
+}
+
+struct CompioTask<F, R> {
+    task: F,
+    result: oneshot::Sender<R>,
+}
+
+impl<F, Fut, R> CompioSpawn for CompioTask<F, R>
+where
+    F: FnOnce() -> Fut + Send + 'static,
+    Fut: Future<Output = R>,
+    R: Send + 'static,
+{
+    fn spawn(self: Box<Self>) -> CompioJoinHandle<()> {
+        let CompioTask { task, result } = *self;
+        Runtime::with_current(|rt| {
+            rt.spawn(async move {
+                let task_output = task().await;
+                result.send(task_output).ok();
+            })
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct CompioDispatcher {
+    submitter: flume::Sender<Box<dyn CompioSpawn + Send>>,
+    threads: Vec<JoinHandle<()>>,
+}
+
+impl CompioDispatcher {
+    pub fn new(num_threads: usize) -> Self {
+        let (submitter, rx) = flume::unbounded();
+        let threads: Vec<_> = (0..num_threads)
+            .map(|tid| {
+                let worker_thread = std::thread::Builder::new();
+                let worker_thread = worker_thread.name(format!("compio-dispatch-{tid}"));
+                let rx: flume::Receiver<Box<dyn CompioSpawn + Send>> = rx.clone();
+
+                worker_thread
+                    .spawn(move || {
+                        // Create a runtime-per-thread
+                        let rt = RuntimeBuilder::new().build().unwrap_or_else(|e| {
+                            vortex_panic!("CompioDispatcher RuntimeBuilder build(): {e}")
+                        });
+
+                        rt.block_on(async move {
+                            while let Ok(task) = rx.recv_async().await {
+                                task.spawn().detach();
+                            }
+                        });
+                    })
+                    .unwrap_or_else(|e| vortex_panic!("CompioDispatcher worker thread spawn: {e}"))
+            })
+            .collect();
+
+        Self { submitter, threads }
+    }
+}
+
+impl Dispatch for CompioDispatcher {
+    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
+    where
+        F: (FnOnce() -> Fut) + Send + 'static,
+        Fut: Future<Output = R> + 'static,
+        R: Send + 'static,
+    {
+        let (tx, rx) = oneshot::channel();
+        let compio_task = Box::new(CompioTask { task, result: tx });
+        match self.submitter.send(compio_task) {
+            Ok(()) => Ok(rx),
+            Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"),
+        }
+    }
+
+    fn shutdown(self) -> VortexResult<()> {
+        // Drop the submitter so the flume channel disconnects.
+        //
+        // Each worker thread's `recv_async` will then return a disconnect error,
+        drop(self.submitter);
+        for thread in self.threads {
+            // Propagate any panics from the worker threads.
+            thread.join().unwrap_or_else(|err| resume_unwind(err));
+        }
+
+        Ok(())
+    }
+}
diff --git a/vortex-serde/src/io/dispatcher/mod.rs b/vortex-serde/src/io/dispatcher/mod.rs
new file mode 100644
index 0000000000..6a9791f4e8
--- /dev/null
+++ b/vortex-serde/src/io/dispatcher/mod.rs
@@ -0,0 +1,93 @@
+#[cfg(feature = "compio")]
+mod compio;
+#[cfg(feature = "tokio")]
+mod tokio;
+use std::future::Future;
+
+use futures::channel::oneshot;
+use vortex_error::VortexResult;
+
+#[cfg(feature = "compio")]
+use self::compio::*;
+#[cfg(feature = "tokio")]
+use self::tokio::*;
+
+pub trait Dispatch {
+    /// Dispatch a new asynchronous task.
+    ///
+    /// The function spawning the task must be `Send` as it will be sent to
+    /// the driver thread.
+    ///
+    /// The returned `Future` will be executed to completion on a single thread,
+    /// thus it may be `!Send`.
+    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
+    where
+        F: (FnOnce() -> Fut) + Send + 'static,
+        Fut: Future<Output = R> + 'static,
+        R: Send + 'static;
+
+    /// Gracefully shutdown the dispatcher, consuming it.
+    ///
+    /// Existing tasks are awaited before exiting.
+    fn shutdown(self) -> VortexResult<()>;
+}
+
+/// A cross-thread, cross-runtime dispatcher of async IO workloads.
+///
+/// `IoDispatcher`s are handles to an async runtime that can handle work submissions and
+/// multiplexes them across a set of worker threads. Unlike an async runtime, which is free
+/// to balance tasks as they see fit, the purpose of the Dispatcher is to enable the spawning
+/// of asynchronous, `!Send` tasks across potentially many worker threads, and allowing work
+/// submission from any other runtime.
+#[derive(Debug)]
+pub struct IoDispatcher(Inner);
+
+#[derive(Debug)]
+enum Inner {
+    #[cfg(feature = "tokio")]
+    Tokio(TokioDispatcher),
+    #[cfg(feature = "compio")]
+    Compio(CompioDispatcher),
+}
+
+impl Dispatch for IoDispatcher {
+    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
+    where
+        F: (FnOnce() -> Fut) + Send + 'static,
+        Fut: Future<Output = R> + 'static,
+        R: Send + 'static,
+    {
+        match &self.0 {
+            #[cfg(feature = "tokio")]
+            Inner::Tokio(tokio_dispatch) => tokio_dispatch.dispatch(task),
+            #[cfg(feature = "compio")]
+            Inner::Compio(compio_dispatch) => compio_dispatch.dispatch(task),
+        }
+    }
+
+    fn shutdown(self) -> VortexResult<()> {
+        match self.0 {
+            #[cfg(feature = "tokio")]
+            Inner::Tokio(tokio_dispatch) => tokio_dispatch.shutdown(),
+            #[cfg(feature = "compio")]
+            Inner::Compio(compio_dispatch) => compio_dispatch.shutdown(),
+        }
+    }
+}
+
+impl IoDispatcher {
+    /// Create a new IO dispatcher that uses a set of Tokio `current_thread` runtimes to
+    /// execute both `Send` and `!Send` futures.
+    ///
+    /// A handle to the dispatcher can be passed freely among threads, allowing multiple parties to
+    /// perform dispatching across different threads.
+    #[cfg(feature = "tokio")]
+    pub fn new_tokio(num_thread: usize) -> Self {
+        Self(Inner::Tokio(TokioDispatcher::new(num_thread)))
+    }
+
+    #[cfg(feature = "compio")]
+    pub fn new_compio(num_threads: usize) -> Self {
+        Self(Inner::Compio(CompioDispatcher::new(num_threads)))
+    }
+}
diff --git a/vortex-serde/src/io/dispatcher/tokio.rs b/vortex-serde/src/io/dispatcher/tokio.rs
new file mode 100644
index 0000000000..6b56238931
--- /dev/null
+++ b/vortex-serde/src/io/dispatcher/tokio.rs
@@ -0,0 +1,142 @@
+use std::future::Future;
+use std::panic::resume_unwind;
+use std::thread::JoinHandle;
+
+use futures::channel::oneshot;
+use tokio::task::{JoinHandle as TokioJoinHandle, LocalSet};
+use vortex_error::{vortex_bail, vortex_panic, VortexResult};
+
+use super::Dispatch;
+
+trait TokioSpawn {
+    fn spawn(self: Box<Self>) -> TokioJoinHandle<()>;
+}
+
+/// A [dispatcher][Dispatch] of IO operations that runs tasks on one of several
+/// Tokio `current_thread` runtimes.
+#[derive(Debug)]
+pub(super) struct TokioDispatcher {
+    submitter: flume::Sender<Box<dyn TokioSpawn + Send>>,
+    threads: Vec<JoinHandle<()>>,
+}
+
+impl TokioDispatcher {
+    pub fn new(num_threads: usize) -> Self {
+        let (submitter, rx) = flume::unbounded();
+        let threads: Vec<_> = (0..num_threads)
+            .map(|tid| {
+                let worker_thread = std::thread::Builder::new();
+                let worker_thread = worker_thread.name(format!("tokio-dispatch-{tid}"));
+                let rx: flume::Receiver<Box<dyn TokioSpawn + Send>> = rx.clone();
+
+                worker_thread
+                    .spawn(move || {
+                        // Create a runtime-per-thread
+                        let rt = tokio::runtime::Builder::new_current_thread()
+                            .enable_all()
+                            .build()
+                            .unwrap_or_else(|e| {
+                                vortex_panic!("TokioDispatcher new_current_thread build(): {e}")
+                            });
+
+                        rt.block_on(async move {
+                            // Use a LocalSet so that all spawned tasks will run on the current thread. This allows
+                            // spawning !Send futures.
+                            LocalSet::new()
+                                .run_until(async {
+                                    while let Ok(task) = rx.recv_async().await {
+                                        task.spawn();
+                                    }
+                                })
+                                .await;
+                        });
+                    })
+                    .unwrap_or_else(|e| vortex_panic!("TokioDispatcher worker thread spawn: {e}"))
+            })
+            .collect();
+
+        Self { submitter, threads }
+    }
+}
+
+/// Tasks that can be launched onto a runtime.
+struct TokioTask<F, R> {
+    task: F,
+    result: oneshot::Sender<R>,
+}
+
+impl<F, Fut, R> TokioSpawn for TokioTask<F, R>
+where
+    F: FnOnce() -> Fut + Send + 'static,
+    Fut: Future<Output = R>,
+    R: Send + 'static,
+{
+    fn spawn(self: Box<Self>) -> TokioJoinHandle<()> {
+        let TokioTask { task, result } = *self;
+        tokio::task::spawn_local(async move {
+            let task_output = task().await;
+            result.send(task_output).ok();
+        })
+    }
+}
+
+impl Dispatch for TokioDispatcher {
+    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
+    where
+        F: (FnOnce() -> Fut) + Send + 'static,
+        Fut: Future<Output = R> + 'static,
+        R: Send + 'static,
+    {
+        let (tx, rx) = oneshot::channel();
+
+        let task = TokioTask { result: tx, task };
+
+        match self.submitter.send(Box::new(task)) {
+            Ok(()) => Ok(rx),
+            Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"),
+        }
+    }
+
+    fn shutdown(self) -> VortexResult<()> {
+        // Drop the submitter so the flume channel disconnects.
+        //
+        // Each worker thread's `recv_async` will then return a disconnect error,
+        drop(self.submitter);
+        for thread in self.threads {
+            // Propagate any panics from the worker threads.
+            // NOTE: currently, panics inside any of the tasks will not propagate to the LocalSet's join handle,
+            // see https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#panics-1
+            thread.join().unwrap_or_else(|err| resume_unwind(err));
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Write;
+
+    use bytes::BytesMut;
+    use tempfile::NamedTempFile;
+
+    use super::TokioDispatcher;
+    use crate::io::{Dispatch, TokioFile, VortexReadAt};
+
+    #[tokio::test]
+    async fn test_tokio_dispatch_simple() {
+        let dispatcher = TokioDispatcher::new(4);
+        let mut tmpfile = NamedTempFile::new().unwrap();
+        write!(tmpfile, "5678").unwrap();
+
+        let rx = dispatcher
+            .dispatch(|| async move {
+                let file = TokioFile::open(tmpfile.path()).unwrap();
+                let bytes = BytesMut::zeroed(4);
+                file.read_at_into(0, bytes).await.unwrap()
+            })
+            .unwrap();
+
+        assert_eq!(&rx.await.unwrap(), "5678".as_bytes());
+    }
+}
diff --git a/vortex-serde/src/io/mod.rs b/vortex-serde/src/io/mod.rs
index d5475dcf84..355e897791 100644
--- a/vortex-serde/src/io/mod.rs
+++ b/vortex-serde/src/io/mod.rs
@@ -1,7 +1,8 @@
+#[cfg(feature = "compio")]
+pub use compio::*;
+pub use dispatcher::*;
 #[cfg(feature = "futures")]
 pub use futures::*;
-#[cfg(feature = "monoio")]
-pub use monoio::*;
 #[cfg(feature = "object_store")]
 pub use object_store::*;
 pub use read::*;
@@ -9,8 +10,10 @@ pub use read::*;
 pub use tokio::*;
 pub use write::*;
 
+#[cfg(feature = "compio")]
+mod compio;
+mod dispatcher;
 mod futures;
-mod monoio;
 mod object_store;
 pub mod offset;
 mod read;
diff --git a/vortex-serde/src/io/monoio.rs b/vortex-serde/src/io/monoio.rs
deleted file mode 100644
index 0049812f98..0000000000
--- a/vortex-serde/src/io/monoio.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-#![cfg(feature = "monoio")]
-
-use std::future::Future;
-use std::io;
-
-use bytes::BytesMut;
-use futures_util::FutureExt;
-use monoio::buf::IoBufMut;
-use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt};
-use vortex_buffer::io_buf::IoBuf;
-
-use crate::io::{VortexRead, VortexWrite};
-
-pub struct MonoAdapter<IO>(IO);
-
-impl<R: AsyncReadRent> VortexRead for MonoAdapter<R> {
-    fn read_into(&mut self, buffer: BytesMut) -> impl Future<Output = io::Result<BytesMut>> {
-        let len = buffer.len();
-        self.0
-            .read_exact(buffer.slice_mut(0..len))
-            .map(|(result, buffer)| match result {
-                Ok(_len) => Ok(buffer.into_inner()),
-                Err(e) => Err(e),
-            })
-    }
-}
-
-impl<W: AsyncWriteRent> VortexWrite for MonoAdapter<W> {
-    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
-        self.0
-            .write_all(MonoAdapter(buffer))
-            .map(|(result, buffer)| match result {
-                Ok(_len) => Ok(buffer.0),
-                Err(e) => Err(e),
-            })
-    }
-
-    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
-        self.0.flush()
-    }
-
-    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
-        self.0.shutdown()
-    }
-}
-
-unsafe impl<B: IoBuf> monoio::buf::IoBuf for MonoAdapter<B> {
-    fn read_ptr(&self) -> *const u8 {
-        IoBuf::read_ptr(&self.0)
-    }
-
-    fn bytes_init(&self) -> usize {
-        IoBuf::bytes_init(&self.0)
-    }
-}
diff --git a/vortex-serde/src/io/object_store.rs b/vortex-serde/src/io/object_store.rs
index 1369fe12f2..c2ba1f5dfd 100644
--- a/vortex-serde/src/io/object_store.rs
+++ b/vortex-serde/src/io/object_store.rs
@@ -68,29 +68,38 @@ impl ObjectStoreReadAt {
 }
 
 impl VortexReadAt for ObjectStoreReadAt {
-    async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result<BytesMut> {
-        let start_range = pos as usize;
-        let bytes = self
-            .object_store
-            .get_range(&self.location, start_range..(start_range + buffer.len()))
-            .await?;
-        buffer.as_mut().copy_from_slice(bytes.as_ref());
-        Ok(buffer)
+    fn read_at_into(
+        &self,
+        pos: u64,
+        mut buffer: BytesMut,
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
+        let object_store = self.object_store.clone();
+        let location = self.location.clone();
+
+        Box::pin(async move {
+            let start_range = pos as usize;
+            let bytes = object_store
+                .get_range(&location, start_range..(start_range + buffer.len()))
+                .await?;
+            buffer.as_mut().copy_from_slice(bytes.as_ref());
+            Ok(buffer)
+        })
     }
 
-    async fn size(&self) -> u64 {
-        self.object_store
-            .head(&self.location)
-            .await
-            .map_err(VortexError::ObjectStore)
-            .unwrap_or_else(|err| {
-                vortex_panic!(
-                    err,
-                    "Failed to get size of object at location {}",
-                    self.location
-                )
-            })
-            .size as u64
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        let object_store = self.object_store.clone();
+        let location = self.location.clone();
+
+        Box::pin(async move {
+            object_store
+                .head(&location)
+                .await
+                .map_err(VortexError::ObjectStore)
+                .unwrap_or_else(|err| {
+                    vortex_panic!(err, "Failed to get size of object at location {}", location)
+                })
+                .size as u64
+        })
     }
 }
 
diff --git a/vortex-serde/src/io/offset.rs b/vortex-serde/src/io/offset.rs
index d391c10cd2..fa3294f0ba 100644
--- a/vortex-serde/src/io/offset.rs
+++ b/vortex-serde/src/io/offset.rs
@@ -1,6 +1,7 @@
 use std::future::Future;
 
 use bytes::BytesMut;
+use futures::FutureExt;
 
 use crate::io::VortexReadAt;
 
@@ -10,6 +11,18 @@ pub struct OffsetReadAt<R> {
     offset: u64,
 }
 
+impl<R> Clone for OffsetReadAt<R>
+where
+    R: Clone,
+{
+    fn clone(&self) -> Self {
+        Self {
+            read: self.read.clone(),
+            offset: self.offset,
+        }
+    }
+}
+
 impl<R: VortexReadAt> OffsetReadAt<R> {
     pub fn new(read: R, offset: u64) -> Self {
         Self { read, offset }
@@ -21,7 +34,7 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
         &self,
         pos: u64,
         buffer: BytesMut,
-    ) -> impl Future<Output = std::io::Result<BytesMut>> {
+    ) -> impl Future<Output = std::io::Result<BytesMut>> + 'static {
         self.read.read_at_into(pos + self.offset, buffer)
     }
 
@@ -29,7 +42,8 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
         self.read.performance_hint()
     }
 
-    async fn size(&self) -> u64 {
-        self.read.size().await - self.offset
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        let offset = self.offset;
+        self.read.size().map(move |len| len - offset)
     }
 }
diff --git a/vortex-serde/src/io/read.rs b/vortex-serde/src/io/read.rs
index d8abd039f0..c76cb4d6a5 100644
--- a/vortex-serde/src/io/read.rs
+++ b/vortex-serde/src/io/read.rs
@@ -1,4 +1,4 @@
-use std::future::Future;
+use std::future::{self, Future};
 use std::io;
 use std::io::Cursor;
 use std::sync::Arc;
@@ -7,17 +7,41 @@ use bytes::BytesMut;
 use vortex_buffer::Buffer;
 use vortex_error::vortex_err;
 
+/// An asynchronous streaming reader.
+///
+/// Implementations expose data via the asynchronous [`read_into`][VortexRead::read_into], which
+/// will fill the exact number of bytes and advance the stream.
+///
+/// If the exact number of bytes is not available from the stream, an
+/// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned.
 pub trait VortexRead {
     fn read_into(&mut self, buffer: BytesMut) -> impl Future<Output = io::Result<BytesMut>>;
 }
 
-#[allow(clippy::len_without_is_empty)]
-pub trait VortexReadAt: Send + Sync {
+/// A trait for types that support asynchronous reads.
+///
+/// References to the type must be safe to [share across threads][Send], but spawned
+/// futures may be `!Send` to support thread-per-core implementations.
+///
+/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
+pub trait VortexReadAt: Send + Sync + Clone + 'static {
+    /// Request an asynchronous positional read to be done, with results written into the provided `buffer`.
+    ///
+    /// This method will take ownership of the provided `buffer`, and upon successful completion will return
+    /// the buffer completely full with data.
+    ///
+    /// If the reader does not have enough data available to fill the buffer, the returned Future will complete
+    /// with an [`io::Error`].
+    ///
+    /// ## Thread Safety
+    ///
+    /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
+    /// executors.
     fn read_at_into(
         &self,
         pos: u64,
         buffer: BytesMut,
-    ) -> impl Future<Output = io::Result<BytesMut>> + Send;
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static;
 
     // TODO(ngates): the read implementation should be able to hint at its latency/throughput
     //  allowing the caller to make better decisions about how to coalesce reads.
@@ -25,8 +49,11 @@ pub trait VortexReadAt: Send + Sync {
         0
     }
 
-    /// Size of the underlying file in bytes
-    fn size(&self) -> impl Future<Output = u64>;
+    /// Asynchronously get the number of bytes of data readable.
+    ///
+    /// For a file it will be the size in bytes, for an object in an
+    /// `ObjectStore` it will be the `ObjectMeta::size`.
+    fn size(&self) -> impl Future<Output = u64> + 'static;
 }
 
 impl<T: VortexReadAt> VortexReadAt for Arc<T> {
@@ -34,7 +61,7 @@ impl<T: VortexReadAt> VortexReadAt for Arc<T> {
         &self,
         pos: u64,
         buffer: BytesMut,
-    ) -> impl Future<Output = io::Result<BytesMut>> + Send {
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
         T::read_at_into(self, pos, buffer)
     }
 
@@ -42,8 +69,8 @@ impl<T: VortexReadAt> VortexReadAt for Arc<T> {
         T::performance_hint(self)
     }
 
-    async fn size(&self) -> u64 {
-        T::size(self).await
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        T::size(self)
     }
 }
 
@@ -60,6 +87,7 @@ impl VortexRead for BytesMut {
     }
 }
 
+// Implement reading for a cursor operation.
 impl<R: VortexReadAt> VortexRead for Cursor<R> {
     async fn read_into(&mut self, buffer: BytesMut) -> io::Result<BytesMut> {
         let res = R::read_at_into(self.get_ref(), self.position(), buffer).await?;
@@ -68,75 +96,28 @@ impl<R: VortexReadAt> VortexRead for Cursor<R> {
     }
 }
 
-impl<R: ?Sized + VortexReadAt> VortexReadAt for &R {
-    fn read_at_into(
-        &self,
-        pos: u64,
-        buffer: BytesMut,
-    ) -> impl Future<Output = io::Result<BytesMut>> + Send {
-        R::read_at_into(*self, pos, buffer)
-    }
-
-    fn performance_hint(&self) -> usize {
-        R::performance_hint(*self)
-    }
-
-    async fn size(&self) -> u64 {
-        R::size(*self).await
-    }
-}
-
-impl VortexReadAt for Vec<u8> {
+impl VortexReadAt for Buffer {
     fn read_at_into(
         &self,
         pos: u64,
-        buffer: BytesMut,
-    ) -> impl Future<Output = io::Result<BytesMut>> {
-        VortexReadAt::read_at_into(self.as_slice(), pos, buffer)
-    }
-
-    async fn size(&self) -> u64 {
-        self.len() as u64
-    }
-}
-
-impl VortexReadAt for [u8] {
-    async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result<BytesMut> {
+        mut buffer: BytesMut,
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
         if buffer.len() + pos as usize > self.len() {
-            Err(io::Error::new(
+            future::ready(Err(io::Error::new(
                 io::ErrorKind::UnexpectedEof,
                 vortex_err!("unexpected eof"),
-            ))
-        } else {
-            let buffer_len = buffer.len();
-            buffer.copy_from_slice(&self[pos as usize..][..buffer_len]);
-            Ok(buffer)
-        }
-    }
-
-    async fn size(&self) -> u64 {
-        self.len() as u64
-    }
-}
-
-impl VortexReadAt for Buffer {
-    async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result<BytesMut> {
-        if buffer.len() + pos as usize > self.len() {
-            Err(io::Error::new(
-                io::ErrorKind::UnexpectedEof,
-                vortex_err!("unexpected eof"),
-            ))
+            )))
         } else {
             let buffer_len = buffer.len();
             buffer.copy_from_slice(
                 self.slice(pos as usize..pos as usize + buffer_len)
                     .as_slice(),
             );
-            Ok(buffer)
+            future::ready(Ok(buffer))
         }
     }
 
-    async fn size(&self) -> u64 {
-        self.len() as u64
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        future::ready(self.len() as u64)
     }
 }
diff --git a/vortex-serde/src/io/tokio.rs b/vortex-serde/src/io/tokio.rs
index 678de7c49d..74e276063e 100644
--- a/vortex-serde/src/io/tokio.rs
+++ b/vortex-serde/src/io/tokio.rs
@@ -1,22 +1,26 @@
 #![cfg(feature = "tokio")]
 
-use std::future::Future;
+use std::fs::File;
+use std::future::{self, Future};
 use std::io;
-use std::os::unix::prelude::FileExt;
+use std::ops::Deref;
+use std::os::unix::fs::FileExt;
+use std::path::Path;
+use std::sync::Arc;
 
 use bytes::BytesMut;
-use tokio::fs::File;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use tokio::runtime::Runtime;
+use tokio::runtime::{Handle, Runtime};
 use vortex_buffer::io_buf::IoBuf;
-use vortex_error::{VortexError, VortexUnwrap as _};
+use vortex_error::vortex_panic;
 
+use super::VortexReadAt;
 use crate::file::AsyncRuntime;
-use crate::io::{VortexRead, VortexReadAt, VortexWrite};
+use crate::io::{VortexRead, VortexWrite};
 
 pub struct TokioAdapter<IO>(pub IO);
 
-impl<R: AsyncRead + Unpin> VortexRead for TokioAdapter<R> {
+impl<IO: AsyncRead + Unpin> VortexRead for TokioAdapter<IO> {
     async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
         self.0.read_exact(buffer.as_mut()).await?;
         Ok(buffer)
@@ -38,30 +42,80 @@ impl<W: AsyncWrite + Unpin> VortexWrite for TokioAdapter<W> {
     }
 }
 
-impl VortexRead for File {
-    async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
-        self.read_exact(buffer.as_mut()).await?;
-        Ok(buffer)
+impl AsyncRuntime for Runtime {
+    fn block_on<F: Future>(&self, fut: F) -> F::Output {
+        self.block_on(fut)
     }
 }
 
-impl VortexReadAt for File {
-    async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result<BytesMut> {
-        let std_file = self.try_clone().await?.into_std().await;
-        std_file.read_exact_at(buffer.as_mut(), pos)?;
-        Ok(buffer)
+impl AsyncRuntime for Handle {
+    fn block_on<F: Future>(&self, fut: F) -> F::Output {
+        self.block_on(fut)
     }
+}
+
+/// A cheaply cloneable, read-only file handle. Reads are served directly via
+/// positioned I/O (`read_exact_at`); `size` runs on the tokio blocking threadpool.
+///
+/// We use this because the builtin tokio `File` type is not `Clone` and
+/// also does not actually implement a `read_exact_at` operation.
+#[derive(Debug, Clone)]
+pub struct TokioFile(Arc<File>);
+
+impl TokioFile {
+    /// Open a file on the current file system.
+    ///
+    /// The `TokioFile` takes ownership of the file descriptor, and can be cloned
+    /// many times without opening a new file descriptor. When the last instance
+    /// of the `TokioFile` is dropped, the file descriptor is closed.
+    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
+        let f = File::open(path)?;
 
-    async fn size(&self) -> u64 {
-        self.metadata()
-            .await
-            .map_err(|err| VortexError::IOError(err).with_context("Failed to get file metadata"))
-            .vortex_unwrap()
-            .len()
+        Ok(Self(Arc::new(f)))
     }
 }
 
-impl VortexWrite for File {
+// Implement deref coercion for non-mut `File` methods on `TokioFile`.
+impl Deref for TokioFile {
+    type Target = File;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl VortexReadAt for TokioFile {
+    fn read_at_into(
+        &self,
+        pos: u64,
+        buffer: BytesMut,
+    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
+        let this = self.clone();
+
+        let mut buffer = buffer;
+        match this.read_exact_at(&mut buffer, pos) {
+            Ok(()) => future::ready(Ok(buffer)),
+            Err(e) => future::ready(Err(e)),
+        }
+    }
+
+    fn size(&self) -> impl Future<Output = u64> + 'static {
+        let this = self.clone();
+
+        async move {
+            let res = tokio::task::spawn_blocking(move || {
+                this.metadata()
+                    .unwrap_or_else(|e| vortex_panic!("access TokioFile metadata: {e}"))
+                    .len()
+            })
+            .await;
+
+            res.unwrap_or_else(|e| vortex_panic!("Joining spawn_blocking: size: {e}"))
+        }
+    }
+}
+
+impl VortexWrite for tokio::fs::File {
     async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
         AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
         Ok(buffer)
@@ -76,8 +130,64 @@ impl VortexWrite for File {
     }
 }
 
-impl AsyncRuntime for Runtime {
-    fn block_on<F: Future>(&self, fut: F) -> F::Output {
-        self.block_on(fut)
+#[cfg(test)]
+mod tests {
+    use std::fs::File;
+    use std::io::Write;
+    use std::ops::Deref;
+    use std::os::unix::fs::FileExt;
+
+    use bytes::BytesMut;
+    use tempfile::NamedTempFile;
+
+    use crate::io::{TokioFile, VortexReadAt};
+
+    #[tokio::test]
+    async fn test_shared_file() {
+        let mut tmpfile = NamedTempFile::new().unwrap();
+        write!(tmpfile, "0123456789").unwrap();
+
+        let shared_file = TokioFile::open(tmpfile.path()).unwrap();
+
+        let first_half = BytesMut::zeroed(5);
+        let first_half = shared_file.read_at_into(0, first_half).await.unwrap();
+
+        let second_half = BytesMut::zeroed(5);
+        let second_half = shared_file.read_at_into(5, second_half).await.unwrap();
+
+        assert_eq!(first_half.freeze(), "01234".as_bytes());
+        assert_eq!(second_half.freeze(), "56789".as_bytes());
+    }
+
+    #[test]
+    fn test_drop_semantics() {
+        let mut file = NamedTempFile::new().unwrap();
+        write!(file, "test123").unwrap();
+
+        // Transfer ownership of the file into our Tokio file.
+        let tokio_file = TokioFile::open(file.path()).unwrap();
+        // Delete the file, so that tokio_file's owned FD is the only thing keeping it around.
+        std::fs::remove_file(file.path()).unwrap();
+
+        // Create a function to test if we can read from the file
+        let can_read = |file: &File| {
+            let mut buffer = vec![0; 7];
+            file.read_exact_at(&mut buffer, 0).is_ok()
+        };
+
+        // Test initial read
+        assert!(can_read(tokio_file.deref()));
+
+        // Clone the old tokio_file, then drop the old one. The clone keeps the
+        // Arc refcount above zero, so the file handle must not be closed yet.
+        let tokio_file_cloned = tokio_file.clone();
+        drop(tokio_file);
+
+        // File handle should still be open and readable
+        assert!(can_read(tokio_file_cloned.deref()));
+
+        // Now, drop the cloned handle. The file should be deleted after the drop.
+        drop(tokio_file_cloned);
+        assert!(!std::fs::exists(file.path()).unwrap());
     }
 }
diff --git a/vortex-serde/src/lib.rs b/vortex-serde/src/lib.rs
index 644d1b0e8e..454c557eb3 100644
--- a/vortex-serde/src/lib.rs
+++ b/vortex-serde/src/lib.rs
@@ -17,19 +17,20 @@ pub const ALIGNMENT: usize = 64;
 #[cfg(test)]
 #[allow(clippy::panic_in_result_fn)]
 mod test {
+    use std::io::Cursor;
     use std::sync::Arc;
 
     use futures_executor::block_on;
-    use futures_util::io::Cursor;
     use futures_util::{pin_mut, StreamExt, TryStreamExt};
     use itertools::Itertools;
     use vortex_array::array::{ChunkedArray, PrimitiveArray, PrimitiveEncoding};
     use vortex_array::encoding::ArrayEncoding;
     use vortex_array::stream::ArrayStreamExt;
     use vortex_array::{ArrayDType, Context, IntoArray};
+    use vortex_buffer::Buffer;
     use vortex_error::VortexResult;
 
-    use crate::io::FuturesAdapter;
+    use crate::io::TokioAdapter;
     use crate::stream_reader::StreamArrayReader;
     use crate::stream_writer::StreamArrayWriter;
 
@@ -43,23 +44,21 @@ mod test {
         })
     }
 
-    #[test]
+    #[tokio::test]
     #[cfg_attr(miri, ignore)]
-    fn test_empty_index() -> VortexResult<()> {
+    async fn test_empty_index() -> VortexResult<()> {
         let data = PrimitiveArray::from((0i32..3_000_000).collect_vec());
         let buffer = write_ipc(data);
 
         let indices = PrimitiveArray::from(vec![1, 2, 10]).into_array();
 
         let ctx = Arc::new(Context::default());
-        let stream_reader = block_on(async {
-            StreamArrayReader::try_new(FuturesAdapter(Cursor::new(buffer)), ctx)
-                .await
-                .unwrap()
-                .load_dtype()
-                .await
-                .unwrap()
-        });
+        let stream_reader = StreamArrayReader::try_new(TokioAdapter(buffer.as_slice()), ctx)
+            .await
+            .unwrap()
+            .load_dtype()
+            .await
+            .unwrap();
         let reader = stream_reader.into_array_stream();
 
         let result_iter = reader.take_rows(indices)?;
@@ -69,9 +68,9 @@ mod test {
         Ok(())
     }
 
-    #[test]
+    #[tokio::test]
     #[cfg_attr(miri, ignore)]
-    fn test_write_read_chunked() -> VortexResult<()> {
+    async fn test_write_read_chunked() -> VortexResult<()> {
         let indices = PrimitiveArray::from(vec![
             10u32, 11, 12, 13, 100_000, 2_999_999, 2_999_999, 3_000_000,
         ])
@@ -83,16 +82,15 @@ mod test {
             PrimitiveArray::from((3_000_000i32..6_000_000).rev().collect_vec()).into_array();
         let chunked = ChunkedArray::try_new(vec![data.clone(), data2], data.dtype().clone())?;
         let buffer = write_ipc(chunked);
+        let buffer = Buffer::from(buffer);
 
         let ctx = Arc::new(Context::default());
-        let stream_reader = block_on(async {
-            StreamArrayReader::try_new(FuturesAdapter(Cursor::new(buffer)), ctx)
-                .await
-                .unwrap()
-                .load_dtype()
-                .await
-                .unwrap()
-        });
+        let stream_reader = StreamArrayReader::try_new(TokioAdapter(Cursor::new(buffer)), ctx)
+            .await
+            .unwrap()
+            .load_dtype()
+            .await
+            .unwrap();
 
         let take_iter = stream_reader.into_array_stream().take_rows(indices)?;
         pin_mut!(take_iter);
diff --git a/vortex-serde/src/messages/reader.rs b/vortex-serde/src/messages/reader.rs
index 2a977ba030..b42a37e557 100644
--- a/vortex-serde/src/messages/reader.rs
+++ b/vortex-serde/src/messages/reader.rs
@@ -15,6 +15,7 @@ use crate::io::VortexRead;
 
 pub const MESSAGE_PREFIX_LENGTH: usize = 4;
 
+/// A stateful reader of [`Message`s][fb::Message] from a stream.
 pub struct MessageReader<R> {
     read: R,
     message: BytesMut,
@@ -361,10 +362,9 @@ mod test {
                 .await
         })
         .unwrap();
-        let written = writer.into_inner();
+        let written = Buffer::from(writer.into_inner());
         let mut reader =
-            block_on(async { MessageReader::try_new(Cursor::new(written.as_slice())).await })
-                .unwrap();
+            block_on(async { MessageReader::try_new(Cursor::new(written)).await }).unwrap();
         let read_page = block_on(async { reader.maybe_read_page().await })
             .unwrap()
             .unwrap();
diff --git a/vortex-serde/src/messages/writer.rs b/vortex-serde/src/messages/writer.rs
index 07ef7e7647..32d759c54c 100644
--- a/vortex-serde/src/messages/writer.rs
+++ b/vortex-serde/src/messages/writer.rs
@@ -78,7 +78,7 @@ impl<W: VortexWrite> MessageWriter<W> {
         let buffer_len = buffer.len();
         self.write_all(buffer).await?;
 
-        let aligned_size = (buffer_len + (self.alignment - 1)) & !(self.alignment - 1);
+        let aligned_size = buffer_len.next_multiple_of(self.alignment);
         let padding = aligned_size - buffer_len;
         self.write_all(&ZEROS[0..padding]).await?;
 
diff --git a/vortex-serde/src/stream_writer/tests.rs b/vortex-serde/src/stream_writer/tests.rs
index e52a6b194c..4e28df8874 100644
--- a/vortex-serde/src/stream_writer/tests.rs
+++ b/vortex-serde/src/stream_writer/tests.rs
@@ -7,6 +7,7 @@ use arrow_array::PrimitiveArray;
 use vortex_array::arrow::FromArrowArray;
 use vortex_array::stream::ArrayStreamExt;
 use vortex_array::{Array, Context, IntoCanonical};
+use vortex_buffer::Buffer;
 
 use crate::stream_reader::StreamArrayReader;
 use crate::stream_writer::StreamArrayWriter;
@@ -20,6 +21,7 @@ async fn broken_data() {
         .await
         .unwrap()
         .into_inner();
+    let written = Buffer::from(written);
     let reader = StreamArrayReader::try_new(Cursor::new(written), Arc::new(Context::default()))
         .await
         .unwrap();