Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make DBus calls Async (2nd attempt) #608

Open
wants to merge 45 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1ebf125
Problem: dbus call were not async
olethanh Apr 8, 2024
13e9e6d
use mypy procotol method for typing
olethanh Apr 8, 2024
421515b
fix CI, specify bus_type
olethanh Apr 11, 2024
0c43547
fix init in async, isort
olethanh Apr 11, 2024
fc7d3da
mypy
olethanh Apr 11, 2024
f8c9343
dbus fast is not in debian 11
olethanh Apr 11, 2024
afbdd0c
Problem: dbus call were not async
olethanh Apr 8, 2024
2a50080
use mypy procotol method for typing
olethanh Apr 8, 2024
a77cc0d
fix CI, specify bus_type
olethanh Apr 11, 2024
56f43de
fix init in async, isort
olethanh Apr 11, 2024
ee3cfbb
mypy
olethanh Apr 11, 2024
dc961fc
dbus fast is not in debian 11
olethanh Apr 11, 2024
4ac9099
Problem: Makefile for publishing example were not working
olethanh Apr 16, 2024
18bb56f
Problem: could not start Instances from command line (#597)
olethanh Apr 25, 2024
84614a5
Solve last CORS issues about duplicated headers (#604)
nesitor Apr 26, 2024
5a01c42
Fix: Diagnostic API was not updated
hoh Apr 26, 2024
4681906
Fix not awaited async call
olethanh Apr 26, 2024
e7086b2
Merge remote-tracking branch 'origin/main' into ol-dbus-async-v2
olethanh Apr 26, 2024
314666b
Connect to the bus on demand to avoid having to call setup
olethanh Apr 26, 2024
0c59d47
fix is running requiring async
olethanh Apr 26, 2024
2e2a445
working
olethanh Apr 29, 2024
067d6ee
restore
olethanh Apr 29, 2024
eacd7da
revert change to running
olethanh Apr 29, 2024
bc496dc
CI check system usage endpoint
olethanh Apr 29, 2024
12743f6
add unit test for system usage
olethanh Apr 29, 2024
e58b9ad
add unit test for system usage
olethanh Apr 29, 2024
6174c96
Set up a fresh web_app for each test as required by aiohttp
olethanh Apr 30, 2024
f913398
revert local compat change
olethanh Apr 30, 2024
fdbc765
remove force settings the loop which was causing problem with future …
olethanh Apr 30, 2024
f59cc5f
fix other double loop problems
olethanh Apr 30, 2024
4aa3eb8
Fix inconsistant execution state
olethanh Apr 30, 2024
69ed555
Remove unused loop params
olethanh Apr 30, 2024
6e84b26
Apparently CI also don't have matching arch
olethanh Apr 30, 2024
b800612
Fix test description
olethanh Apr 30, 2024
5fe46ac
Problem: allocation endpoints was not tested
olethanh Apr 30, 2024
82398b9
Merge branch 'ol-test-allocation-endpoints' into ol-dbus-async-v2
olethanh Apr 30, 2024
5aabea2
style
olethanh Apr 30, 2024
c000350
black
olethanh May 2, 2024
975ada6
Fix bug found on debian 11 / python 3.9 droplet
olethanh May 2, 2024
0e2ca01
Try another way to fix python 3.9 / Debian 11
olethanh May 3, 2024
bb6073c
Merge remote-tracking branch 'origin/main' into ol-dbus-async-v2
olethanh May 3, 2024
f0e7842
Merge remote-tracking branch 'origin/main' into ol-dbus-async-v2
olethanh May 3, 2024
ad0b397
Merge remote-tracking branch 'origin/main' into ol-dbus-async-v2
olethanh May 15, 2024
f6ed6ff
Merge remote-tracking branch 'origin/main' into ol-dbus-async-v2
olethanh May 17, 2024
b02c3c2
Split the systems Protocol in own module
olethanh May 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ debian-package-code:
cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'dbus-fast==1.90.1'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo
Expand Down
2 changes: 1 addition & 1 deletion packaging/aleph-vm/DEBIAN/control
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ Version: 0.1.8
Architecture: all
Maintainer: Aleph.im
Description: Aleph.im VM execution engine
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,python3-dbus,btrfs-progs,nftables,python3-jwcrypto
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,btrfs-progs,nftables,python3-jwcrypto
Section: aleph-im
Priority: Extra
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ dependencies = [
"packaging==23.2",
"jsonschema==4.19.1",
"qmp==0.0.1",
"dbus-python==1.3.2",
"systemd-python==235",
"dbus-fast==1.90.1",
"systemd-python==235",
"superfluid~=0.2.1",
"sqlalchemy[asyncio]>=2.0",
Expand Down
6 changes: 1 addition & 5 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ class VmExecution:

@property
def is_running(self) -> bool:
return (
self.systemd_manager.is_service_active(self.controller_service)
if self.persistent and self.systemd_manager
else bool(self.times.starting_at and not self.times.stopping_at)
)
return bool(self.times.starting_at and not self.times.stopping_at)

@property
def is_stopping(self) -> bool:
Expand Down
10 changes: 7 additions & 3 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ async def stop_all_vms(app: web.Application):

def run():
"""Run the VM Supervisor."""
# Loop creation set here to avoid bug with Future on different loop

loop = asyncio.new_event_loop()
# apparently needed for Python 3.9 / Debian 11
asyncio.set_event_loop(loop)
settings.check()

engine = setup_engine()
asyncio.run(create_tables(engine))

loop = asyncio.new_event_loop()
pool = VmPool(loop)
pool.setup()

Expand All @@ -170,10 +174,10 @@ def run():
app.on_cleanup.append(stop_all_vms)

logger.info("Loading existing executions ...")
asyncio.run(pool.load_persistent_executions())
loop.run_until_complete(pool.load_persistent_executions())

logger.info(f"Starting the web server on http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}")
web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT)
web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT, loop=loop)
except OSError as e:
if e.errno == 98:
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def operate_reboot(request: web.Request, authenticated_sender: str) -> web
if execution.is_running:
logger.info(f"Rebooting {execution.vm_hash}")
if execution.persistent:
pool.systemd_manager.restart(execution.controller_service)
await pool.systemd_manager.restart(execution.controller_service)
else:
await pool.stop_vm(vm_hash)
pool.forget_vm(vm_hash)
Expand Down
18 changes: 13 additions & 5 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop):
self.executions = {}
self.message_cache = {}

