Skip to content

Commit

Permalink
Feature: asynchronous garbage collector (#516)
Browse files Browse the repository at this point in the history
Problem: deleting files from the STORE message handler causes latency in
the message processing pipeline when deleting large files.

Solution: delete files asynchronously. We now spawn a garbage collector
task every 24 hours by default that will look at the list of files and
delete files without pins.

This PR also introduces the concept of grace period pins: files without
a tx or message pin can now be pinned for an arbitrary amount of time.
This applies to files uploaded by users in unauthenticated mode and to
files deleted by FORGET messages. The default is to give 24 hours of grace
period for files.
  • Loading branch information
odesenfans authored Nov 3, 2023
1 parent 0e4a3ea commit 29139b1
Show file tree
Hide file tree
Showing 16 changed files with 456 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""grace period file pins
Revision ID: 88a25ae08ebf
Revises: 3bf484f2cc95
Create Date: 2023-11-02 22:43:40.223477
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "88a25ae08ebf"
down_revision = "3bf484f2cc95"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the nullable `delete_by` timestamp column to `file_pins` and index it."""
    # ### commands auto generated by Alembic - please adjust! ###
    delete_by_column = sa.Column(
        "delete_by", sa.TIMESTAMP(timezone=True), nullable=True
    )
    op.add_column("file_pins", delete_by_column)

    # Partial index: restricted to rows where delete_by is set.
    delete_by_is_set = sa.text("delete_by IS NOT NULL")
    op.create_index(
        "ix_file_pins_delete_by",
        "file_pins",
        ["delete_by"],
        unique=False,
        postgresql_where=delete_by_is_set,
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the `ix_file_pins_delete_by` index, then the `delete_by` column."""
    # ### commands auto generated by Alembic - please adjust! ###
    delete_by_is_set = sa.text("delete_by IS NOT NULL")
    op.drop_index(
        "ix_file_pins_delete_by",
        table_name="file_pins",
        postgresql_where=delete_by_is_set,
    )
    op.drop_column("file_pins", "delete_by")
    # ### end Alembic commands ###
13 changes: 13 additions & 0 deletions src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
from aleph.services.ipfs.common import make_ipfs_client
from aleph.services.keys import generate_keypair, save_keys
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
from aleph.services.storage.garbage_collector import (
garbage_collector_task,
GarbageCollector,
)
from aleph.storage import StorageService
from aleph.toolkit.logging import setup_logging
from aleph.toolkit.monitoring import setup_sentry
Expand Down Expand Up @@ -142,6 +146,9 @@ async def main(args: List[str]) -> None:
ipfs_service=ipfs_service,
node_cache=node_cache,
)
garbage_collector = GarbageCollector(
session_factory=session_factory, storage_service=storage_service
)
chain_data_service = ChainDataService(
session_factory=session_factory,
storage_service=storage_service,
Expand Down Expand Up @@ -192,6 +199,12 @@ async def main(args: List[str]) -> None:
tasks.append(refresh_cache_materialized_views(session_factory))
LOGGER.debug("Initialized cache tasks")

LOGGER.debug("Initializing garbage collector task")
tasks.append(
garbage_collector_task(config=config, garbage_collector=garbage_collector)
)
LOGGER.debug("Initialized garbage collector task")

LOGGER.debug("Running event loop")
await asyncio.gather(*tasks)

Expand Down
4 changes: 4 additions & 0 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def get_defaults():
"folder": "/var/lib/pyaleph",
# Whether to store files on the node.
"store_files": True,
# Interval between garbage collector runs, expressed in hours.
"garbage_collector_period": 24,
# Grace period for files, expressed in hours.
"grace_period": 24,
},
"nuls2": {
# NULS2 chain ID.
Expand Down
32 changes: 32 additions & 0 deletions src/aleph/db/accessors/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@
MessageFilePinDb,
FilePinType,
ContentFilePinDb,
GracePeriodFilePinDb,
)


def is_pinned_file(session: DbSession, file_hash: str) -> bool:
    """Check whether a file pin row exists whose `file_hash` equals the given hash."""
    matches_hash = FilePinDb.file_hash == file_hash
    return FilePinDb.exists(session=session, where=matches_hash)


def get_unpinned_files(session: DbSession) -> Iterable[StoredFileDb]:
    """
    Returns the stored files for which no file pin row references their hash.

    The pin lookup is done on `FilePinDb` without any filter on the pin type.
    """
    pins_for_file = select(FilePinDb).where(FilePinDb.file_hash == StoredFileDb.hash)
    has_no_pin = ~pins_for_file.exists()
    unpinned_files_stmt = select(StoredFileDb).where(has_no_pin)
    return session.execute(unpinned_files_stmt).scalars()


