Skip to content

Commit

Permalink
Merge pull request #107 from supabase/load-mod-in-boot
Browse files Browse the repository at this point in the history
fix: move main module load to worker boot phase
  • Loading branch information
laktek authored May 29, 2023
2 parents c393f44 + 1c55447 commit 355afaa
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 38 deletions.
58 changes: 27 additions & 31 deletions crates/base/src/edge_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use crate::js_worker::module_loader;
use anyhow::{anyhow, Error};
use deno_core::error::AnyError;
use deno_core::url::Url;
use deno_core::JsRuntime;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::{located_script_name, serde_v8};
use deno_core::{located_script_name, serde_v8, JsRuntime, ModuleId, RuntimeOptions};
use import_map::{parse_from_json, ImportMap, ImportMapDiagnostic};
use log::{debug, error, warn};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -81,7 +78,7 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {

pub struct EdgeRuntime {
pub js_runtime: JsRuntime,
pub main_module_url: ModuleSpecifier,
pub main_module_id: ModuleId,
pub is_user_runtime: bool,
pub env_vars: HashMap<String, String>,
pub conf: EdgeContextOpts,
Expand Down Expand Up @@ -117,7 +114,7 @@ fn get_error_class_name(e: &AnyError) -> &'static str {
}

impl EdgeRuntime {
pub fn new(opts: EdgeContextInitOpts) -> Result<Self, Error> {
pub async fn new(opts: EdgeContextInitOpts) -> Result<Self, Error> {
let EdgeContextInitOpts {
service_path,
no_module_cache,
Expand Down Expand Up @@ -215,9 +212,11 @@ impl EdgeRuntime {
op_state.put::<sb_env::EnvVars>(env_vars);
}

let main_module_id = js_runtime.load_main_module(&main_module_url, None).await?;

Ok(Self {
js_runtime,
main_module_url,
main_module_id,
is_user_runtime,
env_vars,
conf,
Expand Down Expand Up @@ -271,10 +270,7 @@ impl EdgeRuntime {
let mut js_runtime = self.js_runtime;

let future = async move {
let mod_id = js_runtime
.load_main_module(&self.main_module_url, None)
.await?;
let mod_result = js_runtime.mod_evaluate(mod_id);
let mod_result = js_runtime.mod_evaluate(self.main_module_id);

let result: Result<EdgeCallResult, Error> = tokio::select! {
_ = js_runtime.run_event_loop(false) => {
Expand Down Expand Up @@ -307,13 +303,7 @@ impl EdgeRuntime {
result
};

let res = future.await;

if res.is_err() {
println!("worker thread panicked {:?}", res.as_ref().err().unwrap());
}

res
future.await
}

fn start_controller_thread(
Expand Down Expand Up @@ -383,15 +373,15 @@ mod test {
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

fn create_runtime(
async fn create_runtime(
path: Option<PathBuf>,
env_vars: Option<HashMap<String, String>>,
user_conf: Option<EdgeContextOpts>,
) -> EdgeRuntime {
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();

EdgeRuntime::new(EdgeContextInitOpts {
service_path: path.unwrap_or(PathBuf::from("./examples/main")),
service_path: path.unwrap_or(PathBuf::from("./test_cases/main")),
no_module_cache: false,
import_map_path: None,
env_vars: env_vars.unwrap_or(Default::default()),
Expand All @@ -403,6 +393,7 @@ mod test {
}
},
})
.await
.unwrap()
}

Expand All @@ -414,7 +405,7 @@ mod test {
// Main Runtime should have access to `EdgeRuntime`
#[tokio::test]
async fn test_main_runtime_creation() {
let mut runtime = create_runtime(None, None, None);
let mut runtime = create_runtime(None, None, None).await;

{
let scope = &mut runtime.js_runtime.handle_scope();
Expand All @@ -437,7 +428,8 @@ mod test {
None,
None,
Some(EdgeContextOpts::UserWorker(Default::default())),
);
)
.await;

{
let scope = &mut runtime.js_runtime.handle_scope();
Expand All @@ -455,7 +447,7 @@ mod test {

#[tokio::test]
async fn test_main_rt_fs() {
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None);
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None).await;

let global_value_deno_read_file_script = main_rt
.js_runtime
Expand All @@ -477,12 +469,13 @@ mod test {
#[tokio::test]
async fn test_os_env_vars() {
std::env::set_var("Supa_Test", "Supa_Value");
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None);
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None).await;
let mut user_rt = create_runtime(
None,
None,
Some(EdgeContextOpts::UserWorker(Default::default())),
);
)
.await;
assert!(!main_rt.env_vars.is_empty());
assert!(user_rt.env_vars.is_empty());

Expand Down Expand Up @@ -532,7 +525,7 @@ mod test {
assert!(user_serde_deno_env.unwrap().is_null());
}

fn create_basic_user_runtime(
async fn create_basic_user_runtime(
path: &str,
memory_limit: u64,
worker_timeout_ms: u64,
Expand All @@ -548,6 +541,7 @@ mod test {
pool_msg_tx: None,
})),
)
.await
}

// FIXME: Disabling these tests since they are flaky in CI
Expand All @@ -569,7 +563,7 @@ mod test {

#[flaky_test]
async fn test_unresolved_promise() {
let user_rt = create_basic_user_runtime("./test_cases/unresolved_promise", 100, 1000);
let user_rt = create_basic_user_runtime("./test_cases/unresolved_promise", 100, 1000).await;
let (_tx, unix_stream_rx) = create_user_rt_params_to_run();
let data = user_rt.run(unix_stream_rx).await.unwrap();
assert_eq!(data, EdgeCallResult::ModuleEvaluationTimedOut);
Expand All @@ -578,7 +572,8 @@ mod test {
#[flaky_test]
async fn test_delayed_promise() {
let user_rt =
create_basic_user_runtime("./test_cases/resolve_promise_after_timeout", 100, 1000);
create_basic_user_runtime("./test_cases/resolve_promise_after_timeout", 100, 1000)
.await;
let (_tx, unix_stream_rx) = create_user_rt_params_to_run();
let data = user_rt.run(unix_stream_rx).await.unwrap();
assert_eq!(data, EdgeCallResult::TimeOut);
Expand All @@ -587,23 +582,24 @@ mod test {
#[flaky_test]
async fn test_success_delayed_promise() {
let user_rt =
create_basic_user_runtime("./test_cases/resolve_promise_before_timeout", 100, 1000);
create_basic_user_runtime("./test_cases/resolve_promise_before_timeout", 100, 1000)
.await;
let (_tx, unix_stream_rx) = create_user_rt_params_to_run();
let data = user_rt.run(unix_stream_rx).await.unwrap();
assert_eq!(data, EdgeCallResult::Completed);
}

#[flaky_test]
async fn test_heap_limits_reached() {
let user_rt = create_basic_user_runtime("./test_cases/heap_limit", 5, 1000);
let user_rt = create_basic_user_runtime("./test_cases/heap_limit", 5, 1000).await;
let (_tx, unix_stream_rx) = create_user_rt_params_to_run();
let data = user_rt.run(unix_stream_rx).await.unwrap();
assert_eq!(data, EdgeCallResult::HeapLimitReached);
}

#[flaky_test]
async fn test_read_file_user_rt() {
let user_rt = create_basic_user_runtime("./test_cases/readFile", 5, 1000);
let user_rt = create_basic_user_runtime("./test_cases/readFile", 5, 1000).await;
let (_tx, unix_stream_rx) = create_user_rt_params_to_run();
let data = user_rt.run(unix_stream_rx).await.unwrap();
match data {
Expand Down
16 changes: 9 additions & 7 deletions crates/base/src/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn create_worker(
EdgeContextOpts::MainWorker(_opts) => (None, None),
};

// spawn a thread to run the worker
let _handle: thread::JoinHandle<Result<(), Error>> = thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -43,16 +44,13 @@ pub async fn create_worker(
let local = tokio::task::LocalSet::new();

let result: Result<EdgeCallResult, Error> = local.block_on(&runtime, async {
let result = EdgeRuntime::new(init_opts);

match result {
match EdgeRuntime::new(init_opts).await {
Err(err) => {
let _ = worker_boot_result_tx.send(Err(anyhow!("worker boot error")));
bail!(err)
}
Ok(worker) => {
let _ = worker_boot_result_tx.send(Ok(()));
// start the worker
worker.run(unix_stream_rx).await
}
}
Expand All @@ -69,16 +67,20 @@ pub async fn create_worker(
// remove the worker from pool
if let Some(k) = worker_key {
if let Some(tx) = pool_msg_tx {
if tx.send(UserWorkerMsgs::Shutdown(k)).is_err() {
error!("failed to send the shutdown signal to user worker pool");
let res = tx.send(UserWorkerMsgs::Shutdown(k));
if res.is_err() {
error!(
"failed to send the shutdown signal to user worker pool: {:?}",
res.unwrap_err()
);
}
}
}

Ok(())
});

// create an async task waiting for a request
// create an async task waiting for requests for worker
let (worker_req_tx, mut worker_req_rx) = mpsc::unbounded_channel::<WorkerRequestMsg>();

let worker_req_handle: tokio::task::JoinHandle<Result<(), Error>> =
Expand Down
20 changes: 20 additions & 0 deletions crates/base/test_cases/invalid_imports/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import * from "https://deno.land/[email protected]/http/server.ts"

interface reqPayload {
name: string;
}

console.log('server started modified');

serve(async (req: Request) => {
const { name } : reqPayload = await req.json();
const data = {
message: `Hello ${name} from foo!`,
test: 'foo'
}

return new Response(
JSON.stringify(data),
{ headers: { "Content-Type": "application/json", "Connection": "keep-alive" } },
)
}, { port: 9005 })
20 changes: 20 additions & 0 deletions crates/base/tests/worker_boot_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::collections::HashMap;

use base::worker_ctx::create_worker;
use sb_worker_context::essentials::{EdgeContextInitOpts, EdgeContextOpts, EdgeUserRuntimeOpts};

#[tokio::test]
async fn test_worker_boot_invalid_imports() {
let user_rt_opts = EdgeUserRuntimeOpts::default();
let opts = EdgeContextInitOpts {
service_path: "./test_cases/invalid_imports".into(),
no_module_cache: false,
import_map_path: None,
env_vars: HashMap::new(),
conf: EdgeContextOpts::UserWorker(user_rt_opts),
};
let result = create_worker(opts).await;

assert!(result.is_err());
assert_eq!(result.unwrap_err().to_string(), "worker boot error");
}

0 comments on commit 355afaa

Please sign in to comment.