# apparently needed for Python 3.9 / Debian 11
asyncio.set_event_loop(loop)
self.creation_lock = asyncio.Lock()

Expand Down Expand Up @@ -124,7 +125,7 @@ async def create_a_vm(

# Start VM and snapshots automatically
if execution.persistent:
self.systemd_manager.enable_and_start(execution.controller_service)
await self.systemd_manager.enable_and_start(execution.controller_service)
await execution.wait_for_init()
if execution.is_program and execution.vm:
await execution.vm.load_configuration()
Expand Down Expand Up @@ -191,7 +192,7 @@ async def stop_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]:
async def stop_persistent_execution(self, execution: VmExecution):
"""Stop persistent VMs in the pool."""
assert execution.persistent, "Execution isn't persistent"
self.systemd_manager.stop_and_disable(execution.controller_service)
await self.systemd_manager.stop_and_disable(execution.controller_service)
await execution.stop()

def forget_vm(self, vm_hash: ItemHash) -> None:
Expand Down Expand Up @@ -239,8 +240,10 @@ async def load_persistent_executions(self):
persistent=saved_execution.persistent,
)

if execution.is_running:
# TODO: Improve the way that we re-create running execution
if await self.systemd_manager.is_service_active(
execution.controller_service
): # TODO: Improve the way that we re-create running execution
Comment on lines +243 to +245
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO having different ways to check if an instance is running here or at the method used on the VMExecution class, can provide some issues in the future, because we don't follow always the same pattern to check it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently in the is_running we actually to have two different check pattern according if we are running systemd or not, which is why I unified the is_running.
However at start up, when we rehydrate, we want to check if the Instance process is actually running an reconnect to it so it's ok to have a different logic there, at least that is munderstanding of it, please correct if I'm wrong

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct, but IMO having the same way to check if a VM is running independently of the method used (systemd or direct) it's better that rehydrate fields and trust on that rehydration.

logger.debug("Execution %s is still running in systemd, reconnecting", execution.vm_hash)
await execution.prepare()
if self.network:
vm_type = VmType.from_message_content(execution.message)
Expand All @@ -251,16 +254,21 @@ async def load_persistent_executions(self):
vm = execution.create(vm_id=vm_id, tap_interface=tap_interface, prepare=False)
await vm.start_guest_api()
execution.ready_event.set()
execution.times.starting_at = execution.times.starting_at or datetime.now(tz=timezone.utc)
execution.times.started_at = datetime.now(tz=timezone.utc)

execution.times.stopping_at = None
execution.times.stopped_at = None
self._schedule_forget_on_stop(execution)

# Start the snapshot manager for the VM
if vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.start_for(vm=execution.vm)

assert execution.is_running
self.executions[vm_hash] = execution

else:
logger.debug(("Execution %s is not running in systemd, reconnecting", execution.vm_hash))
execution.uuid = saved_execution.uuid
await execution.record_usage()