def upsert_tx_file_pin(
session: DbSession, file_hash: str, tx_hash: str, created: dt.datetime
) -> None:
Expand Down Expand Up @@ -93,6 +103,28 @@ def delete_file_pin(session: DbSession, item_hash: str) -> None:
session.execute(delete_stmt)


def insert_grace_period_file_pin(
    session: DbSession,
    file_hash: str,
    created: dt.datetime,
    delete_by: dt.datetime,
) -> None:
    """
    Insert a grace period pin row for `file_hash`.

    The row is written with type `FilePinType.GRACE_PERIOD` and carries the
    provided `created` and `delete_by` timestamps.
    """
    pin_values = {
        "file_hash": file_hash,
        "created": created,
        "type": FilePinType.GRACE_PERIOD,
        "delete_by": delete_by,
    }
    session.execute(insert(GracePeriodFilePinDb).values(**pin_values))


def delete_grace_period_file_pins(session: DbSession, datetime: dt.datetime) -> None:
    """
    Delete the grace period pins whose `delete_by` is strictly earlier than `datetime`.
    """
    is_expired = GracePeriodFilePinDb.delete_by < datetime
    session.execute(delete(GracePeriodFilePinDb).where(is_expired))


def get_message_file_pin(
session: DbSession, item_hash: str
) -> Optional[MessageFilePinDb]:
Expand Down
18 changes: 18 additions & 0 deletions src/aleph/db/models/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class FilePinType(str, Enum):
MESSAGE = "message"
# A file containing sync messages.
TX = "tx"
# A file with a grace period (=no one paying for the file, but we keep it around for a while).
GRACE_PERIOD = "grace_period"


class StoredFileDb(Base):
Expand Down Expand Up @@ -101,8 +103,24 @@ class ContentFilePinDb(FilePinDb):
}


class GracePeriodFilePinDb(FilePinDb):
    """File pin mapped to the `FilePinType.GRACE_PERIOD` polymorphic identity."""

    # Timezone-aware timestamp column; nullable.
    delete_by: dt.datetime = Column(TIMESTAMP(timezone=True), nullable=True)

    __mapper_args__ = dict(polymorphic_identity=FilePinType.GRACE_PERIOD.value)



# Partial index on the message pin `owner` column, restricted to rows where
# `owner` is not NULL.
Index(
"ix_file_pins_owner",
MessageFilePinDb.owner,
postgresql_where=MessageFilePinDb.owner.isnot(None),
)


# Partial index on the grace period pin `delete_by` column, restricted to rows
# where `delete_by` is not NULL.
Index(
"ix_file_pins_delete_by",
GracePeriodFilePinDb.delete_by,
postgresql_where=GracePeriodFilePinDb.delete_by.isnot(None),
)
66 changes: 26 additions & 40 deletions src/aleph/handlers/content/store.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
""" This is the storage message handlers file.
For now it's very simple, we check if we want to store files or not.
"""
Content handler for STORE messages.
TODO:
- check balances and storage allowance
- handle incentives from 3rd party
- handle garbage collection of unused hashes
"""

import asyncio
import datetime as dt
import logging
from typing import List, Optional, Set

import aioipfs
from aioipfs import NotPinnedError
from aioipfs.api import RepoAPI
from aleph_message.models import ItemType, StoreContent, ItemHash

from aleph.config import get_config
from aleph.db.accessors.files import (
delete_file as delete_file_db,
insert_message_file_pin,
get_file_tag,
upsert_file_tag,
Expand All @@ -28,12 +23,13 @@
is_pinned_file,
get_message_file_pin,
upsert_file,
insert_grace_period_file_pin,
)
from aleph.db.models import MessageDb
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.handlers.content.content_handler import ContentHandler
from aleph.storage import StorageService
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now
from aleph.types.db_session import DbSession
from aleph.types.files import FileTag, FileType
from aleph.types.message_status import (
Expand Down Expand Up @@ -88,8 +84,9 @@ def make_file_tag(owner: str, ref: Optional[str], item_hash: str) -> FileTag:


class StoreMessageHandler(ContentHandler):
def __init__(self, storage_service: StorageService):
def __init__(self, storage_service: StorageService, grace_period: int):
self.storage_service = storage_service
self.grace_period = grace_period

async def is_related_content_fetched(
self, session: DbSession, message: MessageDb
Expand All @@ -103,7 +100,6 @@ async def is_related_content_fetched(
async def fetch_related_content(
self, session: DbSession, message: MessageDb
) -> None:

# TODO: simplify this function, it's overly complicated for no good reason.

# TODO: this check is useless, remove it
Expand Down Expand Up @@ -153,7 +149,7 @@ async def fetch_related_content(
if stats["Type"] == "file":
is_folder = False
size = stats["Size"]
do_standard_lookup = size < 1024 ** 2 and len(item_hash) == 46
do_standard_lookup = size < 1024**2 and len(item_hash) == 46
else:
is_folder = True
# Size is 0 for folders, use cumulative size instead
Expand Down Expand Up @@ -280,13 +276,17 @@ async def process(self, session: DbSession, messages: List[MessageDb]) -> None:
for message in messages:
await self._pin_and_tag_file(session=session, message=message)

# TODO: should be probably be in the storage service
async def _garbage_collect(
async def _check_remaining_pins(
self, session: DbSession, storage_hash: str, storage_type: ItemType
):
"""If a file does not have any reference left, delete or unpin it.
"""
If a file is not pinned anymore, mark it as pickable by the garbage collector.
We do not delete files directly from the message processor for two reasons:
1. Performance (deleting large files is slow)
2. Give the users some time to react if a file gets unpinned.
This is typically called after 'forgetting' a message.
If a file is not pinned by a TX or message, we give it a grace period pin.
"""
LOGGER.debug(f"Garbage collecting {storage_hash}")

