From 95207c8ceced25ccdce4856e5ac8185eda7176d0 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov <65451602+astrophysik@users.noreply.github.com> Date: Tue, 27 Aug 2024 18:00:16 +0300 Subject: [PATCH] add get_running_fork_id (#1054) --- builtin-functions/kphp-light/functions.txt | 2 ++ runtime-light/component/component.cpp | 3 +++ runtime-light/coroutine/awaitable.h | 30 +++++++++++++++++----- runtime-light/stdlib/fork/fork-api.h | 9 +++++++ runtime-light/stdlib/fork/fork-context.h | 2 ++ 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/builtin-functions/kphp-light/functions.txt b/builtin-functions/kphp-light/functions.txt index 0ab5a3dfee..65fa664efa 100644 --- a/builtin-functions/kphp-light/functions.txt +++ b/builtin-functions/kphp-light/functions.txt @@ -78,6 +78,8 @@ function strlen ($str ::: string) ::: int; // === Fork ======================================================================================= +function get_running_fork_id() ::: future ; + /** @kphp-extern-func-info interruptible cpp_template_call */ function wait(future | false $id, float $timeout = -1.0) ::: ^1[*] | null; diff --git a/runtime-light/component/component.cpp b/runtime-light/component/component.cpp index e1a81c95e1..4d7f4b678c 100644 --- a/runtime-light/component/component.cpp +++ b/runtime-light/component/component.cpp @@ -26,6 +26,7 @@ void ComponentState::process_platform_updates() noexcept { for (;;) { // check if platform asked for yield if (static_cast(platform_ctx.please_yield.load())) { // tell the scheduler that we are about to yield + php_debug("platform asked for yield"); const auto schedule_status{scheduler.schedule(ScheduleEvent::Yield{})}; poll_status = schedule_status == ScheduleStatus::Error ? PollStatus::PollFinishedError : PollStatus::PollReschedule; return; @@ -34,6 +35,7 @@ void ComponentState::process_platform_updates() noexcept { // try taking update from the platform if (uint64_t stream_d{}; static_cast(platform_ctx.take_update(std::addressof(stream_d)))) { if (opened_streams_.contains(stream_d)) { // update on opened stream + php_debug("took update on stream %" PRIu64, stream_d); switch (scheduler.schedule(ScheduleEvent::UpdateOnStream{.stream_d = stream_d})) { case ScheduleStatus::Resumed: { // scheduler's resumed a coroutine waiting for update break; @@ -48,6 +50,7 @@ void ComponentState::process_platform_updates() noexcept { } } } else { // update on incoming stream + php_debug("got new incoming stream %" PRIu64, stream_d); if (standard_stream_ != INVALID_PLATFORM_DESCRIPTOR) { php_warning("skip new incoming stream since previous one is not closed"); release_stream(stream_d); diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index f55d2f1228..2d5f04f256 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -46,9 +46,18 @@ namespace awaitable_impl_ { enum class State : uint8_t { Init, Suspend, Ready, End }; +class fork_id_watcher_t { + int64_t fork_id{ForkComponentContext::get().running_fork_id}; + +protected: + void await_resume() const noexcept { + ForkComponentContext::get().running_fork_id = fork_id; + } +}; + } // namespace awaitable_impl_ -class wait_for_update_t { +class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t { uint64_t stream_d; SuspendToken suspend_token; awaitable_impl_::State state{awaitable_impl_::State::Init}; @@ -86,6 +95,7 @@ class wait_for_update_t { } constexpr void await_resume() noexcept { + fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; } @@ -101,7 +111,7 @@ class wait_for_update_t { // ================================================================================================ -class wait_for_incoming_stream_t { +class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t { SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::IncomingStream{}}; awaitable_impl_::State state{awaitable_impl_::State::Init}; @@ -135,6 +145,7 @@ class wait_for_incoming_stream_t { } uint64_t await_resume() noexcept { + fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; const auto incoming_stream_d{get_component_context()->take_incoming_stream()}; php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR); @@ -153,7 +164,7 @@ class wait_for_incoming_stream_t { // ================================================================================================ -class wait_for_reschedule_t { +class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t { SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; awaitable_impl_::State state{awaitable_impl_::State::Init}; @@ -186,6 +197,7 @@ class wait_for_reschedule_t { } constexpr void await_resume() noexcept { + fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; } @@ -201,7 +213,7 @@ class wait_for_reschedule_t { // ================================================================================================ -class wait_for_timer_t { +class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t { std::chrono::nanoseconds duration; uint64_t timer_d{INVALID_PLATFORM_DESCRIPTOR}; SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; @@ -245,6 +257,7 @@ class wait_for_timer_t { } constexpr void await_resume() noexcept { + fork_id_watcher_t::await_resume(); state = awaitable_impl_::State::End; } @@ -260,7 +273,7 @@ class wait_for_timer_t { // ================================================================================================ -class start_fork_t { +class start_fork_t : awaitable_impl_::fork_id_watcher_t { public: /** * Fork start policy: @@ -302,6 +315,7 @@ class start_fork_t { case execution::fork: { suspend_token.first = current_coro; continuation = fork_coro; + ForkComponentContext::get().running_fork_id = fork_id; break; } case execution::self: { @@ -317,7 +331,8 @@ class start_fork_t { return continuation; } - constexpr int64_t await_resume() const noexcept { + int64_t await_resume() const noexcept { + fork_id_watcher_t::await_resume(); return fork_id; } }; @@ -325,7 +340,7 @@ class start_fork_t { // ================================================================================================ template -class wait_fork_t { +class wait_fork_t : awaitable_impl_::fork_id_watcher_t { int64_t fork_id; task_t fork_task; task_t::awaiter_t fork_awaiter; @@ -358,6 +373,7 @@ class wait_fork_t { } await_resume_t await_resume() noexcept { + fork_id_watcher_t::await_resume(); if constexpr (std::is_void_v) { fork_awaiter.await_resume(); } else { diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 55e05d0fe4..20b1fcc1b4 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -24,6 +24,9 @@ constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast requires(is_optional::value || std::same_as) task_t f$wait(int64_t fork_id, double timeout = -1.0) noexcept { auto &fork_ctx{ForkComponentContext::get()}; @@ -44,6 +47,12 @@ requires(is_optional::value || std::same_as) task_t f$wait(Optio co_return co_await f$wait(fork_id_opt.has_value() ? fork_id_opt.val() : INVALID_FORK_ID, timeout); } +// === Non-blocking API ============================================================================ + +inline int64_t f$get_running_fork_id() noexcept { + return ForkComponentContext::get().running_fork_id; +} + task_t f$sched_yield() noexcept; task_t f$sched_yield_sleep(double duration) noexcept; diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 0788204439..153e34fdd0 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -42,6 +42,8 @@ class ForkComponentContext { friend class wait_fork_t; public: + int64_t running_fork_id{FORK_ID_INIT}; + explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept : forks(unordered_map>::allocator_type{memory_resource}) {}