Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only reindex singletons/events if service is synced #2348

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@dataclass
class IndexingStatus:
class AllIndexingStatus:
current_block_number: int
current_block_timestamp: int
erc20_block_number: int
Expand All @@ -39,10 +39,10 @@ class IndexingStatus:


@dataclass
class ERC20IndexingStatus:
class SpecificIndexingStatus:
current_block_number: int
erc20_block_number: int
erc20_synced: bool
block_number: int
synced: bool


class IndexingException(Exception):
Expand Down Expand Up @@ -126,16 +126,23 @@ def get_master_copies_current_indexing_block_number(self) -> Optional[int]:
min_master_copies_block_number=Min("tx_block_number")
)["min_master_copies_block_number"]

def get_indexing_status(self) -> IndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

# Indexing points to the next block to be indexed, we need the previous ones
def get_erc20_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
erc20_block_number = min(
max(self.get_erc20_721_current_indexing_block_number() - 1, 0),
current_block_number,
)
erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, erc20_block_number, erc20_synced
)

def get_master_copies_indexing_status(
self, current_block_number: int
) -> SpecificIndexingStatus:
if (
master_copies_current_indexing_block_number := self.get_master_copies_current_indexing_block_number()
) is None:
Expand All @@ -146,33 +153,50 @@ def get_indexing_status(self) -> IndexingStatus:
current_block_number,
)

erc20_synced = (
current_block_number - erc20_block_number <= self.eth_reorg_blocks
)
master_copies_synced = (
current_block_number - master_copies_block_number <= self.eth_reorg_blocks
)
return SpecificIndexingStatus(
current_block_number, master_copies_block_number, master_copies_synced
)

def get_indexing_status(self) -> AllIndexingStatus:
current_block = self.ethereum_client.get_block("latest")
current_block_number = current_block["number"]

erc20_indexing_status = self.get_erc20_indexing_status(current_block_number)
master_copies_indexing_status = self.get_master_copies_indexing_status(
current_block_number
)

if erc20_block_number == master_copies_block_number == current_block_number:
if (
erc20_indexing_status.block_number
== master_copies_indexing_status.block_number
== current_block_number
):
erc20_block, master_copies_block = [current_block, current_block]
else:
erc20_block, master_copies_block = self.ethereum_client.get_blocks(
[erc20_block_number, master_copies_block_number]
[
erc20_indexing_status.block_number,
master_copies_indexing_status.block_number,
]
)
current_block_timestamp = current_block["timestamp"]
erc20_block_timestamp = erc20_block["timestamp"]
master_copies_block_timestamp = master_copies_block["timestamp"]

