Skip to content

Commit

Permalink
Internal: TX processing is now event-based (#492)
Browse files Browse the repository at this point in the history
Problem: the pending TX processor uses a polling loop to determine which
transactions must be processed. This leads to some latency when adding a
new transaction when there is no currently pending transaction as the
task sleeps while waiting for new transactions.

Solution: make the TX processor event-based by using a RabbitMQ exchange
+ message queue. Upon receiving a new transaction from on-chain, we now
publish a RabbitMQ message containing the hash of the transaction. The
processor can then find the transaction in the DB. The RabbitMQ message is only
used as a hint that something must be done. The TX processor still looks for pending
TXs in the database.
  • Loading branch information
odesenfans authored and hoh committed Dec 8, 2023
1 parent 01406c9 commit d29ab5b
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 82 deletions.
8 changes: 5 additions & 3 deletions src/aleph/chains/bsc.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from aleph_message.models import Chain
from configmanager import Config

from aleph.chains.chain_data_service import ChainDataService
from aleph.chains.abc import ChainReader
from aleph.chains.chain_data_service import PendingTxPublisher
from aleph.chains.indexer_reader import AlephIndexerReader
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory


class BscConnector(ChainReader):
def __init__(
self, session_factory: DbSessionFactory, chain_data_service: ChainDataService
self,
session_factory: DbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
):
self.indexer_reader = AlephIndexerReader(
chain=Chain.BSC,
session_factory=session_factory,
chain_data_service=chain_data_service,
pending_tx_publisher=pending_tx_publisher,
)

async def fetcher(self, config: Config):
Expand Down
61 changes: 54 additions & 7 deletions src/aleph/chains/chain_data_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from io import StringIO
from typing import Dict, Optional, List, Any, Mapping, Set, cast, Type, Union
from typing import Dict, Optional, List, Any, Mapping, Set, cast, Type, Union, Self

import aio_pika.abc
from aleph_message.models import StoreContent, ItemType, Chain, MessageType
from configmanager import Config
from pydantic import ValidationError

from aleph.chains.common import LOGGER
Expand Down Expand Up @@ -36,7 +38,9 @@

class ChainDataService:
def __init__(
self, session_factory: DbSessionFactory, storage_service: StorageService
self,
session_factory: DbSessionFactory,
storage_service: StorageService,
):
self.session_factory = session_factory
self.storage_service = storage_service
Expand Down Expand Up @@ -215,11 +219,54 @@ async def get_tx_messages(
LOGGER.info("%s", error_msg)
raise InvalidContent(error_msg)


async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExchange:
mq_conn = await aio_pika.connect_robust(
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
)
channel = await mq_conn.channel()
pending_tx_exchange = await channel.declare_exchange(
name=config.rabbitmq.pending_tx_exchange.value,
type=aio_pika.ExchangeType.TOPIC,
auto_delete=False,
)
return pending_tx_exchange


class PendingTxPublisher:
def __init__(self, pending_tx_exchange: aio_pika.abc.AbstractExchange):
self.pending_tx_exchange = pending_tx_exchange

@staticmethod
async def incoming_chaindata(session: DbSession, tx: ChainTxDb):
"""Incoming data from a chain.
Content can be inline of "offchain" through an ipfs hash.
For now, we only add it to the database, it will be processed later.
"""
def add_pending_tx(session: DbSession, tx: ChainTxDb):
upsert_chain_tx(session=session, tx=tx)
upsert_pending_tx(session=session, tx_hash=tx.hash)

async def publish_pending_tx(self, tx: ChainTxDb):
message = aio_pika.Message(body=tx.hash.encode("utf-8"))
await self.pending_tx_exchange.publish(
message=message, routing_key=f"{tx.chain.value}.{tx.publisher}.{tx.hash}"
)

async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
"""
Add an event published on one of the supported chains.
Adds the tx to the database, creates a pending tx entry in the pending tx table
and publishes a message on the pending tx exchange.
Note that this function commits changes to the database for consistency
between the DB and the message queue.
"""
self.add_pending_tx(session=session, tx=tx)
session.commit()
await self.publish_pending_tx(tx)

@classmethod
async def new(cls, config: Config) -> Self:
pending_tx_exchange = await make_pending_tx_exchange(config=config)
return cls(
pending_tx_exchange=pending_tx_exchange,
)
17 changes: 11 additions & 6 deletions src/aleph/chains/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from aleph_message.models import Chain
from configmanager import Config

from aleph.storage import StorageService
from aleph.types.db_session import DbSessionFactory
from .bsc import BscConnector
from .chain_data_service import ChainDataService
from .abc import ChainReader, ChainWriter
from .bsc import BscConnector
from .chain_data_service import ChainDataService, PendingTxPublisher
from .ethereum import EthereumConnector
from .nuls2 import Nuls2Connector
from .tezos import TezosConnector
Expand All @@ -28,9 +27,13 @@ class ChainConnector:
writers: Dict[Chain, ChainWriter]

def __init__(
self, session_factory: DbSessionFactory, chain_data_service: ChainDataService
self,
session_factory: DbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
self._session_factory = session_factory
self.pending_tx_publisher = pending_tx_publisher
self._chain_data_service = chain_data_service

self.readers = {}
Expand Down Expand Up @@ -101,27 +104,29 @@ def _register_chains(self):
Chain.BSC,
BscConnector(
session_factory=self._session_factory,
chain_data_service=self._chain_data_service,
pending_tx_publisher=self.pending_tx_publisher,
),
)
self._add_chain(
Chain.NULS2,
Nuls2Connector(
session_factory=self._session_factory,
pending_tx_publisher=self.pending_tx_publisher,
chain_data_service=self._chain_data_service,
),
)
self._add_chain(
Chain.ETH,
EthereumConnector(
session_factory=self._session_factory,
pending_tx_publisher=self.pending_tx_publisher,
chain_data_service=self._chain_data_service,
),
)
self._add_chain(
Chain.TEZOS,
TezosConnector(
session_factory=self._session_factory,
chain_data_service=self._chain_data_service,
pending_tx_publisher=self.pending_tx_publisher,
),
)
25 changes: 15 additions & 10 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
from aleph.db.accessors.messages import get_unconfirmed_messages
from aleph.db.accessors.pending_messages import count_pending_messages
from aleph.db.accessors.pending_txs import count_pending_txs
from aleph.db.models.chains import ChainTxDb
from aleph.schemas.chains.tx_context import TxContext
from aleph.schemas.pending_messages import BasePendingMessage
from aleph.toolkit.timestamp import utc_now
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory
from aleph.utils import run_in_executor
from .chain_data_service import ChainDataService
from .abc import ChainWriter, Verifier, ChainReader
from .abc import ChainWriter, Verifier
from .chain_data_service import ChainDataService, PendingTxPublisher
from .indexer_reader import AlephIndexerReader
from ..db.models import ChainTxDb
from ..types.chain_sync import ChainEventType

LOGGER = logging.getLogger("chains.ethereum")
CHAIN_NAME = "ETH"
Expand Down Expand Up @@ -105,15 +105,17 @@ class EthereumConnector(ChainWriter):
def __init__(
self,
session_factory: DbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
self.session_factory = session_factory
self.pending_tx_publisher = pending_tx_publisher
self.chain_data_service = chain_data_service

self.indexer_reader = AlephIndexerReader(
chain=Chain.ETH,
session_factory=session_factory,
chain_data_service=chain_data_service,
pending_tx_publisher=pending_tx_publisher,
)

async def get_last_height(self, sync_type: ChainEventType) -> int:
Expand Down Expand Up @@ -212,7 +214,9 @@ async def _request_transactions(

except json.JSONDecodeError:
# if it's not valid json, just ignore it...
LOGGER.info("Incoming logic data is not JSON, ignoring. %r" % message)
LOGGER.info(
"Incoming logic data is not JSON, ignoring. %r" % message
)

except Exception:
LOGGER.exception("Can't decode incoming logic data %r" % message)
Expand Down Expand Up @@ -256,7 +260,7 @@ async def fetch_ethereum_sync_events(self, config: Config):
):
tx = ChainTxDb.from_sync_tx_context(tx_context=context, tx_data=jdata)
with self.session_factory() as session:
await self.chain_data_service.incoming_chaindata(
await self.pending_tx_publisher.add_and_publish_pending_tx(
session=session, tx=tx
)
session.commit()
Expand Down Expand Up @@ -313,7 +317,6 @@ async def packer(self, config: Config):
gas_price = web3.eth.generate_gas_price()
while True:
with self.session_factory() as session:

# Wait for sync operations to complete
if (count_pending_txs(session=session, chain=Chain.ETH)) or (
count_pending_messages(session=session, chain=Chain.ETH)
Expand Down Expand Up @@ -344,8 +347,10 @@ async def packer(self, config: Config):
LOGGER.info("Chain sync: %d unconfirmed messages")

# This function prepares a chain data file and makes it downloadable from the node.
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
sync_event_payload = (
await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
)
)
# Required to apply update to the files table in get_chaindata
session.commit()
Expand Down
19 changes: 9 additions & 10 deletions src/aleph/chains/indexer_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pydantic import BaseModel

import aleph.toolkit.json as aleph_json
from aleph.chains.chain_data_service import ChainDataService
from aleph.chains.chain_data_service import PendingTxPublisher
from aleph.db.accessors.chains import (
get_missing_indexer_datetime_multirange,
add_indexer_range,
Expand Down Expand Up @@ -154,7 +154,6 @@ async def fetch_account_state(
blockchain: IndexerBlockchain,
accounts: List[str],
) -> IndexerAccountStateResponse:

query = make_account_state_query(
blockchain=blockchain, accounts=accounts, type_=EntityType.LOG
)
Expand Down Expand Up @@ -194,7 +193,6 @@ def indexer_event_to_chain_tx(
chain: Chain,
indexer_event: Union[MessageEvent, SyncEvent],
) -> ChainTxDb:

if isinstance(indexer_event, MessageEvent):
protocol = ChainSyncProtocol.SMART_CONTRACT
protocol_version = 1
Expand Down Expand Up @@ -225,7 +223,6 @@ async def extract_aleph_messages_from_indexer_response(
chain: Chain,
indexer_response: IndexerEventResponse,
) -> List[ChainTxDb]:

message_events = indexer_response.data.message_events
sync_events = indexer_response.data.sync_events

Expand All @@ -240,7 +237,6 @@ async def extract_aleph_messages_from_indexer_response(


class AlephIndexerReader:

BLOCKCHAIN_MAP: Mapping[Chain, IndexerBlockchain] = {
Chain.BSC: IndexerBlockchain.BSC,
Chain.ETH: IndexerBlockchain.ETHEREUM,
Expand All @@ -251,11 +247,11 @@ def __init__(
self,
chain: Chain,
session_factory: DbSessionFactory,
chain_data_service: ChainDataService,
pending_tx_publisher: PendingTxPublisher,
):
self.chain = chain
self.session_factory = session_factory
self.chain_data_service = chain_data_service
self.pending_tx_publisher = pending_tx_publisher

self.blockchain = self.BLOCKCHAIN_MAP[chain]

Expand Down Expand Up @@ -299,9 +295,7 @@ async def fetch_range(
LOGGER.info("%d new txs", len(txs))
# Events are listed in reverse order in the indexer response
for tx in txs:
await self.chain_data_service.incoming_chaindata(
session=session, tx=tx
)
self.pending_tx_publisher.add_pending_tx(session=session, tx=tx)

if nb_events_fetched >= limit:
last_event_datetime = txs[-1].datetime
Expand All @@ -317,6 +311,7 @@ async def fetch_range(
)
else:
synced_range = Range(start_datetime, end_datetime, upper_inc=True)
txs = []

LOGGER.info(
"%s %s indexer: fetched %s",
Expand All @@ -336,6 +331,10 @@ async def fetch_range(
# of events.
session.commit()

# Now that the txs are committed to the DB, add them to the pending tx message queue
for tx in txs:
await self.pending_tx_publisher.publish_pending_tx(tx)

if nb_events_fetched < limit:
LOGGER.info(
"%s %s event indexer: done fetching events.",
Expand Down
16 changes: 11 additions & 5 deletions src/aleph/chains/nuls2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from aleph.utils import run_in_executor
from .chain_data_service import ChainDataService
from .chain_data_service import ChainDataService, PendingTxPublisher
from .abc import Verifier, ChainWriter
from aleph.schemas.chains.tx_context import TxContext
from ..db.models import ChainTxDb
Expand Down Expand Up @@ -78,9 +78,13 @@ async def verify_signature(self, message: BasePendingMessage) -> bool:

class Nuls2Connector(ChainWriter):
def __init__(
self, session_factory: DbSessionFactory, chain_data_service: ChainDataService
self,
session_factory: DbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
self.session_factory = session_factory
self.pending_tx_publisher = pending_tx_publisher
self.chain_data_service = chain_data_service

async def get_last_height(self, sync_type: ChainEventType) -> int:
Expand Down Expand Up @@ -154,7 +158,7 @@ async def fetcher(self, config: Config):
tx_context=context, tx_data=jdata
)
with self.session_factory() as db_session:
await self.chain_data_service.incoming_chaindata(
await self.pending_tx_publisher.add_and_publish_pending_tx(
session=db_session, tx=tx
)
db_session.commit()
Expand Down Expand Up @@ -197,8 +201,10 @@ async def packer(self, config: Config):

if len(messages):
# This function prepares a chain data file and makes it downloadable from the node.
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
sync_event_payload = (
await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
)
)
# Required to apply update to the files table in get_chaindata
session.commit()
Expand Down
Loading

0 comments on commit d29ab5b

Please sign in to comment.