Skip to content

Commit

Permalink
Refactor eventloop to enable async comms
Browse files Browse the repository at this point in the history
  • Loading branch information
halleysfifthinc committed Jan 15, 2025
1 parent fa415b8 commit 6e2a01b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 31 deletions.
104 changes: 74 additions & 30 deletions src/eventloop.jl
Original file line number Diff line number Diff line change
@@ -1,51 +1,95 @@
function eventloop(socket)
task_local_storage(:IJulia_task, "write task")
try
while true
msg = recv_ipython(socket)
try
send_status("busy", msg)
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg)
catch e
# Try to keep going if we get an exception, but
# send the exception traceback to the front-ends.
# (Ignore SIGINT since this may just be a user-requested
# kernel interruption to interrupt long calculations.)
if !isa(e, InterruptException)
content = error_content(e, msg="KERNEL EXCEPTION")
map(s -> println(orig_stderr[], s), content["traceback"])
send_ipython(publish[], msg_pub(execute_msg, "error", content))
function eventloop(socket::Socket, msgs::Channel, handlers)
while true
try
while true
msg = take!(msgs)
try
send_status("busy", msg)
invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg)
catch e
# Try to keep going if we get an exception, but
# send the exception traceback to the front-ends.
# (Ignore SIGINT since this may just be a user-requested
# kernel interruption to interrupt long calculations.)
if !isa(e, InterruptException)
content = error_content(e, msg="KERNEL EXCEPTION")
map(s -> println(orig_stderr[], s), content["traceback"])
send_ipython(publish[], msg_pub(execute_msg, "error", content))
else
rethrow()
end
finally
flush_all()
send_status("idle", msg)
end
finally
flush_all()
send_status("idle", msg)
yield()
end
catch e
# the Jupyter manager may send us a SIGINT if the user
# chooses to interrupt the kernel; don't crash on this
if !isa(e, InterruptException)
rethrow()
end
end
catch e
# the Jupyter manager may send us a SIGINT if the user
# chooses to interrupt the kernel; don't crash on this
if isa(e, InterruptException)
eventloop(socket)
else
rethrow()
end
yield()
end
end

const iopub_task = Ref{Task}()
const requests_task = Ref{Task}()
function waitloop()
@async eventloop(control[])
requests_task[] = @async eventloop(requests[])
control_msgs = Channel{Msg}(32) do ch
task_local_storage(:IJulia_task, "control_msgs task")
while isopen(control[])
msg::Msg = recv_ipython(control[])
put!(ch, msg)
yield()
end
end

iopub_msgs = Channel{Msg}(32)
request_msgs = Channel{Msg}(32) do ch
task_local_storage(:IJulia_task, "request_msgs task")
while isopen(requests[])
msg::Msg = recv_ipython(requests[])
if haskey(iopub_handlers, msg.header["msg_type"])
put!(iopub_msgs, msg)
else
put!(ch, msg)
end
yield()
end
end

control_task = @async begin
task_local_storage(:IJulia_task, "control handle/write task")
eventloop(control[], control_msgs, handlers)
end
requests_task[] = @async begin
task_local_storage(:IJulia_task, "requests handle/write task")
eventloop(requests[], request_msgs, handlers)
end
iopub_task[] = @async begin
task_local_storage(:IJulia_task, "iopub handle/write task")
eventloop(requests[], iopub_msgs, iopub_handlers)
end

bind(control_msgs, control_task)
bind(request_msgs, requests_task[])
bind(iopub_msgs, iopub_task[])

while true
try
wait()
catch e
# send interrupts (user SIGINT) to the code-execution task
if isa(e, InterruptException)
@async Base.throwto(iopub_task[], e)
@async Base.throwto(requests_task[], e)
else
rethrow()
end
end
end
end

9 changes: 8 additions & 1 deletion src/handlers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ end

function interrupt_request(socket, msg)
@async Base.throwto(requests_task[], InterruptException())
@async Base.throwto(iopub_task[], InterruptException())
send_ipython(requests[], msg_reply(msg, "interrupt_reply", Dict()))
end

Expand All @@ -291,5 +292,11 @@ const handlers = Dict{String,Function}(
"comm_open" => comm_open,
"comm_info_request" => comm_info_request,
"comm_msg" => comm_msg,
"comm_close" => comm_close
"comm_close" => comm_close,
)

const iopub_handlers = Dict{String,Function}(
"comm_open" => comm_open,
"comm_msg" => comm_msg,
"comm_close" => comm_close,
)

0 comments on commit 6e2a01b

Please sign in to comment.