return IndexingStatus(
return AllIndexingStatus(
current_block_number=current_block_number,
current_block_timestamp=current_block_timestamp,
erc20_block_number=erc20_block_number,
erc20_block_number=erc20_indexing_status.block_number,
erc20_block_timestamp=erc20_block_timestamp,
erc20_synced=erc20_synced,
master_copies_block_number=master_copies_block_number,
erc20_synced=erc20_indexing_status.synced,
master_copies_block_number=master_copies_indexing_status.block_number,
master_copies_block_timestamp=master_copies_block_timestamp,
master_copies_synced=master_copies_synced,
synced=erc20_synced and master_copies_synced,
master_copies_synced=master_copies_indexing_status.synced,
synced=erc20_indexing_status.synced
and master_copies_indexing_status.synced,
)

def is_service_synced(self) -> bool:
Expand Down
26 changes: 24 additions & 2 deletions safe_transaction_service/history/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,21 @@ def process_decoded_internal_txs_for_safe_task(

@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> bool:
"""
Reindexes last hours for master copies to prevent indexing issues

:param hours: Hours to reindex from now
:return: `True` if reindexing is triggered, `False` otherwise
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.master_copies_synced:
logger.warning(
"Reindexing master copies will not be executed as service is out of sync"
)
Uxio0 marked this conversation as resolved.
Show resolved Hide resolved
return False
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand All @@ -333,16 +342,27 @@ def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[i
reindex_master_copies_task.delay(
from_block_number, to_block_number=to_block_number
)
return True
return False


@app.shared_task(bind=True)
@task_timeout(timeout_seconds=LOCK_TIMEOUT)
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[int]:
def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> bool:
"""
Reindexes last hours for erx20 and erc721 to prevent indexing issues

:param hours: Hours to reindex from now
:return: `True` if reindexing is triggered, `False` otherwise
"""
with contextlib.suppress(LockError):
with only_one_running_task(self):
indexing_status = IndexServiceProvider().get_indexing_status()
if not indexing_status.erc20_synced:
logger.warning(
"Reindexing erc20/721 events will not be executed as service is out of sync"
)
Uxio0 marked this conversation as resolved.
Show resolved Hide resolved
return False
if ethereum_block := EthereumBlock.objects.oldest_than(
seconds=60 * 60 * hours
).first():
Expand All @@ -361,6 +381,8 @@ def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[i
reindex_erc20_events_task.delay(
from_block_number, to_block_number=to_block_number
)
return True
return False


@app.shared_task(bind=True)
Expand Down
42 changes: 33 additions & 9 deletions safe_transaction_service/history/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from eth_account import Account

from ...events.services import QueueService
from safe_transaction_service.events.services import QueueService

from ...utils.redis import get_redis
from ..indexers import (
Erc20EventsIndexerProvider,
Expand All @@ -30,6 +31,7 @@
ReorgService,
)
from ..services.collectibles_service import CollectibleWithMetadata
from ..services.index_service import SpecificIndexingStatus
from ..tasks import (
check_reorgs_task,
check_sync_status_task,
Expand Down Expand Up @@ -128,54 +130,76 @@ def test_index_new_proxies_task(self):
def test_index_safe_events_task(self):
self.assertEqual(index_safe_events_task.delay().result, (0, 0))

@patch.object(IndexService, "get_master_copies_indexing_status")
@patch.object(IndexService, "reindex_master_copies")
def test_reindex_mastercopies_last_hours_task(
self, reindex_master_copies_mock: MagicMock
self,
reindex_master_copies_mock: MagicMock,
get_master_copies_indexing_status_mock: MagicMock,
):
get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, True
)

now = timezone.now()
one_hour_ago = now - datetime.timedelta(hours=1)
one_day_ago = now - datetime.timedelta(days=1)
one_week_ago = now - datetime.timedelta(weeks=1)

reindex_mastercopies_last_hours_task()
self.assertFalse(reindex_mastercopies_last_hours_task())
reindex_master_copies_mock.assert_not_called()

ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago)
ethereum_block_1 = EthereumBlockFactory(timestamp=one_day_ago)
ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago)
ethereum_block_3 = EthereumBlockFactory(timestamp=now)

reindex_mastercopies_last_hours_task()
self.assertTrue(reindex_mastercopies_last_hours_task())
reindex_master_copies_mock.assert_called_once_with(
ethereum_block_1.number,
to_block_number=ethereum_block_3.number,
addresses=None,
)

get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, False
)
self.assertFalse(reindex_mastercopies_last_hours_task())

@patch.object(IndexService, "get_erc20_indexing_status")
@patch.object(IndexService, "reindex_erc20_events")
def test_reindex_erc20_erc721_last_hours_task(
self, reindex_erc20_events: MagicMock
self,
reindex_erc20_events_mock: MagicMock,
get_erc20_indexing_status_mock: MagicMock,
):
get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus(0, 0, True)

now = timezone.now()
one_hour_ago = now - datetime.timedelta(hours=1)
one_day_ago = now - datetime.timedelta(days=1)
one_week_ago = now - datetime.timedelta(weeks=1)

reindex_erc20_erc721_last_hours_task()
reindex_erc20_events.assert_not_called()
self.assertFalse(reindex_erc20_erc721_last_hours_task())
reindex_erc20_events_mock.assert_not_called()

ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago)
ethereum_block_1 = EthereumBlockFactory(timestamp=one_day_ago)
ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago)
ethereum_block_3 = EthereumBlockFactory(timestamp=now)

reindex_erc20_erc721_last_hours_task()
reindex_erc20_events.assert_called_once_with(
self.assertTrue(reindex_erc20_erc721_last_hours_task())
reindex_erc20_events_mock.assert_called_once_with(
ethereum_block_1.number,
to_block_number=ethereum_block_3.number,
addresses=None,
)

get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus(
0, 0, False
)
self.assertFalse(reindex_erc20_erc721_last_hours_task())

def test_process_decoded_internal_txs_task(self):
owner = Account.create().address
safe_address = Account.create().address
Expand Down
Loading