Skip to content

Commit

Permalink
Merge pull request #31 from supabase/fix-service-not-exist
Browse files Browse the repository at this point in the history
fix: return an error when service path does not exist
  • Loading branch information
laktek authored Apr 6, 2023
2 parents 39ee43f + 45962a9 commit 9b29e30
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ reqwest = { version = "0.11.13" }
serde = { version = "1.0.149", features = ["derive"] }
tokio = { version = "1.24", features = ["full"] }
url = { version = "2.3.1" }
uuid = { version = "1.1.2", features = ["v4"] }
v8 = { version = "0.60.1", default-features = false }
16 changes: 13 additions & 3 deletions base/src/js_worker/user_workers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions};

use anyhow::Error;
use deno_core::error::{custom_error, type_error, AnyError};
use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream;
Expand Down Expand Up @@ -27,6 +28,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

pub fn init() -> Extension {
Extension::builder("custom:user_workers")
Expand Down Expand Up @@ -54,7 +56,7 @@ pub async fn op_user_worker_create(
) -> Result<String, AnyError> {
let op_state = state.borrow();
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
let (result_tx, result_rx) = oneshot::channel::<CreateUserWorkerResult>();
let (result_tx, result_rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();

let mut env_vars = HashMap::new();
for (key, value) in env_vars_vec {
Expand All @@ -79,8 +81,15 @@ pub async fn op_user_worker_create(
));
}

// channel returns a Result<T, E>, we need to unwrap it first;
let result = result.unwrap();
Ok(result.key)
if result.is_err() {
return Err(custom_error(
"create_user_worker_error",
result.unwrap_err().to_string(),
));
}
Ok(result.unwrap().key.to_string())
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -285,7 +294,8 @@ pub async fn op_user_worker_fetch_send(
.ok()
.expect("multiple op_user_worker_fetch_send ongoing");
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));
let uuid = Uuid::parse_str(key.as_str())?;
tx.send(UserWorkerMsgs::SendRequest(uuid, request.0, result_tx));

let result = result_rx.await;
if result.is_err() {
Expand Down
44 changes: 29 additions & 15 deletions base/src/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::js_worker::{MainWorker, UserWorker};

use anyhow::Error;
use anyhow::{bail, Error};
use hyper::{Body, Request, Response};
use log::{debug, error};
use std::collections::HashMap;
Expand All @@ -11,7 +11,9 @@ use std::thread;
use tokio::net::UnixStream;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

#[derive(Debug)]
pub struct WorkerContext {
handle: thread::JoinHandle<Result<(), Error>>,
request_sender: hyper::client::conn::SendRequest<Body>,
Expand Down Expand Up @@ -41,6 +43,10 @@ impl WorkerContext {
let import_map_path = options.import_map_path;
let user_worker_msgs_tx = options.user_worker_msgs_tx;

if (!service_path.exists()) {
return bail!("main function does not exist {:?}", &service_path);
}

// create a unix socket pair
let (sender_stream, recv_stream) = UnixStream::pair()?;

Expand Down Expand Up @@ -90,6 +96,10 @@ impl WorkerContext {
let import_map_path = options.import_map_path;
let env_vars = options.env_vars;

if (!service_path.exists()) {
return bail!("user function does not exist {:?}", &service_path);
}

// create a unix socket pair
let (sender_stream, recv_stream) = UnixStream::pair()?;

Expand Down Expand Up @@ -141,13 +151,16 @@ impl WorkerContext {

#[derive(Debug)]
pub struct CreateUserWorkerResult {
pub key: String,
pub key: Uuid,
}

#[derive(Debug)]
pub enum UserWorkerMsgs {
Create(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
SendRequest(String, Request<Body>, oneshot::Sender<Response<Body>>),
Create(
UserWorkerOptions,
oneshot::Sender<Result<CreateUserWorkerResult, Error>>,
),
SendRequest(Uuid, Request<Body>, oneshot::Sender<Response<Body>>),
}

pub struct WorkerPool {
Expand All @@ -174,23 +187,24 @@ impl WorkerPool {
let main_worker = Arc::new(RwLock::new(main_worker_ctx));

tokio::spawn(async move {
let mut user_workers: HashMap<String, Arc<RwLock<WorkerContext>>> = HashMap::new();
let mut user_workers: HashMap<Uuid, Arc<RwLock<WorkerContext>>> = HashMap::new();

loop {
match user_worker_msgs_rx.recv().await {
None => break,
Some(UserWorkerMsgs::Create(worker_options, tx)) => {
let key = worker_options.service_path.display().to_string();
if !user_workers.contains_key(&key) {
// TODO: handle errors
let user_worker_ctx = WorkerContext::new_user_worker(worker_options)
.await
.unwrap();
user_workers
.insert(key.clone(), Arc::new(RwLock::new(user_worker_ctx)));
let key = Uuid::new_v4();
let user_worker_ctx = WorkerContext::new_user_worker(worker_options).await;
if !user_worker_ctx.is_err() {
user_workers.insert(
key.clone(),
Arc::new(RwLock::new(user_worker_ctx.unwrap())),
);

tx.send(Ok(CreateUserWorkerResult { key }));
} else {
tx.send(Err(user_worker_ctx.unwrap_err()));
}

tx.send(CreateUserWorkerResult { key });
}
Some(UserWorkerMsgs::SendRequest(key, req, tx)) => {
// TODO: handle errors
Expand Down
26 changes: 17 additions & 9 deletions examples/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ serve(async (req: Request) => {
const no_module_cache = false;
const import_map_path = null;
const env_vars = [];
const worker = await EdgeRuntime.userWorkers.create({
service_path,
memory_limit_mb,
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars
});
return worker.fetch(req);
try {
const worker = await EdgeRuntime.userWorkers.create({
service_path,
memory_limit_mb,
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars
});
return worker.fetch(req);
} catch (e) {
const error = { msg: e.toString() }
return new Response(
JSON.stringify(error),
{ status: 500, headers: { "Content-Type": "application/json" } },
)
}
})

0 comments on commit 9b29e30

Please sign in to comment.