Skip to content

Commit

Permalink
add get_running_fork_id (#1054)
Browse files Browse the repository at this point in the history
  • Loading branch information
astrophysik authored Aug 27, 2024
1 parent bd7874b commit 95207c8
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
2 changes: 2 additions & 0 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ function strlen ($str ::: string) ::: int;

// === Fork =======================================================================================

function get_running_fork_id() ::: future <void>;

/** @kphp-extern-func-info interruptible cpp_template_call */
function wait(future<any> | false $id, float $timeout = -1.0) ::: ^1[*] | null;

Expand Down
3 changes: 3 additions & 0 deletions runtime-light/component/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void ComponentState::process_platform_updates() noexcept {
for (;;) {
// check if platform asked for yield
if (static_cast<bool>(platform_ctx.please_yield.load())) { // tell the scheduler that we are about to yield
php_debug("platform asked for yield");
const auto schedule_status{scheduler.schedule(ScheduleEvent::Yield{})};
poll_status = schedule_status == ScheduleStatus::Error ? PollStatus::PollFinishedError : PollStatus::PollReschedule;
return;
Expand All @@ -34,6 +35,7 @@ void ComponentState::process_platform_updates() noexcept {
// try taking update from the platform
if (uint64_t stream_d{}; static_cast<bool>(platform_ctx.take_update(std::addressof(stream_d)))) {
if (opened_streams_.contains(stream_d)) { // update on opened stream
php_debug("took update on stream %" PRIu64, stream_d);
switch (scheduler.schedule(ScheduleEvent::UpdateOnStream{.stream_d = stream_d})) {
case ScheduleStatus::Resumed: { // scheduler's resumed a coroutine waiting for update
break;
Expand All @@ -48,6 +50,7 @@ void ComponentState::process_platform_updates() noexcept {
}
}
} else { // update on incoming stream
php_debug("got new incoming stream %" PRIu64, stream_d);
if (standard_stream_ != INVALID_PLATFORM_DESCRIPTOR) {
php_warning("skip new incoming stream since previous one is not closed");
release_stream(stream_d);
Expand Down
30 changes: 23 additions & 7 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,18 @@ namespace awaitable_impl_ {

enum class State : uint8_t { Init, Suspend, Ready, End };

class fork_id_watcher_t {
int64_t fork_id{ForkComponentContext::get().running_fork_id};

protected:
void await_resume() const noexcept {
ForkComponentContext::get().running_fork_id = fork_id;
}
};

} // namespace awaitable_impl_

class wait_for_update_t {
class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t {
uint64_t stream_d;
SuspendToken suspend_token;
awaitable_impl_::State state{awaitable_impl_::State::Init};
Expand Down Expand Up @@ -86,6 +95,7 @@ class wait_for_update_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
}

Expand All @@ -101,7 +111,7 @@ class wait_for_update_t {

// ================================================================================================

class wait_for_incoming_stream_t {
class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::IncomingStream{}};
awaitable_impl_::State state{awaitable_impl_::State::Init};

Expand Down Expand Up @@ -135,6 +145,7 @@ class wait_for_incoming_stream_t {
}

uint64_t await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
const auto incoming_stream_d{get_component_context()->take_incoming_stream()};
php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR);
Expand All @@ -153,7 +164,7 @@ class wait_for_incoming_stream_t {

// ================================================================================================

class wait_for_reschedule_t {
class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}};
awaitable_impl_::State state{awaitable_impl_::State::Init};

Expand Down Expand Up @@ -186,6 +197,7 @@ class wait_for_reschedule_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
}

Expand All @@ -201,7 +213,7 @@ class wait_for_reschedule_t {

// ================================================================================================

class wait_for_timer_t {
class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
std::chrono::nanoseconds duration;
uint64_t timer_d{INVALID_PLATFORM_DESCRIPTOR};
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}};
Expand Down Expand Up @@ -245,6 +257,7 @@ class wait_for_timer_t {
}

constexpr void await_resume() noexcept {
fork_id_watcher_t::await_resume();
state = awaitable_impl_::State::End;
}

Expand All @@ -260,7 +273,7 @@ class wait_for_timer_t {

// ================================================================================================

class start_fork_t {
class start_fork_t : awaitable_impl_::fork_id_watcher_t {
public:
/**
* Fork start policy:
Expand Down Expand Up @@ -302,6 +315,7 @@ class start_fork_t {
case execution::fork: {
suspend_token.first = current_coro;
continuation = fork_coro;
ForkComponentContext::get().running_fork_id = fork_id;
break;
}
case execution::self: {
Expand All @@ -317,15 +331,16 @@ class start_fork_t {
return continuation;
}

constexpr int64_t await_resume() const noexcept {
int64_t await_resume() const noexcept {
fork_id_watcher_t::await_resume();
return fork_id;
}
};

// ================================================================================================

template<typename T>
class wait_fork_t {
class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
int64_t fork_id;
task_t<T> fork_task;
task_t<T>::awaiter_t fork_awaiter;
Expand Down Expand Up @@ -358,6 +373,7 @@ class wait_fork_t {
}

await_resume_t await_resume() noexcept {
fork_id_watcher_t::await_resume();
if constexpr (std::is_void_v<await_resume_t>) {
fork_awaiter.await_resume();
} else {
Expand Down
9 changes: 9 additions & 0 deletions runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast<std::chrono::nano

} // namespace fork_api_impl_


// === Blocking API ================================================================================

template<typename T>
requires(is_optional<T>::value || std::same_as<T, mixed>) task_t<T> f$wait(int64_t fork_id, double timeout = -1.0) noexcept {
auto &fork_ctx{ForkComponentContext::get()};
Expand All @@ -44,6 +47,12 @@ requires(is_optional<T>::value || std::same_as<T, mixed>) task_t<T> f$wait(Optio
co_return co_await f$wait<T>(fork_id_opt.has_value() ? fork_id_opt.val() : INVALID_FORK_ID, timeout);
}

// === Non-blocking API ============================================================================

inline int64_t f$get_running_fork_id() noexcept {
return ForkComponentContext::get().running_fork_id;
}

task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(double duration) noexcept;
2 changes: 2 additions & 0 deletions runtime-light/stdlib/fork/fork-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class ForkComponentContext {
friend class wait_fork_t;

public:
int64_t running_fork_id{FORK_ID_INIT};

explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: forks(unordered_map<int64_t, task_t<void>>::allocator_type{memory_resource}) {}

Expand Down

0 comments on commit 95207c8

Please sign in to comment.