Implement node sync #170
Comments
Tx replication:

The normal Tx process is: Validate -> Verify dep -> Propagate -> Save -> Execute -> Mark as executed.

The algo of the node init and bootstrap:

2 nodes are involved: the booting node that starts, and the bootstrap node that sends the missing Tx.
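For orientation, a minimal sketch of the pipeline order described above (hypothetical names, not the actual node types):

```rust
// Minimal illustration of the normal Tx pipeline order described above
// (hypothetical names, not the actual node types).
#[derive(Clone, Copy, Debug)]
enum TxStage {
    Validated,      // basic validation passed
    DepsVerified,   // parent transactions found
    Propagated,     // broadcast to peers
    Saved,          // persisted in the local DB
    Executed,       // workload has been run
    MarkedExecuted, // execution recorded in the DB
}
```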
Bootstrap algo
Creation Tx timestamp: A creation timestamp must be added to the Tx struct. There are 2 possibilities:
From the issue description:
To be defined:
Dev

This process is independent of the scheduling and normal Tx processing. To ease the integration, I think we should do a little refactoring first: move all Tx validation into the Mempool, and let the mempool decide when to send a Tx to execution. We change the mempool from a pull model to a push-to-execution model.
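A rough sketch of what that pull-to-push change could look like (the names and channel-based wiring are assumptions for illustration, not the current code):

```rust
// Hypothetical sketch of the push-to-execution mempool idea (not the current code).
use std::sync::mpsc::Sender;

struct Tx; // stand-in for the real transaction type

struct Mempool {
    // Instead of the execution side polling the mempool (pull), the mempool
    // owns a channel and pushes transactions that are ready for execution.
    to_execution: Sender<Tx>,
}

impl Mempool {
    /// All Tx validation lives in the mempool; once a Tx is validated, its
    /// dependencies are verified and it is saved, the mempool pushes it onward.
    fn on_new_tx(&self, tx: Tx) {
        // validate(&tx); verify_deps(&tx); save(&tx);  // omitted in this sketch
        let _ = self.to_execution.send(tx);
    }
}
```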
Your proposal has a good idea in that it begins from the latest transactions, as that allows the node to quickly catch up with online operation. However, I believe it has several problems:

Chunking
Wrong. I don't think you understood how it works. Data deduplication is one of the most obvious applications of it, but content-defined chunking itself solves the problem of having distributed agreement on chunk boundaries over a large amount of data when bits change anywhere in it. Your proposal has the very problem that CDC actually solves: when a single transaction arrives in the middle of all transactions (i.e. some long-running node finally syncs with another node that has that single missing piece), all chunks invalidate, because that simple chunk boundary of 10 transactions shifts everything, and this then changes every single chunk hash. This brings us to the second problem: chunking from newest to oldest will always shift the chunk boundaries if those are counted by number of transactions.

Tx creation timestamp

I don't like this idea at all. It introduces obvious problems when time is not strictly, precisely synced on every single node of the network. There will be situations where a Run transaction appears to have been created after the first Proof, which is wrong. The timestamp must never be used for anything other than rough sampling of transactions that happened around the same time. This is somewhat similar to location: a timestamp in our case is like a district name in a location. It's not a high-precision coordinate to a specific spot, but a vague concept of "about there", which helps to limit the amount of data that needs to be processed. The timestamp should always be kept a node-local concept, because then it's consistent between distinct samples. When all timestamps have been created locally, one can analyze transaction execution time and progress by looking at timestamps in the database. In a distributed environment, wall clock timestamps must never be used for anything that needs absolute precision.

Defect in my proposal and resolution of it

When writing the above, I realized that even my first proposal has a severe defect, because it uses the timestamps. It can get into infinite syncing across nodes over time: if two nodes have the same transaction with different timestamps where the delta is larger than the size of a single sampling window (e.g. 5 days apart), then every time these nodes sync, depending on whom they sync with, the changes go back and forth. However, at this point I'd accept this defect in order to keep the solution simple enough:
To agree on an algorithm, I think we should first agree on the goal and assumptions.

Goal

We have 2 nodes:
The goal is:
If the node starts with an empty database, we should consider using a snapshot, or say that the node won't have the Tx from before a certain time and use this algo.

Assumptions:

The process is to manage the case where a node crashes and the operator restarts it later.
Example of Tx timeline between the 2 nodes when the booting node starts
From this timeline, the algo will determine to copy Tx 11,12,13,14,15,16,17 from the bootstrap node to the booting one.

From this timeline we define these assumptions:
Algorithms:

All Tx arriving from the p2p or RPC during the bootstrap are processed and added to the DB as in the normal process.

Algo1, the simplest:

Define a time TBegin that is before T1. For example, we estimate that T1 can't be more than 30 min before T2crash (the last saved Tx). Pro: simple
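A minimal sketch of Algo1, assuming the chrono crate for time handling and a hypothetical peer RPC for fetching transactions after a given time:

```rust
// Sketch of Algo1 (the helper and the peer RPC below are hypothetical).
use chrono::{DateTime, Duration, Utc};

/// Assumption from the text: T1 can't be more than 30 minutes before the
/// last saved Tx (T2crash), so TBegin = T2crash - 30 min is before T1.
fn compute_t_begin(last_saved_tx: DateTime<Utc>) -> DateTime<Utc> {
    last_saved_tx - Duration::minutes(30)
}

// fn bootstrap(peer: &Peer, db: &Db) {
//     let t_begin = compute_t_begin(db.last_saved_tx_time());
//     for tx in peer.request_txs_since(t_begin) { // hypothetical RPC
//         if !db.contains(&tx.hash) {
//             db.save(&tx); // stored as already processed, not re-executed
//         }
//     }
// }
```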
Algo2: Try to optimize the T1 discovery

The idea is that the booting node requests the bootstrap node to build chunks of Tx, calculate the hash of each chunk, and send all the chunk hashes back. When the booting node detects a chunk with a different hash, it requests all the Tx hashes of that chunk, compares them with the ones in its DB, and requests the Tx that are missing (see the message sketch below).

How to determine the chunk size

In my first version, I propose to use the same size for all chunks. It can be optimized by increasing the size the further the chunk is from T1 (before/after), because it's more likely that there's no modification far before T1, or that everything is missing far after. As we don't have 1000 Tx per second, I'm not sure it optimizes the transferred data volume by much.

About the CDC

CDC operates on arbitrary binary data and tries to build chunks where the chunk boundaries are derived using a hashing technique, so that if we take 2 files with the same data and a few modifications in one, the chunks of unmodified data will be the same, and the modified data ends up contained inside chunks whose size changes.
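Returning to Algo2, a hedged sketch of the exchange with hypothetical message types (none of these exist in the node yet):

```rust
// Hypothetical message types for the Algo2 exchange (not existing node APIs).

/// Booting node -> bootstrap node: ask for chunk hashes covering a time range.
struct GetChunkHashes {
    from: u64,         // start of the range (e.g. TBegin) as a unix timestamp
    to: u64,           // end of the range
    chunk_size: usize, // number of Tx per chunk
}

/// Bootstrap node -> booting node: one hash per chunk of `chunk_size` Tx.
struct ChunkHashes {
    hashes: Vec<[u8; 32]>,
}

/// Booting node -> bootstrap node: sent for a chunk whose hash differed;
/// the reply lists the Tx hashes it contains, so the booting node can
/// compare them with its DB and fetch only the missing transactions.
struct GetChunkTxHashes {
    chunk_index: usize,
}

/// Bootstrap node -> booting node: the Tx hashes of the requested chunk.
struct ChunkTxHashes {
    tx_hashes: Vec<[u8; 32]>,
}
```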
General Algorithm

Never mind, this can be changed later if we agree on the base algorithm:

Other possibility:

Order the Tx hash set by hash and use CDC to see if it's efficient for determining the chunks (we have to solve the Tx hash boundary problem).

Change to allow the synchronization:

Move the Tx waiting part after saving and propagation, so that the waiting process doesn't interfere with the sync process.

Questions:

At start, should not-yet-executed Tx be executed, or just marked as executed by the bootstrap process? For example, Tx (T8,T10) are not executed locally but are found executed on the bootstrap node. Do we execute them as we do today? T15,T16,T17 are not executed on the bootstrap node but will be sent to the booting node during the bootstrap process. Does the booting node execute them?
Assumption

I think your first assumption undermines the rest of the reasoning:
This is wrong. A booting node can, and often will, miss transactions in between. An easy way to produce this: say there are nodes A, B and C. Node A is broadcasting a transaction to B and C. It manages to send it to B, but it crashes/restarts/loses network connectivity before it gets to send it to C. Now nodes A and B have transaction T, but node C doesn't. Nodes continue working as before. Node C was functioning correctly all the time. In fact, it has the longest uptime of all of them, until it's restarted for an upgrade. At this point, when it syncs itself at startup, it realizes it misses individual transactions from here and there. This is one scenario where the static chunk size fails, because changes in the middle will result in changes to all consecutive chunks, when only one chunk would need re-syncing.
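To make that boundary-shift problem concrete, a small toy example (standard-library hashing over made-up data, not the real sync code): one transaction arriving in the middle changes every fixed-count chunk after it.

```rust
// Toy illustration of the boundary-shift problem with fixed-count chunks.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn chunk_hashes(tx_hashes: &[&str], chunk_len: usize) -> Vec<u64> {
    tx_hashes
        .chunks(chunk_len)
        .map(|chunk| {
            let mut h = DefaultHasher::new();
            chunk.hash(&mut h);
            h.finish()
        })
        .collect()
}

fn main() {
    let a = vec!["t1", "t2", "t3", "t4", "t5", "t6"];
    // Same history, but one previously missed tx ("tx") arrives in the middle.
    let b = vec!["t1", "t2", "tx", "t3", "t4", "t5", "t6"];
    // With chunks of 2 Tx, only the first chunk hash matches; every chunk
    // from the insertion point onward differs, although only one Tx was added.
    println!("{:?}", chunk_hashes(&a, 2));
    println!("{:?}", chunk_hashes(&b, 2));
}
```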
Empty node boot

I agree with this, but I think this is kind of an optimization (an important one, but still). We probably want to build a separate tool/worker/task that, on a longer interval, generates bigger exports from the DB that can be directly imported into an empty DB.

CDC
If the stream of data that CDC computes over is aligned by transaction hashes, can't we then easily calculate the Nth transaction by simply dividing the offset by the hash length and adding 1?
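As a quick check of that arithmetic, assuming 32-byte transaction hashes and a stream that contains nothing but the concatenated hashes:

```rust
fn main() {
    // Assuming the CDC stream is exactly the concatenated 32-byte tx hashes,
    // a byte offset into the stream maps directly back to a transaction index.
    const HASH_LEN: usize = 32;
    let offset = 96; // e.g. a chunk boundary found by CDC, in bytes
    let nth = offset / HASH_LEN + 1; // 1-based: this is the 4th transaction
    assert_eq!(nth, 4);
}
```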
Actually, it does. As I initially wrote:
To elaborate on that: when the nodes agree on chunk boundaries, they can hash the chunk contents (only the concatenated tx hashes, not all data), and these hash lists can be very efficiently transferred and compared between nodes. This way, nodes can sync whole chunks, and it can easily also cover missing chunks from the beginning and in the middle.

Risk
True, but is this a considerable problem? I'm thinking that we should transfer chunks as lists of transaction hashes, and then the transactions are fetched separately. If there are individual transactions missing here and there, it can cause a number of chunks to be compared, but if the contained transactions are mostly present, the amount of processed data will still be kept relatively low, and in the end, wouldn't this scenario still be relatively rare?
Assumption

I put this assumption to express the fact that at some moment there must be a consensus on the Tx state of the network. If not, all nodes can have missing Tx during the normal process (Tx propagation can fail at any moment), so it's impossible to synchronize all the Tx from one node. Even if the booting node synchronizes with all nodes, it can miss Tx if another node is down. So, as your scenario is possible, we have 2 choices I think:
I removed the master/slave possibility. Otherwise, we have to accept that a node can have missing Tx. As the rest of the algo depends on this assumption, we have to agree on it first.

CDC

About the hash cut, what I'm saying is: let's say the CDC makes a chunk of 3 Tx with hashes AA, BB, CC.

Risk

From the way crypto hashes are created, I think this scenario can happen even with a few Tx, because the spread of the generated hashes over the hash space is maximal. So if we use alphanumeric sort, for example, and take a few Tx, you'll get very different first letters in the hashes, and those few Tx will be spread among the others.
Yes. I've tried to convey this the whole time: the syncing of all nodes together, in relation to each other, is eventual. Therefore it is expected that some nodes might have missing transactions at any point in time, but when they restart over time, they will catch up with those older missing transactions, while there might be newer ones missing again, depending on the functioning and timing of everything. Still, over time, at an abstract level, this whole system approaches full sync in the grand scheme.
This is the reason why there must be agreement on inclusion and exclusion (begin/end). I'll take over on this for now and work on the first iteration of the implementation. The discussion here hasn't led anywhere and we have spent two weeks of the original three-week ideal. As I've tried to express here, I have an idea of how this can be implemented, so I'll proceed with this and we'll see how it comes out.
OK, try to implement a first version.
Introduction
Currently, each Gevulot node runs in isolation, and if there is downtime for any given node, it will miss out on the transactions distributed meanwhile. Due to the deterministic and cross-verifying (and kind of HA) nature of the overall devnet cluster, this is OK for the devnet so far.
Recently, however, we learned of a use case where a single proving workload would be too resource-intensive to execute on all [available] nodes as the design is now, and we need to incorporate a VRF to schedule Run transactions only to individual nodes. This will then require node syncing functionality, so that transactions won't get lost from the devnet overall and so that there is a possibility to catch up from where the node left off when there is maintenance downtime for the node.
Objective
Implement node syncing so that when the node starts from scratch, or when the node has been shut down for some period of time, it will sync itself from another peer.
After syncing, the new node must have all transactions in the database.
Transactions that have been received via the syncing mechanism must not be re-executed. These transactions must also have their original timestamp retained. Missing deployments / programs must be downloaded.
Syncing should take place before the scheduler is started.
In all cases, the nodes in the cloud should always have all data at any given point in time.
Possible ideas for implementation
Since the devnet does not have any kind of consensus protocol or other distributed ordering mechanism employed, there is no way to put all transactions into an absolute order on distinct nodes.
However, each transaction has a node-specific timestamp and a hash that is globally consistent. With these properties, there should be a way to implement the syncing in a relatively efficient and reliable way.
Transactions can be aggregated into groups by some time period, and within each group they can be sorted by transaction hash. When overlapping the time periods between nodes, the nodes should be able to find a common begin and end for the aggregation groups. To make this grouping deterministic in a distributed setting, we could borrow the "chunking" mechanism from the FastCDC algorithm and compute deterministic cut points from a stream of concatenated transaction hashes.
These groups can then be hashed and marked as synced.
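A minimal sketch of how that chunking-plus-group-hashing could look, assuming a deterministically sorted stream of 32-byte transaction hashes and the sha2 crate; it is simplified to whole-hash granularity rather than a byte-level FastCDC gear hash, and all names are illustrative:

```rust
// Illustrative sketch only: deterministic, content-defined group boundaries
// over a sorted stream of 32-byte tx hashes, plus one hash per group.
// A real FastCDC implementation works on the raw byte stream with a gear hash;
// this simplification cuts at whole-hash granularity.
use sha2::{Digest, Sha256};

/// A boundary decision that depends only on the transaction hash itself,
/// so every node computes the same group boundaries independently, and an
/// inserted or missing transaction only affects the group it belongs to.
fn is_cut_point(tx_hash: &[u8; 32], mask: u8) -> bool {
    tx_hash[0] & mask == 0 // e.g. mask = 0x0F -> roughly 16 tx per group
}

/// Split the sorted hashes into groups and hash each group's concatenated
/// tx hashes. These group hashes are what nodes mark as synced and exchange.
fn group_hashes(sorted_tx_hashes: &[[u8; 32]], mask: u8) -> Vec<[u8; 32]> {
    let mut groups = Vec::new();
    let mut hasher = Sha256::new();
    let mut group_len = 0usize;
    for tx_hash in sorted_tx_hashes {
        hasher.update(tx_hash);
        group_len += 1;
        if is_cut_point(tx_hash, mask) {
            groups.push(hasher.finalize_reset().into());
            group_len = 0;
        }
    }
    if group_len > 0 {
        groups.push(hasher.finalize_reset().into()); // trailing partial group
    }
    groups
}
```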
When a node starts, it can submit a list of synced groups (or maybe the last 50 groups?) and that way find the optimal starting point for the syncing.
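For instance, with the group hashes represented as plain arrays, finding the newest locally synced group that the peer also knows could look like this (a sketch, not an existing API):

```rust
// Sketch: pick the newest local group hash that the peer also reports and
// start requesting groups from the one after it (hypothetical representation).
fn find_sync_start(local_groups: &[[u8; 32]], peer_groups: &[[u8; 32]]) -> Option<usize> {
    local_groups.iter().rposition(|g| peer_groups.contains(g))
}
```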
When the starting point has been found, the node that needs the missing data can then request all transactions (serialized, containing timestamp and execution status) for the next group and persist them, iterating this way up until the present state. The P2P network should be running all the time, receiving transactions into the scheduling queue (but the scheduler must not run yet). If the syncing method receives a transaction with
executed: true
, while the same transaction is still sitting in the scheduling queue, it should be removed from the queue.

One should also consider the situation where two nodes have forked for some reason and have diverged. In this case, the nodes should find the last common checkpoint (group of transactions) and from there onward proceed by exchanging lists of transaction hashes for the following groups and syncing each other's missing transactions, effectively "zipping up" themselves up to the present state.
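A sketch of applying one synced group under these rules, with hypothetical types standing in for the real queue and storage APIs:

```rust
// Hypothetical sketch of applying one synced group (not the actual node APIs).
use std::collections::HashSet;

struct SyncedTx {
    hash: [u8; 32],
    serialized: Vec<u8>, // carries the original timestamp and execution status
    executed: bool,
}

struct Node {
    db_tx_hashes: HashSet<[u8; 32]>, // transactions already persisted
    scheduling_queue: Vec<[u8; 32]>, // stand-in for the real scheduling queue
}

impl Node {
    fn apply_synced_group(&mut self, group: Vec<SyncedTx>) {
        for tx in group {
            if self.db_tx_hashes.insert(tx.hash) {
                // Persist tx.serialized with its original timestamp here;
                // synced transactions are never re-executed.
            }
            if tx.executed {
                // The same tx may have arrived over p2p while syncing;
                // drop it from the scheduling queue so it is not run again.
                self.scheduling_queue.retain(|h| h != &tx.hash);
            }
        }
    }
}
```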
In the present architecture, it is most natural to incorporate functions related to this into the P2P implementation, where individual nodes can perform unicast RPC messaging.