[SBK-251] Store the last successfully processed block for fast restart #33

Closed
fubuloubu opened this issue Aug 25, 2023 · 8 comments · Fixed by #45

Comments

@fubuloubu
Member

fubuloubu commented Aug 25, 2023

Overview

If the script restarts, you might want to know what the last successfully processed block was so that you can continue from that point.

Specification

This would be useful for a pattern like this:

@app.on_startup()
def process_event(state):
    data = contract.MyEvent.query("*", start_block=state.last_block)
    ...

Dependencies

Include links to any open issues that must be resolved before this feature can be implemented.

SBK-251

@fubuloubu added the "enhancement" (New feature or request) label Aug 25, 2023
@vany365 changed the title from "Store the last successfully processed block for fast restart" to "Store the last successfully processed block for fast restart [SBK-250]" Aug 25, 2023
@fubuloubu changed the title from "Store the last successfully processed block for fast restart [SBK-250]" to "[SBK-251] Store the last successfully processed block for fast restart" Aug 25, 2023
@fubuloubu
Member Author

Related to #18

@mikeshultz self-assigned this Nov 10, 2023
@mikeshultz
Contributor

Still not an expert on taskiq but learned a decent amount now. This is my understanding:

  • "state" is something stored in memory, unique to that process (runner, worker, whatever). It's useful for setting up things like connections to other services or keeping some kind of state of that process. Storing anything here is ephemeral.
  • "Results backends" are just dumb storage of data blobs returned from the worker handlers (and some job metadata). I believe this is a critical component when runner and worker are split. Looks like the broker would normally store results here, and without configuring it the runner would never receive the results. Results backends can be configured to be permanent stores, but the redis one defaults to "store until read" like a message queue.
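
To make the distinction between the two concrete, here is a toy model of both behaviours. It is only a sketch: the class names `StoreUntilReadBackend` and `ProcessState` are made up for illustration and are not taskiq's API.

```python
from typing import Dict, Optional


# Toy model only: this is NOT taskiq's API, just the two behaviours described above.
class StoreUntilReadBackend:
    """Mimics a result backend that drops a result once it has been read."""

    def __init__(self) -> None:
        self._results: Dict[str, object] = {}

    def set_result(self, task_id: str, value: object) -> None:
        self._results[task_id] = value

    def get_result(self, task_id: str) -> Optional[object]:
        # pop() removes the entry, so a second read finds nothing
        return self._results.pop(task_id, None)


class ProcessState:
    """Mimics per-process state: plain attributes that die with the process."""

    def __init__(self) -> None:
        self.last_block: Optional[int] = None


backend = StoreUntilReadBackend()
backend.set_result("task-1", {"block_number": 100})
first_read = backend.get_result("task-1")
second_read = backend.get_result("task-1")  # already consumed

state = ProcessState()
state.last_block = 100
restarted_state = ProcessState()  # simulates a fresh process after a restart
```

In both cases the block number is gone by the time a restarted process would want it, which is the gap this issue is about.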

Neither of these is an especially good fit for our use case. I think that leaves us with only one option: to implement our own state storage for Silverback. That comes with the bonus of ultimate flexibility, but I'm left wondering what other use cases it might have when thinking about the design.

My initial thought was that we could use files in ~/.config. I think Ape already leverages this design. However, that requires filesystem persistence and that wouldn't be ideal in containerized services, like ApePay. Redis might be a good option if we're expecting to utilize it for a results backend. Rolling a DB would be major overkill to store a block number cursor.
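
For reference, the file-based option could be as small as the sketch below. The `FileBlockCursor` class, the JSON layout and the file location are all assumptions made for illustration; nothing here is existing Silverback or Ape code.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


class FileBlockCursor:
    """Persists the last processed block number in a small JSON file."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def load(self) -> Optional[int]:
        # No file yet means the app has never checkpointed
        if not self.path.exists():
            return None
        return json.loads(self.path.read_text())["last_block"]

    def save(self, block_number: int) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file and rename it, so a crash mid-write
        # cannot leave a half-written cursor behind
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps({"last_block": block_number}))
        os.replace(tmp, self.path)


# Hypothetical location; a real app might use something under ~/.config
cursor_path = Path(tempfile.mkdtemp()) / "silverback" / "cursor.json"
cursor = FileBlockCursor(cursor_path)
before = cursor.load()
cursor.save(1234)
after_restart = FileBlockCursor(cursor_path).load()  # a new object, as after a restart
```

This only works when the directory outlives the process, which is exactly the containerized-deployment concern.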

@fubuloubu I could use your input and future-think on this. Is there other state we might be interested in storing? Other databases or storage options we might leverage for other things in Silverback? Do we want to offer options to users or go for just what we need right now?

@mikeshultz
Contributor

After writing all that up, I'm kind of leaning towards Redis being required for persistence in Silverback. Maybe remove results backend settings and just offer up Redis connection settings to the user. Then we either have Redis to do what we want with, or the whole configuration is ephemeral.
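
A rough sketch of what a Redis-backed cursor might look like. The `RedisBlockCursor` class and the `<instance>:last_block` key layout are hypothetical; the only Redis calls assumed are `get` and `set`, which redis-py's `redis.Redis` client provides. An in-memory fake stands in for the server so the sketch runs on its own.

```python
from typing import Optional, Protocol


class KeyValueClient(Protocol):
    """The two calls this sketch needs; redis-py's redis.Redis provides both."""

    def get(self, name: str): ...

    def set(self, name: str, value): ...


class RedisBlockCursor:
    def __init__(self, client: KeyValueClient, instance: str) -> None:
        self._client = client
        # Hypothetical key layout, not something Silverback defines
        self._key = f"{instance}:last_block"

    def load(self) -> Optional[int]:
        raw = self._client.get(self._key)  # bytes from Redis, None if the key is missing
        return None if raw is None else int(raw)

    def save(self, block_number: int) -> None:
        self._client.set(self._key, block_number)


class FakeRedis:
    """In-memory stand-in so the sketch runs without a server."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, name):
        return self._data.get(name)

    def set(self, name, value):
        # Real Redis hands values back as bytes by default
        self._data[name] = str(value).encode()
        return True


client = FakeRedis()  # with a server: client = redis.Redis(host="localhost", port=6379)
cursor = RedisBlockCursor(client, instance="my-app")
initial = cursor.load()
cursor.save(18_500_000)
resumed = RedisBlockCursor(client, instance="my-app").load()
```

Whether the value survives a Redis restart still depends on how the server's own persistence is configured.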

@fubuloubu
Member Author

fubuloubu commented Nov 15, 2023

Hmm, so there's a few ways to look at this:

  1. Reading results database to gain metrics from worker execution e.g. see the last successfully processed block. Persistence would be required for this, but instead of redis persistence we could migrate the redis result store into either a memory store in the runner or a more permanent datastore like postgres/sqlite (configurable via sqlalchemy) or Mongo.
  2. The runner itself should implement a REST api to communicate metrics, status, etc. See [SBK-314] Add REST API for command/control/monitoring #39. This result data is critical for filling in metrics for running apps and will be an important part of the Silverback SaaS platform. However, there should be a simple implementation of this, which is what I'm planning to do e.g. the built-in runners host a FastAPI app using stored metrics in the runner implementation.

P.S. The data is read by the runner via _handle_result.

@mikeshultz
Contributor

> Reading results database to gain metrics from worker execution e.g. see the last successfully processed block. Persistence would be required for this, but instead of redis persistence we could migrate the redis result store into either a memory store in the runner or a more permanent datastore like postgres/sqlite (configurable via sqlalchemy) or Mongo.

The results backend isn't going to cut it for us. We can't query it for what we want (e.g. last executed job). There's also a chance that other results backends are more friendly to this (e.g. if there's a postgres one with metadata cols). I'll look today. But I suspect we'll need to move to our own persistence layer.
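
To illustrate the kind of query I mean, here is a sketch using the standard-library `sqlite3` module. The `handler_results` table and its columns are invented for the example, not a proposed schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # a file path would make this survive restarts
conn.execute(
    """
    CREATE TABLE handler_results (
        instance TEXT NOT NULL,
        task_type TEXT NOT NULL,
        block_number INTEGER NOT NULL,
        succeeded INTEGER NOT NULL
    )
    """
)

rows = [
    ("my-app", "block", 100, 1),
    ("my-app", "block", 101, 1),
    ("my-app", "block", 102, 0),  # failed, so it should not count
    ("other-app", "block", 500, 1),
]
conn.executemany("INSERT INTO handler_results VALUES (?, ?, ?, ?)", rows)


def last_successful_block(conn: sqlite3.Connection, instance: str):
    """Highest block with a successful result for this instance, or None."""
    (block,) = conn.execute(
        "SELECT MAX(block_number) FROM handler_results "
        "WHERE instance = ? AND succeeded = 1",
        (instance,),
    ).fetchone()
    return block


last_block = last_successful_block(conn, "my-app")
missing = last_successful_block(conn, "unknown")
```

Note that `MAX` ignores gaps: if block 101 had failed and 102 succeeded, this would still report 102, so a real implementation would need to decide whether "last processed" means highest or highest contiguous.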

> This result data is critical for filling in metrics for running apps and will be an important part of the Silverback SaaS platform. However, there should be a simple implementation of this, which is what I'm planning to do e.g. the built-in runners host a FastAPI app using stored metrics in the runner implementation.

Does this mean you're implementing the persistence layer or that you would leverage it as well?

@mikeshultz
Contributor

mikeshultz commented Nov 15, 2023

Here's a rough outline of what I'm thinking, with some pseudo-code.

We could have some kind of storage setup. There might be something like this already we can use on top of mongo or redis or whatever k/v storage we find most fitting.

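from abc import ABC, abstractmethod

from pydantic import BaseModel


class BaseStorage(ABC):
    @abstractmethod
    def get(self, k: str) -> BaseModel:
        """Return the model stored under key k"""
        ...

    @abstractmethod
    def store(self, k: str, v: BaseModel):
        """Persist model v under key k"""
        ...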
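
A concrete subclass might look something like the following sketch. To keep it dependency-free it uses a dataclass (`Record`) as a stand-in for a pydantic model, and `InMemoryStorage` is a hypothetical name; a Redis or Mongo backend would replace the dict.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from typing import Dict, Optional


@dataclass
class Record:
    """Stand-in for a pydantic BaseModel so the sketch has no dependencies."""

    network: str
    block_number: int


class BaseStorage(ABC):
    @abstractmethod
    def get(self, k: str) -> Optional[Record]: ...

    @abstractmethod
    def store(self, k: str, v: Record) -> None: ...


class InMemoryStorage(BaseStorage):
    """Keeps serialized JSON blobs, as a Redis or file backend would."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, k: str) -> Optional[Record]:
        raw = self._data.get(k)
        return None if raw is None else Record(**json.loads(raw))

    def store(self, k: str, v: Record) -> None:
        self._data[k] = json.dumps(asdict(v))


storage = InMemoryStorage()
storage.store("my-app:runner_state", Record(network="ethereum:mainnet", block_number=42))
loaded = storage.get("my-app:runner_state")
absent = storage.get("nope")
```

Round-tripping through JSON here is deliberate: it keeps the in-memory version honest about what a real backend could actually store.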

A possible model of silverback instance state:

class RunnerState(BaseModel):
    instance: str # UUID, or tag, or deployment/network name?
    network: str
    block_number: int
    updated: datetime

A rough model of handler results:

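from typing import Dict, Generic, TypeVar

T = TypeVar("T")  # type of the handler's return value

# Generic[T] on a BaseModel needs pydantic v2 (v1 uses pydantic.generics.GenericModel)
class HandlerResult(BaseModel, Generic[T]):
    return_value: T
    labels: Dict[str, str]
    execution_time: float
    network: str
    block_number: int
    instance: str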
We'd still need something to handle relations. We need a way to query "give me all events for this contract" which this doesn't cover. Maybe we can tag results with block and event data that might allow us to fetch. Or we add a contract model that has a list of event IDs or something. Depends on the storage, really.
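
One way the "tag results" idea could work on plain k/v storage is a secondary index from contract to result keys. This is only a sketch: `IndexedResults` and the addresses are made up, and the key format just mirrors the one used in the runner example.

```python
from typing import Dict, List


class IndexedResults:
    """K/V store plus a per-contract index of result keys."""

    def __init__(self) -> None:
        self._results: Dict[str, dict] = {}
        # contract address -> result keys, in insertion order
        self._by_contract: Dict[str, List[str]] = {}

    def store_event_result(
        self, instance: str, block_number: int, contract: str, event: str, result: dict
    ) -> str:
        key = f"{instance}:event:{block_number}:{contract}:{event}:result"
        self._results[key] = result
        # Secondary index: remember which keys belong to this contract
        self._by_contract.setdefault(contract, []).append(key)
        return key

    def results_for_contract(self, contract: str) -> List[dict]:
        return [self._results[k] for k in self._by_contract.get(contract, [])]


store = IndexedResults()
store.store_event_result("my-app", 100, "0xTokenA", "Transfer", {"block_number": 100})
store.store_event_result("my-app", 101, "0xTokenB", "Transfer", {"block_number": 101})
store.store_event_result("my-app", 102, "0xTokenA", "Approval", {"block_number": 102})
token_a_blocks = [r["block_number"] for r in store.results_for_contract("0xTokenA")]
```

In Redis the index could be a set per contract (SADD / SMEMBERS); a document or SQL store would make this a plain query instead.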

Rough example of how it might work in the runner:

    def _checkpoint(self, block_number: int) -> int:
        """Set latest checkpoint block number"""
        if block_number > self.latest_block_number:
            # Log the new checkpoint, not the one being replaced
            logger.debug(f"Checkpoint block #{block_number}")
            self.latest_block_number = block_number

            self._storage.store(
                f"{self.instance}:runner_state",
                RunnerState(
                    instance=self.instance,
                    network=self.network,
                    block_number=block_number,
                    updated=datetime.utcnow(),
                ),
            )

        return self.latest_block_number

    def _handle_result(self, task_type: str, result: TaskiqResult):
        store_key: str

        if task_type == "block":
            store_key = f"{self.instance}:block:{result.block_number}:result"
        elif task_type == "event":
            # contract_event is not defined in this sketch; it would have to come from the task context
            store_key = f"{self.instance}:event:{result.block_number}:{contract_event.contract.address}:{contract_event.name}:result"
        else:
            # Without this branch store_key would be unbound below
            raise ValueError(f"Unknown task type: {task_type}")

        self._storage.store(
            store_key,
            HandlerResult(
                instance=self.instance,
                network=self.network,
                block_number=result.block_number,
                execution_time=result.execution_time,
                labels=result.labels,
                return_value=result.return_value,
            ),
        )
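
To check the checkpoint behaviour in isolation, here is a self-contained sketch that can be run as-is. `MiniRunner` and `DictStorage` are hypothetical stand-ins (plain dicts instead of pydantic models), and the `writes` counter exists only to show when storage is touched.

```python
from datetime import datetime, timezone
from typing import Dict


class DictStorage:
    """Hypothetical stand-in for the storage layer: a plain dict."""

    def __init__(self) -> None:
        self.data: Dict[str, dict] = {}

    def store(self, k: str, v: dict) -> None:
        self.data[k] = v


class MiniRunner:
    def __init__(self, instance: str, network: str, storage: DictStorage) -> None:
        self.instance = instance
        self.network = network
        self.latest_block_number = -1  # nothing processed yet
        self._storage = storage
        self.writes = 0  # counts storage writes, for illustration only

    def _checkpoint(self, block_number: int) -> int:
        """Advance the checkpoint only when block_number is newer."""
        if block_number > self.latest_block_number:
            self.latest_block_number = block_number
            self._storage.store(
                f"{self.instance}:runner_state",
                {
                    "instance": self.instance,
                    "network": self.network,
                    "block_number": block_number,
                    "updated": datetime.now(timezone.utc).isoformat(),
                },
            )
            self.writes += 1
        return self.latest_block_number


storage = DictStorage()
runner = MiniRunner("my-app", "ethereum:mainnet", storage)
# Worker results can arrive out of order
checkpoints = [runner._checkpoint(n) for n in (100, 102, 101)]
stored_block = storage.data["my-app:runner_state"]["block_number"]
```

The late result for block 101 does not move the checkpoint backwards. The flip side is that if the process died after 102 was checkpointed but before 101 finished, a restart from 102 would skip 101, so that case probably needs its own handling.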

@fubuloubu let me know if this sounds ok to you or if I'm barking up the wrong tree.

@mikeshultz
Contributor

Beanie looks interesting if we're itching to go the mongo route. It seems to fit in well with FastAPI and Pydantic.

@mikeshultz
Contributor

Raw draft PR up at #45 for early feedback.