Expand Down
112 changes: 72 additions & 40 deletions src/aleph/vm/systemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"""

import logging
from typing import Optional

import dbus
from dbus import DBusException, SystemBus
from dbus.proxies import Interface
from dbus_fast import BusType, DBusError
from dbus_fast.aio import MessageBus, ProxyObject

from aleph.vm.systemd_helpers import UnitFileState, Mode, ActiveState, SystemdProxy, UnitProxy

logger = logging.getLogger(__name__)

Expand All @@ -17,60 +19,90 @@ class SystemDManager:
Used to manage the systemd services on the host on Linux.
"""

bus: SystemBus
manager: Interface
_bus: Optional[MessageBus] = None
_manager: Optional[SystemdProxy] = None

def __init__(self):
self.bus = dbus.SystemBus()
systemd = self.bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1")
self.manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager")

def stop_and_disable(self, service: str) -> None:
if self.is_service_active(service):
self.stop(service)
if self.is_service_enabled(service):
self.disable(service)

def enable(self, service: str) -> None:
self.manager.EnableUnitFiles([service], False, True)
pass

async def get_bus(self):
if self._bus is None:
self._bus = MessageBus(bus_type=BusType.SYSTEM)
await self._bus.connect()
return self._bus

async def get_manager(self):
if self._manager is None:
bus = await self.get_bus()
path = "/org/freedesktop/systemd1"
bus_name = "org.freedesktop.systemd1"
introspect = await bus.introspect(bus_name, path)
systemd_proxy: ProxyObject = bus.get_proxy_object(bus_name, path, introspection=introspect)
interface = systemd_proxy.get_interface("org.freedesktop.systemd1.Manager")
# Check required method are implemented
assert isinstance(interface, SystemdProxy)
self._manager = interface
return self._manager

async def enable(self, service: str) -> None:
manager = await self.get_manager()
await manager.call_enable_unit_files([service], False, True)
logger.debug(f"Enabled {service} service")

def start(self, service: str) -> None:
self.manager.StartUnit(service, "replace")
async def start(self, service: str) -> None:
manager = await self.get_manager()
await manager.call_start_unit(service, Mode.REPLACE)
logger.debug(f"Started {service} service")

def stop(self, service: str) -> None:
self.manager.StopUnit(service, "replace")
async def stop(self, service: str) -> None:
manager = await self.get_manager()
await manager.call_stop_unit(service, Mode.REPLACE)
logger.debug(f"Stopped {service} service")

def restart(self, service: str) -> None:
self.manager.RestartUnit(service, "replace")
async def restart(self, service: str) -> None:
manager = await self.get_manager()
await manager.call_restart_unit(service, Mode.REPLACE)
logger.debug(f"Restarted {service} service")

def disable(self, service: str) -> None:
self.manager.DisableUnitFiles([service], False)
async def disable(self, service: str) -> None:
manager = await self.get_manager()
await manager.call_disable_unit_files([service], False)
logger.debug(f"Disabled {service} service")

def is_service_enabled(self, service: str) -> bool:
async def is_service_enabled(self, service: str) -> bool:
manager = await self.get_manager()
try:
return self.manager.GetUnitFileState(service) == "enabled"
except DBusException as error:
state = await manager.call_get_unit_file_state(service)
return state == UnitFileState.ENABLED
except DBusError as error:
logger.error(error)
return False

def is_service_active(self, service: str) -> bool:
async def is_service_active(self, service: str) -> bool:
manager = await self.get_manager()
try:
systemd_service = self.bus.get_object("org.freedesktop.systemd1", object_path=self.manager.GetUnit(service))
unit = dbus.Interface(systemd_service, "org.freedesktop.systemd1.Unit")
unit_properties = dbus.Interface(unit, "org.freedesktop.DBus.Properties")
active_state = unit_properties.Get("org.freedesktop.systemd1.Unit", "ActiveState")
return active_state == "active"
except DBusException as error:
path = await manager.call_get_unit(service)
bus = await self.get_bus()
bus_name = "org.freedesktop.systemd1"
introspect = await bus.introspect(bus_name, path)
systemd_service = bus.get_proxy_object(bus_name, path, introspection=introspect)
unit = systemd_service.get_interface("org.freedesktop.systemd1.Unit")
# Check required method are implemented
assert isinstance(unit, UnitProxy)
active_state = await unit.get_active_state()
return active_state == ActiveState.ACTIVE
except DBusError as error:
logger.error(error)
return False

def enable_and_start(self, service: str) -> None:
if not self.is_service_enabled(service):
self.enable(service)
if not self.is_service_active(service):
self.start(service)
async def enable_and_start(self, service: str) -> None:
if not await self.is_service_enabled(service):
await self.enable(service)
if not await self.is_service_active(service):
await self.start(service)

async def stop_and_disable(self, service: str) -> None:
if await self.is_service_active(service):
await self.stop(service)
if await self.is_service_enabled(service):
await self.disable(service)
Loading
Loading