Skip to content

Commit

Permalink
Fix: add origin pending message field (#677)
Browse files Browse the repository at this point in the history
* Fix:
Adds origins field on pending_messages table

* Fix: from_obj missing origin

* Fix: check to ensure origin is not OnChain

* Fix: lint issue

* Fix: isort issue

* Fix: Solve code quality issues.

* Fix: Remove un-needed log info..

* Fix: Solve None origin case tests.

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
1yam and Andres D. Molins authored Jan 14, 2025
1 parent 42315ce commit 2ec5c83
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""add_pending_messages_origin
Revision ID: bafd49315934
Revises: d3bba5c2bfa0
Create Date: 2025-01-13 15:05:05.309960
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bafd49315934'
down_revision = 'd3bba5c2bfa0'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column("pending_messages", sa.Column("origin", sa.String(), nullable=True, default="p2p"))


def downgrade() -> None:
op.drop_column("pending_messages", "origin")
6 changes: 6 additions & 0 deletions src/aleph/db/models/pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from aleph.schemas.pending_messages import BasePendingMessage
from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now
from aleph.types.channel import Channel
from aleph.types.message_status import MessageOrigin

from .base import Base
from .chains import ChainTxDb
Expand Down Expand Up @@ -65,6 +66,7 @@ class PendingMessageDb(Base):
retries: int = Column(Integer, nullable=False)
tx_hash: Optional[str] = Column(ForeignKey("chain_txs.hash"), nullable=True)
fetched: bool = Column(Boolean, nullable=False)
origin: Optional[str] = Column(String, nullable=True, default=MessageOrigin.P2P)

__table_args__ = (
CheckConstraint(
Expand All @@ -83,6 +85,7 @@ def from_obj(
tx_hash: Optional[str] = None,
check_message: bool = True,
fetched: bool = False,
origin: Optional[MessageOrigin] = MessageOrigin.P2P,
) -> "PendingMessageDb":

return cls(
Expand All @@ -101,6 +104,7 @@ def from_obj(
tx_hash=tx_hash,
reception_time=reception_time,
fetched=fetched,
origin=origin,
)

@classmethod
Expand All @@ -111,6 +115,7 @@ def from_message_dict(
fetched: bool,
tx_hash: Optional[str] = None,
check_message: bool = True,
origin: Optional[MessageOrigin] = MessageOrigin.P2P,
) -> "PendingMessageDb":
"""
Utility function to translate Aleph message dictionaries, such as those returned by the API,
Expand All @@ -137,6 +142,7 @@ def from_message_dict(
retries=0,
tx_hash=tx_hash,
reception_time=reception_time,
origin=origin,
)


Expand Down
21 changes: 17 additions & 4 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
InvalidMessageFormat,
InvalidSignature,
MessageContentUnavailable,
MessageOrigin,
MessageStatus,
)

Expand Down Expand Up @@ -187,16 +188,19 @@ def __init__(
async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8"))
process_or_fetch = "process" if pending_message.fetched else "fetch"
await self.pending_message_exchange.publish(
mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}"
)
if pending_message.origin != MessageOrigin.ONCHAIN:
await self.pending_message_exchange.publish(
mq_message,
routing_key=f"{process_or_fetch}.{pending_message.item_hash}",
)

async def add_pending_message(
self,
message_dict: Mapping[str, Any],
reception_time: dt.datetime,
tx_hash: Optional[str] = None,
check_message: bool = True,
origin: Optional[MessageOrigin] = MessageOrigin.P2P,
) -> Optional[PendingMessageDb]:
# TODO: this implementation is just messy, improve it.
with self.session_factory() as session:
Expand All @@ -219,6 +223,7 @@ async def add_pending_message(
reception_time=reception_time,
tx_hash=tx_hash,
check_message=check_message,
origin=origin,
)

try:
Expand Down Expand Up @@ -394,7 +399,15 @@ async def process(
session=session, pending_message=pending_message, message=message
)
await content_handler.process(session=session, messages=[message])
return ProcessedMessage(message=message, is_confirmation=False)
return ProcessedMessage(
message=message,
origin=(
MessageOrigin(pending_message.origin)
if pending_message.origin
else None
),
is_confirmation=False,
)

async def check_permissions(self, session: DbSession, message: MessageDb):
content_handler = self.get_content_handler(message.type)
Expand Down
14 changes: 9 additions & 5 deletions src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from aleph.types.db_session import DbSessionFactory
from aleph.types.message_processing_result import MessageProcessingResult

from ..types.message_status import MessageOrigin
from .job_utils import MessageJob, prepare_loop

LOGGER = getLogger(__name__)
Expand Down Expand Up @@ -131,11 +132,14 @@ async def publish_to_mq(
) -> AsyncIterator[Sequence[MessageProcessingResult]]:
async for processing_results in message_iterator:
for result in processing_results:
mq_message = aio_pika.Message(body=aleph_json.dumps(result.to_dict()))
await self.mq_message_exchange.publish(
mq_message,
routing_key=f"{result.status.value}.{result.item_hash}",
)
if result.origin != MessageOrigin.ONCHAIN:
mq_message = aio_pika.Message(
body=aleph_json.dumps(result.to_dict())
)
await self.mq_message_exchange.publish(
mq_message,
routing_key=f"{result.status.value}.{result.item_hash}",
)

yield processing_results

Expand Down
2 changes: 2 additions & 0 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from aleph.types.chain_sync import ChainSyncProtocol
from aleph.types.db_session import DbSessionFactory

from ..types.message_status import MessageOrigin
from .job_utils import MqWatcher, make_pending_tx_queue, prepare_loop

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,6 +69,7 @@ async def handle_pending_tx(
reception_time=utc_now(),
tx_hash=tx.hash,
check_message=tx.protocol != ChainSyncProtocol.SMART_CONTRACT,
origin=MessageOrigin.ONCHAIN,
)

# bogus or handled, we remove it.
Expand Down
13 changes: 10 additions & 3 deletions src/aleph/types/message_processing_result.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from typing import Any, Dict, Protocol
from typing import Any, Dict, Optional, Protocol

from aleph.db.models import MessageDb, PendingMessageDb
from aleph.schemas.api.messages import format_message
from aleph.types.message_status import ErrorCode, MessageProcessingStatus
from aleph.types.message_status import ErrorCode, MessageOrigin, MessageProcessingStatus


class MessageProcessingResult(Protocol):
status: MessageProcessingStatus
origin: Optional[MessageOrigin] = None

@property
def item_hash(self) -> str:
Expand All @@ -17,13 +18,19 @@ def to_dict(self) -> Dict[str, Any]:


class ProcessedMessage(MessageProcessingResult):
def __init__(self, message: MessageDb, is_confirmation: bool = False):
def __init__(
self,
message: MessageDb,
is_confirmation: bool = False,
origin: Optional[MessageOrigin] = None,
):
self.message = message
self.status = (
MessageProcessingStatus.PROCESSED_CONFIRMATION
if is_confirmation
else MessageProcessingStatus.PROCESSED_NEW_MESSAGE
)
self.origin = origin

@property
def item_hash(self) -> str:
Expand Down
6 changes: 6 additions & 0 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from typing import Any, Dict, Optional, Sequence, Union


class MessageOrigin(str, Enum):
ONCHAIN = "onchain"
P2P = "p2p"
IPFS = "ipfs"


class MessageStatus(str, Enum):
PENDING = "pending"
PROCESSED = "processed"
Expand Down

0 comments on commit 2ec5c83

Please sign in to comment.