
Implement node sync #170

Open
tuommaki opened this issue Apr 2, 2024 · 7 comments

@tuommaki
Contributor

tuommaki commented Apr 2, 2024

Introduction

Currently, each Gevulot node runs in isolation, and if there is downtime for any given node, it will miss the transactions distributed in the meantime. Due to the deterministic, cross-verifying (and kind of HA) nature of the overall devnet cluster, this has been acceptable for the devnet so far.

Recently, however, we learned of a use case where a single proving workload would be too resource-intensive to execute on all [available] nodes as the design is now, so we need to incorporate a VRF to schedule Run transactions to individual nodes only. This in turn requires node syncing functionality, so that transactions won't get lost from the devnet overall and a node has the possibility to catch up from where it left off after maintenance downtime.

Objective

Implement node syncing so that when a node starts from scratch, or when it has been shut down for some period of time, it will sync itself from another peer.

After syncing, the new node must have all transactions in the database.

Transactions that have been received via the syncing mechanism must not be re-executed. These transactions must also have their original timestamp retained. Missing deployments / programs must be downloaded.

Syncing should take place before the scheduler is started.

In all cases, the nodes in the cloud should always have all data at any given point in time.

Possible ideas for implementation

Since the devnet does not have any kind of consensus protocol or other distributed-system ordering mechanism employed, there is no way to put all transactions into an absolute order across distinct nodes.

However, each transaction has a node-specific timestamp and a hash that is globally consistent. With these properties, there should be a way to implement the syncing in a relatively efficient and reliable way.

Transactions can be aggregated into groups by some time period, and within each group they can be sorted by transaction hash. By overlapping the time periods between nodes, the nodes should be able to find a common begin and end for the aggregation groups. To make this grouping deterministic in a distributed setting, we could borrow the "chunking" mechanism from the FastCDC algorithm, computing deterministic cut points from a stream of concatenated transaction hashes.

These groups can then be hashed and marked as synced.
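For illustration, here's a minimal Rust sketch of that idea (not an actual implementation; the function and parameters are hypothetical). A group boundary is cut after any transaction whose hash ends in a chosen number of zero bits, so boundaries depend only on content, never on position, and an inserted or missing transaction perturbs at most the group it falls into:

```rust
/// Content-defined group boundaries over an ordered stream of 32-byte
/// transaction hashes. A boundary is cut after any hash whose trailing
/// bits (selected by `zero_bits`) are all zero.
fn group_boundaries(tx_hashes: &[[u8; 32]], zero_bits: u32) -> Vec<usize> {
    let mask: u64 = (1u64 << zero_bits) - 1;
    tx_hashes
        .iter()
        .enumerate()
        .filter(|(_, h)| {
            let tail = u64::from_le_bytes(h[24..32].try_into().unwrap());
            tail & mask == 0
        })
        .map(|(i, _)| i + 1) // the cut point lies *after* transaction i
        .collect()
}
```

With `zero_bits = 4` the expected group size is 16 transactions; FastCDC layers minimum/maximum chunk sizes and a rolling window on top of this basic idea.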

When a node starts, it can submit a list of synced groups (or maybe the last 50 groups?) and that way find the optimal starting point for the syncing.

When the starting point has been found, the node that needs the missing data can request all transactions (serialized, containing timestamp and execution status) for the next group and persist them, iterating this way up to the present state. The P2P network should be running the whole time, receiving transactions into the scheduling queue (but the scheduler must not run yet). If the syncing mechanism receives a transaction with executed: true while the same transaction is still sitting in the scheduling queue, it should be removed from the queue.
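A rough sketch of that control flow (all types and names here are made up for illustration): synced transactions are persisted with their original timestamp, never executed locally, and an executed: true transaction evicts any waiting copy from the scheduling queue.

```rust
use std::collections::HashMap;

// Hypothetical shapes, for illustration only.
struct SyncedTx {
    hash: [u8; 32],
    timestamp: i64, // original timestamp, retained as-is
    executed: bool,
    payload: Vec<u8>,
}

fn apply_synced_group(
    group: Vec<SyncedTx>,
    db: &mut HashMap<[u8; 32], SyncedTx>,
    scheduling_queue: &mut Vec<[u8; 32]>,
) {
    for tx in group {
        if tx.executed {
            // Executed elsewhere: never re-execute locally, and drop it
            // from the scheduling queue if it is still waiting there.
            scheduling_queue.retain(|h| *h != tx.hash);
        }
        // Persist; a copy already received via P2P takes precedence.
        db.entry(tx.hash).or_insert(tx);
    }
}
```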

One should also consider the situation where two nodes have forked for some reason and diverged. In this case, the nodes should find the last common checkpoint (group of transactions) and from there onward proceed by exchanging lists of transaction hashes for the following groups and syncing each other's missing transactions, effectively "zipping themselves up" to the present state.

In the present architecture, it is most natural to incorporate the functions related to this into the P2P implementation, where individual nodes can perform unicast RPC messaging.

@musitdev
Contributor

musitdev commented Apr 4, 2024

Tx replication:

Normal Tx process is: Validate -> Verify dep -> propagate -> Save -> Execute -> Mark as executed

The algorithm for node init and bootstrap:

2 nodes are involved: the booting node that starts up, and the bootstrap node that sends the missing Tx.

  1. If not already defined, get the last executed Tx and use it for bootstrap; this must be done before receiving new Tx. This Tx is saved (DB or local config) because if the bootstrap fails, it should restart from the same point.
  2. Connect P2P. Wait until enough peers are connected (define the minimum number of peers).
  3. Receive new Tx from P2P and process them as normal. If a Tx has missing dependencies, the verify-dep step puts the Tx on hold.
    A new Tx that has all its dependencies can be executed.
    New Tx are received before the start of the bootstrap, to avoid missing Tx that arrive during the bootstrap process.
  4. Start the bootstrap process in parallel.

Bootstrap algo

  • From the last executed Tx, get its creation timestamp (see the spec about it below).
  • Calculate the start time of Tx recovery using: <create timestamp> - (2 * <max Tx execution time>).
    We open a window from the last executed Tx minus the maximum time a Tx can take to execute.
    If the node was running correctly, it should have executed all Tx before this time,
    so before this time the node is up to date. I multiply the max exec time by 2 to add some room for Tx executing on other nodes.
  • Get all local Tx after this start time until now, sorted by creation time.
  • If no Tx are present (node init), the sync is done with no start Tx.
  • Cut this set into chunks of, for example, 10 Tx. I don't think CDC will help here because it's designed for deduplication of local data, which we don't need.
    A simple chunk does the same, and it's less complex.
  • Calculate the hash of each chunk. The hash of a chunk is calculated as follows: Tx in the chunk are ordered by their hash; build an array of Tx hashes from these ordered Tx;
    hash the array to get the chunk hash (see the sketch after this list).
  • Build an array of all chunk hashes.
  • Define the bootstrap node:
    • From the peers list: order the peers by IP. Generate a random number to get the index of the bootstrap node in the ordered peers list.
    • Bootstrap requests use the RPC entry point of the node (to be validated).
  • The booting node sends the start Tx hash, if present, and its creation date to the bootstrap node.
  • The bootstrap node generates the chunk hash list the same way, using the provided Tx hash and creation date, or from the beginning if no start Tx is sent.
    • If the bootstrap node doesn't have the start Tx, it returns an error, and the booting node finds a new peer until it gets one that replies OK. This error is logged on both nodes.
  • The bootstrap node returns the chunk hash list it has generated.
  • The booting node verifies each chunk hash from the oldest to the newest. If a chunk matches, it goes to the next one.
  • If a chunk mismatches, the booting node requests the Tx hashes of the chunk and checks each received hash to detect which Tx are missing.
    • For each locally missing Tx, it asks the bootstrap node for the Tx content. The Tx is sent to the received-Tx process for validation and verification. The Tx is saved as executed, and it's not propagated or executed.
    • For each Tx missing on the bootstrap node, the booting node does nothing and logs the error. It's nearly impossible to tell why a Tx is missing from a chunk on the bootstrap node. For example, in the last chunk there are the latest received Tx on both sides; a Tx missing from the bootstrap node's chunk can mean it simply hasn't received it yet, so if we sent it, the bootstrap node wouldn't execute it when it later receives it from the network.
  • This process is repeated for each chunk until all missing Tx are downloaded.
  • If a missing chunk Tx is found to be already present in the DB (executed or not), it means the Tx arrived via P2P during the bootstrap process. The missing chunk Tx is not saved.
  • After this verification, the booting node sends a message to the bootstrap node to indicate the bootstrap process has ended. The bootstrap node removes the calculated chunks from memory.
  • The booting node removes the saved "last executed Tx" from config or DB.
  • Bootstrap end.
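A minimal sketch of the chunk-hash step described above, assuming 32-byte Tx hashes and the sha2 crate (an illustration, not the proposed implementation):

```rust
use sha2::{Digest, Sha256};

/// Tx hashes in the chunk are sorted, concatenated, and hashed once.
fn chunk_hash(mut tx_hashes: Vec<[u8; 32]>) -> [u8; 32] {
    tx_hashes.sort();
    let mut hasher = Sha256::new();
    for h in &tx_hashes {
        hasher.update(h);
    }
    hasher.finalize().into()
}
```

Because the hashes are sorted before concatenation, two nodes holding the same set of Tx compute the same chunk hash regardless of the order in which the Tx arrived.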

Tx creation timestamp:

A creation timestamp must be added to the Tx struct. There are 2 possibilities:

  • Initialize it when the Tx is created by the client. Pro: the time is part of the Tx hash and can be verified. Con: it carries the client's time. On the RPC side, we can reject Tx that are in the future or too far in the past.
  • Initialize it when the RPC receives it. Pro: it uses the RPC server time, which we control. Con: it's not part of the Tx hash, so it can be changed by a node.

From the issue description:

  • Starting point for the syncing: <last executed Tx create timestamp> - (2 * <max Tx execution time>) (see the sketch after this list).
  • Chunk definition: 10 Tx per chunk from the starting-point Tx.
  • Tx received during the bootstrap process are processed and executed.
  • CDC doesn't add anything to the replication algo because its goal is to avoid duplication of file content. Simple chunk verification is enough. To be defined: the size of the chunk. It can be variable and derived from the number of Tx found on the booting node if needed. At first, I define it as 10 Tx. The last chunk can be smaller; that's OK.
  • RPC is opened at the end of the process, but that's not mandatory; the node can process RPC requests during the bootstrap.
  • The important point is the start date of the first chunk's Tx. I define it as minus (2 * <max Tx execution time>) so as not to start too late and to minimize the risk of skipping a Tx.
  • Tx detected as missing on the bootstrap node are not sent to the bootstrap node.
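A minimal sketch of that starting-point computation (names hypothetical):

```rust
use std::time::{Duration, SystemTime};

/// Start of the recovery window: the last executed Tx's creation
/// timestamp minus twice the maximum Tx execution time.
fn sync_start(last_executed_created_at: SystemTime, max_tx_exec: Duration) -> SystemTime {
    last_executed_created_at - 2 * max_tx_exec
}
```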

To be defined:

  • Size of the chunk: fixed, dynamic, default size.
  • The communication process between the nodes to get synchronization data. Currently, I define it using RPC.
  • Where the "last executed Tx hash" is stored: local config file, config table in the DB, ... If the Tx received during bootstrap were not executed but put in a buffer, we could avoid this saving. On the other hand, buffered Tx would fill up memory, and Tx sync can be long because of file sizes.

Dev

This process is independent of scheduling and normal Tx processing. To ease the integration, I think we should do a little refactoring first: move all Tx validation into the mempool and let the mempool decide when to send Tx to execution. We change the mempool from a pull model to a push-to-execution model.
With this change, we can integrate the bootstrap process.
After that, we should remove Tx management from the execution layer. Some Tx don't need execution, like Verify or Deploy Tx, which only need to be marked as executed. This will allow us to separate Tx processing (deciding what to do with each Tx) from task execution (executing on the VM).
In the end, the execution module executes only tasks and returns the results to the mempool Tx processing.
The change to task execution will help integrate new types of Tx, or new ways of executing Tx, such as not always executing a Tx.

@tuommaki
Contributor Author

tuommaki commented Apr 6, 2024

Your proposal has a good idea in that it begins from the latest transactions, as that allows a node to quickly catch up to online operation. However, I believe it has several problems:

Chunking

I don't think CDC will help here because it's designed for deduplication of local data, which we don't need.
...
CDC doesn't add anything to the replication algo because its goal is to avoid duplication of file content. Simple chunk verification is enough.

Wrong. I don't think you understood how it works. Data deduplication is one of its most obvious applications, but content-defined chunking itself solves the problem of reaching distributed agreement on chunk boundaries over a large amount of data when bits change anywhere in it.

Your proposal has the very problem that CDC actually solves: when a single transaction arrives in the middle of all transactions (i.e. some long-running node finally syncs with another node that has that single missing piece), all chunks are invalidated, because a simple chunk boundary of 10 transactions shifts everything, and this changes every single chunk hash.

This brings us to the second problem: chunking from newest to oldest will always shift the chunk boundaries if they are counted by number of transactions.
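A toy illustration of the shift in plain Rust (nothing Gevulot-specific): inserting one element in the middle changes every fixed-size chunk after it.

```rust
fn fixed_chunks(items: &[u32], size: usize) -> Vec<Vec<u32>> {
    items.chunks(size).map(|c| c.to_vec()).collect()
}

fn main() {
    let before: Vec<u32> = (0..40).collect();
    let mut after = before.clone();
    after.insert(15, 999); // a single "missing" tx finally arrives

    let (a, b) = (fixed_chunks(&before, 10), fixed_chunks(&after, 10));
    for i in 0..b.len() {
        // chunk 0 still matches; chunks 1, 2, 3, ... all differ
        println!("chunk {i} unchanged: {}", a.get(i) == b.get(i));
    }
}
```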

Tx creation timestamp

I don't like this idea at all. It introduces obvious problems when time is not strictly and precisely synced on every single node of the network. There will be situations where a Run transaction appears to have been created after the first Proof, which is wrong.

The timestamp must never be used for anything other than rough sampling of transactions that happened around the same time. This is somewhat similar to location: a timestamp in our case is like a district name in a location. It's not a high-precision coordinate of a specific spot, but a vague concept of "about there", which helps limit the amount of data that needs to be processed.

The timestamp should always be kept a node-local concept, because then it's consistent between distinct samples. When all timestamps have been created locally, one can analyze transaction execution time and progress by looking at the timestamps in the database.

In a distributed environment, wall-clock timestamps must never be used for anything that needs absolute precision.

Defect in my proposal and its resolution

While writing the above, I realized that even my first proposal has a severe defect, because it uses timestamps. It can get into infinite re-syncing across nodes over time: if two nodes have the same transaction with different timestamps, where the delta is larger than a single sampling window (e.g. 5 days apart), then every time these nodes sync, depending on whom they sync with, the changes go back and forth.

However, at this point I'd accept this defect in order to keep the solution simple enough:

  • Because of CDC, the re-syncing of the transaction[s] is limited to a small window.
  • I assume and expect that the nodes participating in our network are roughly within 5 minutes of each other.
    • We can even add a small check to the node to ensure this and refuse to run if it is not the case.

@musitdev
Contributor

musitdev commented Apr 8, 2024

To agree on an algorithm, I think we should first agree on the goal and assumptions.

Goal

We have 2 nodes:

  • One that starts, called the booting node, which has missed some Tx.
  • One that works correctly, called the bootstrap node, which has all Txs.

The goal is:

  • Copy all Txs present on the bootstrap node and missing on the booting node to the booting node. The opposite is not done.
  • During the process, new Tx are created on the network and should be present on the booting node at the end of the process.
  • A side goal of the algorithm is to minimize the amount of data exchanged between the nodes to transfer the missing Txs.

If the node starts with an empty database, we should consider using a snapshot, or say that the node won't have the Tx before a certain time and use this algo.

Assumptions:

The process is meant to manage the case where a node crashes and the operator restarts it later.
We suppose that:

  • Before the crash, the node was working correctly and didn't miss any Tx.
  • During the crash, the node didn't get any Tx.
  • Each node receives the Tx in a different order, but they get the same Tx after some time that depends mostly on the time to propagate through the network and the time to download the Tx files. When a node creates a Tx, it propagates it to the network just after downloading its files. Another node receives the Tx, downloads the corresponding asset, and saves it. In this process, the longest step is the download.

Example Tx timeline between the 2 nodes when the booting node starts:

 Bootstrap node ..|t4e|t5e|t6e|t7e|t8e|t9e|t10e|t11e|t12e|t13s|t14e|t15s|t16d|t17d|t18s|t19e|t20d

 Booting node   ..|t5e|t4e|t6e|t7s|t9e|t10s|t8s|t11d|t14d|            ..         |t18s|t20d|t19d
                                             T1         T2Crash                 T3Start        T4Endbt
                            TBegin

e: executed, s: saved, d: downloading/propagating

From this timeline, the algo will determine to copy Tx 11, 12, 13, 14, 15, 16, 17 from the bootstrap node to the booting one.

From this timeline we define these assumptions:

  • To avoid testing all Tx in the bootstrap node's DB, we have to find a way to detect T1. Before T1 there are no missing Tx on the booting node.
  • T2Crash is the date of the last Tx saved in the DB.
  • T3Start is the time when the booting node starts.
  • Between T1 and T2Crash the booting node can miss Tx.
  • After T2Crash, all Tx are missing on the booting node.

Algorithms:

All Tx arriving from P2P or RPC during the bootstrap are processed and added to the DB as in the normal process.

Algo1, the simplest:

Define a time TBegin that is before T1. For example, we estimate that T1 can't be more than 30 min before T2Crash (last Tx saved).
From TBegin/Tx6, the bootstrap node sends all its Tx hashes (saved and downloading) that come after Tx6. So it returns the hashes of Tx6 to Tx18 (it received Tx18 before the booting node's request).
The booting node compares all received Tx hashes with the Tx in its DB. Missing Tx are requested and saved as executed (see the sketch below the pros and cons).

Pros: simple.
Cons:

  • can miss Tx if TBegin is after T1
  • can transfer a lot more Tx hashes if TBegin is far behind T1
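Here is what Algo1's comparison step could look like on the booting node (a sketch with hypothetical types; the RPC transport is left out):

```rust
use std::collections::HashSet;

/// Given every Tx hash the bootstrap node holds after TBegin, return the
/// ones absent from the local DB; these are then requested one by one
/// and saved as executed.
fn missing_txs(remote: &[[u8; 32]], local_db: &HashSet<[u8; 32]>) -> Vec<[u8; 32]> {
    remote
        .iter()
        .filter(|h| !local_db.contains(*h))
        .copied()
        .collect()
}
```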

Algo2: try to optimize the T1 discovery

The idea is that the booting node requests the bootstrap node to build chunks of Tx, calculate the hash of each chunk, and send all the chunk hashes back.
The booting node builds the same chunk hash set and determines which chunks differ. If the chunks are ordered by time, the earliest chunks should show no modification (before T1); after that, some modifications are visible (after T1); then some chunks are missing entirely on the booting node's side.
To do that, Tx must be ordered in time the same way on both nodes. I propose to introduce a creation_time field, set when the Tx is first created. It contains the timestamp (UTC+0) of the creation. If nodes order the Tx using this field, they will have the same order, because it has the same value on all nodes and it's timely ordered. If all nodes have nearly the same clock (a few seconds' difference), the chunks will be timely ordered.
If the clock difference between nodes is too large, some Tx can fall outside the chunk window [TBegin, now] and be missed. For this reason, the chunk window should be large: 30 min or 1 hour.

When the booting node detects a chunk with a different hash, it requests all the Tx hashes of the chunk, compares them with the ones in its DB, and requests the Tx that are missing.
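The comparison step could be as simple as the following sketch (hypothetical helper): walking both chunk-hash lists from oldest to newest, the first mismatch approximates T1.

```rust
/// Index of the first chunk whose hash differs between the two nodes
/// (or which exists on only one side); chunks before it are in sync.
fn first_divergent_chunk(local: &[[u8; 32]], remote: &[[u8; 32]]) -> Option<usize> {
    (0..local.len().max(remote.len())).find(|&i| local.get(i) != remote.get(i))
}
```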

How to determine the chunk size.

In my first version, I propose to use the same size for all chunks. It can be optimized by increasing the size the farther the chunk is from T1 (before/after), because it's more likely that there's no modification far before T1, or that everything is missing far after it. As we don't have 1000 Tx per second, I'm not sure it reduces the transferred data volume much.

About the CDC

CDC operates on arbitrary binary data and tries to build chunks of it, where chunk boundaries are derived with a hashing technique so that if we take 2 files with the same data and a few modifications in one, the chunks of unmodified data stay the same, and modified data is absorbed into a chunk by changing the chunk size.
Here we don't have arbitrary binary data but Tx hashes of fixed length. There is no way to force the CDC algo to cut chunks only at hash boundaries, so a chunk may begin or end with a truncated Tx hash. If we find a modified chunk, how do we request the truncated Tx hash?
In our problem, we have modifications only at a specific point T1, and after it all chunks are modified or missing on the bootstrap node.
I'm really not sure that CDC will help in finding T1. Using variable-size chunks depending on the distance to T2Crash may be more efficient at minimizing the number of chunk hashes transferred and analyzed.
I'm not sure we need varying chunks. Instead of a size of 10 Tx, we could define a value proportional to the number of Tx between TBegin and T2Crash on the booting node.
Later, if the data transfer is significant, we can build the algo in an interactive way where the booting node requests chunks one by one, starting at T2Crash and increasing the size depending on the number of missing Tx found.

General Algorithm.

Never mind, this can be changed later if we agree on the base algorithm:

  • On the booting node, get all Tx hashes between T2Crash and some earlier time. Define TBegin in the past where the search starts; it defines the interval [TBegin, T2Crash].
  • Order this set. Using a creation date will help minimize the number of chunks containing missing Tx.
  • Build chunks from this set.
  • On the bootstrap node, do the same with the Tx interval [TBegin, now] (DB + downloading).
  • On the booting node, compare the chunk hashes.
  • If a chunk differs or is missing, request the Tx hashes of the chunk from the bootstrap node.
  • Query the booting node (DB + downloading) to determine the missing Tx.
  • Request the missing Tx from the bootstrap node.

Other possibility:

Order the Tx hash set by hash and use CDC to see if it's efficient at determining the chunks (we have to solve the Tx hash boundary problem).
The risk is that Tx missing after T2Crash are spread over all chunks and every chunk is modified, so in the end we transfer all Tx hashes.

Change to allow the synchronization:

Move the Tx waiting step after saving and propagation, so that the waiting process doesn't interfere with the sync process.

Questions:

At start, should non-executed Tx be executed, or just marked as executed by the bootstrap process? For example, Tx T8 and T10 are not executed locally but are found executed on the bootstrap node. Do we execute them as we do today?

T15, T16, T17 are not executed on the bootstrap node but will be sent to the booting node during the bootstrap process. Does the booting node execute them?

@tuommaki
Contributor Author

tuommaki commented Apr 9, 2024

Assumption

I think your first assumption undermines the rest of the reasoning:

Before the crash, the node was working correctly and didn't miss any Tx.

This is wrong. A booting node can, and often will, miss transactions in-between.

Easy way to produce this:

Say there are nodes A, B and C.

Node A is broadcasting a transaction to B and C. It manages to send it to B, but it crashes/restarts/loses network connectivity before it gets to send it to C.

Now Node A and B have transaction T, but node C doesn't.

Nodes continue working as before.

Node C was functioning correctly all the time. In fact, it has the longest uptime of them all, until it's restarted for an upgrade. At this point, when it syncs itself at startup, it realizes it is missing individual transactions here and there.

This is one scenario where a static chunk size fails, because changes in the middle result in changes to all subsequent chunks, when only one chunk would actually need re-syncing.

Empty node boot

If the node starts with an empty database, we should consider using a snapshot, or say that the node won't have the Tx before a certain time and use this algo.

I agree with this, but I think it is kind of an optimization (an important one, but still). We probably want to build a separate tool/worker/task that at longer intervals generates bigger exports from the DB that can be directly imported into an empty DB.

CDC

There is no way to force the CDC algo to cut chunks only at hash boundaries, so a chunk may begin or end with a truncated Tx hash. If we find a modified chunk, how do we request the truncated Tx hash?

If the stream of data that CDC computes over is aligned by transaction hashes, can't we then easily calculate the Nth transaction by simply dividing the offset by the hash length and adding 1?
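In code, that alignment argument is plain integer arithmetic (assuming 32-byte hashes; names are illustrative):

```rust
const HASH_LEN: usize = 32;

/// 0-based index of the transaction hash containing a given byte offset
/// of the concatenated stream.
fn tx_index(offset: usize) -> usize {
    offset / HASH_LEN
}

/// Snap an arbitrary CDC cut point down to the nearest hash boundary so
/// that no transaction hash is ever split across two chunks.
fn snap_to_boundary(offset: usize) -> usize {
    (offset / HASH_LEN) * HASH_LEN
}
```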

I'm really not sure that CDC will help in finding T1.

Actually, it does. As I initially wrote:

These groups can then be hashed and marked as synced.

To elaborate on that: when the nodes agree on chunk boundaries, they can hash the chunk contents (only the concatenated Tx hashes, not all the data), and these hash lists can be transferred and compared between nodes very efficiently. This way, nodes can sync whole chunks, and the scheme easily covers missing chunks at the beginning and in the middle as well.

Risk

The risk is that Tx missing after T2Crash are spread over all chunks and every chunk is modified, so in the end we transfer all Tx hashes.

True, but is this a considerable problem? I'm thinking that we should transfer chunks as lists of transaction hashes, with the transactions themselves fetched separately. If there are individual transactions missing here and there, it can cause a number of chunks to be compared, but if the contained transactions are mostly present, the amount of processed data will still be kept relatively low, and in the end, wouldn't this scenario still be relatively rare?

@musitdev
Contributor

Assumption

I put in this assumption to express the fact that at some moment there must be a consensus on the Tx state of the network. Without it, all nodes can have missing Tx during normal operation (Tx propagation can fail at any moment), so it's impossible to synchronize all the Tx from one node. Even if the booting node synchronizes with all nodes, it can miss Tx if another node is down.

So, as your scenario is possible, I think we have 2 choices:

  • integrate a consensus mechanism into the process
  • mitigate all these scenarios so that the risk of a node having missing Tx is nil.

I rule out the master/slave possibility.

Otherwise, we have to accept that nodes can have missing Tx.

As the rest of the algo depends on this assumption, we have to agree on it first.

CDC

About the hash cut, what I'm saying is: let's say CDC makes a chunk of 3 Tx with hashes AA, BB, CC.
The input data for CDC is AABBCC, and the chunks can be AAB and BCC. So if, on the booting node, the first chunk is different, it's hard for the bootstrap node to get the hash of Tx BB from that chunk. We need another algo to recompute all the Tx that belong to the chunk, for example here chunk1: AA BB and chunk2: BB CC. We have to validate that there's no risk of missing a Tx in this reconstruction algo.

Risk

Given the way cryptographic hashes are generated, I think this scenario can occur even with a few Tx, because the generated hashes are spread maximally over the hash space. So if we use an alphanumeric sort, for example, even a few Tx will have very different first letters and will be spread among the others.

@tuommaki
Contributor Author

Otherwise, we have to accept that nodes can have missing Tx.

Yes. I've tried to convey this the whole time: the syncing of all nodes with respect to each other is eventual. It is therefore expected that some nodes may have missing transactions at any point in time, but as they restart over time, they will catch up on those older missing transactions, while newer ones may be missing again, depending on the functioning and timing of everything. Still, over time, at an abstract level, the whole system approaches full sync in the grand scheme.

So if, on the booting node, the first chunk is different, it's hard for the bootstrap node to get the hash of Tx BB from that chunk.

This is why there must be agreement on inclusion and exclusion (begin/end).

I'll take this over for now and work on the first iteration of the implementation. The discussion here hasn't led anywhere, and we have spent two weeks of the original three-week ideal.

As I've tried to express here, I have an idea of how this can be implemented, so I'll proceed with it and we'll see how it comes out.

@musitdev
Contributor

Ok, try to implement a first version.