Expand All @@ -303,30 +303,16 @@ async def _garbage_collect(
f"for hash '{storage_hash}'"
)

delete_file_db(session=session, file_hash=storage_hash)
LOGGER.info("Inserting grace period pin for %s", storage_hash)

if storage_type == ItemType.ipfs:
LOGGER.debug(f"Removing from IPFS: {storage_hash}")
ipfs_client = self.storage_service.ipfs_service.ipfs_client
try:
result = await ipfs_client.pin.rm(storage_hash)
print(result)

# Launch the IPFS garbage collector (`ipfs repo gc`)
async for _ in RepoAPI(driver=ipfs_client).gc():
pass

except NotPinnedError:
LOGGER.debug("File not pinned")

LOGGER.debug(f"Removed from IPFS: {storage_hash}")
elif storage_type == ItemType.storage:
LOGGER.debug(f"Removing from local storage: {storage_hash}")
await self.storage_service.storage_engine.delete(storage_hash)
LOGGER.debug(f"Removed from local storage: {storage_hash}")
else:
raise ValueError(f"Invalid storage type {storage_type}")
LOGGER.debug(f"Removed from {storage_type}: {storage_hash}")
current_datetime = utc_now()
delete_by = current_datetime + dt.timedelta(hours=self.grace_period)
insert_grace_period_file_pin(
session=session,
file_hash=storage_hash,
created=utc_now(),
delete_by=delete_by,
)

async def forget_message(self, session: DbSession, message: MessageDb) -> Set[str]:
content = _get_store_content(message)
Expand All @@ -340,7 +326,7 @@ async def forget_message(self, session: DbSession, message: MessageDb) -> Set[st
item_hash=message.item_hash,
),
)
await self._garbage_collect(
await self._check_remaining_pins(
session=session,
storage_hash=content.item_hash,
storage_type=content.item_type,
Expand Down
5 changes: 4 additions & 1 deletion src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ def __init__(
balances_post_type=config.aleph.balances.post_type.value,
),
MessageType.program: vm_handler,
MessageType.store: StoreMessageHandler(storage_service=storage_service),
MessageType.store: StoreMessageHandler(
storage_service=storage_service,
grace_period=config.storage.grace_period.value,
),
}

self.content_handlers[MessageType.forget] = ForgetMessageHandler(
Expand Down
1 change: 1 addition & 0 deletions src/aleph/services/storage/engine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import datetime as dt
from typing import Optional


Expand Down
5 changes: 1 addition & 4 deletions src/aleph/services/storage/fileystem_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@

class FileSystemStorageEngine(StorageEngine):
def __init__(self, folder: Union[Path, str]):

self.folder = folder if isinstance(folder, Path) else Path(folder)

if self.folder.exists() and not self.folder.is_dir():
raise ValueError(
f"'{self.folder}' exists and is not a directory."
)
raise ValueError(f"'{self.folder}' exists and is not a directory.")

self.folder.mkdir(parents=True, exist_ok=True)

Expand Down
Loading

0 comments on commit 29139b1

Please sign in to comment.