Skip to content

Commit

Permalink
Fix: Instead to raise an integrity issue and try to add the message, …
Browse files Browse the repository at this point in the history
…check before if exists on the database and return the already existing
  • Loading branch information
Andres D. Molins committed Jan 14, 2025
1 parent 24409ed commit 08bcc8a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
41 changes: 21 additions & 20 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from aleph_message.models import ItemHash, ItemType, MessageType
from configmanager import Config
from pydantic import ValidationError
from sqlalchemy import insert
from sqlalchemy.dialects.postgresql import insert

from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.accessors.files import insert_content_file_pin, upsert_file
Expand Down Expand Up @@ -241,38 +241,39 @@ async def add_pending_message(
session.commit()
return None

# Check if there are an already existing record
existing_message = (
session.query(PendingMessageDb)
.filter_by(
sender=pending_message.sender,
item_hash=pending_message.item_hash,
signature=pending_message.signature,
)
.one_or_none()
)
if existing_message:
return existing_message

upsert_message_status_stmt = make_message_status_upsert_query(
item_hash=pending_message.item_hash,
new_status=MessageStatus.PENDING,
reception_time=reception_time,
where=MessageStatusDb.status == MessageStatus.REJECTED,
)
insert_pending_message_stmt = insert(PendingMessageDb).values(
pending_message.to_dict(exclude={"id"})
insert_pending_message_stmt = (
insert(PendingMessageDb)
.values(pending_message.to_dict(exclude={"id"}))
.on_conflict_do_nothing("uq_pending_message")
)

try:
session.execute(upsert_message_status_stmt)
session.execute(insert_pending_message_stmt)
session.commit()
except sqlalchemy.exc.IntegrityError:
# Handle the unique constraint violation and log as debug to avoid multiple errors.
LOGGER.debug(
"Duplicate pending message detected. Fetching existing record."
)
session.rollback()
# Fetch the existing record
existing_message = (
session.query(PendingMessageDb)
.filter_by(
sender=pending_message.sender,
item_hash=pending_message.item_hash,
signature=pending_message.signature,
)
.one_or_none()
)
if existing_message:
return existing_message
# Handle the unique constraint violation.
LOGGER.warning("Duplicate pending message detected trying to save it.")
return None

except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
LOGGER.warning(
Expand Down
2 changes: 1 addition & 1 deletion tests/message_processing/test_process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.mark.asyncio
async def test_pending_message(
async def test_duplicated_pending_message(
mocker,
mock_config: Config,
session_factory: DbSessionFactory,
Expand Down

0 comments on commit 08bcc8a

Please sign in to comment.