Skip to content

Commit

Permalink
feat: sync-registries task refinements
Browse files Browse the repository at this point in the history
  • Loading branch information
wwills2 committed Oct 8, 2024
1 parent ac9b560 commit a37775c
Showing 1 changed file with 27 additions and 29 deletions.
56 changes: 27 additions & 29 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,41 +180,42 @@ const syncOrganizationAudit = async (organization) => {

if (!rootHistory?.length) {
logger.warn(
`Could not find root history for ${organization.name} (orgUid ${organization.orgUid}) with timestamp ${currentGeneration.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
`Could not find root history for ${organization.name} (orgUid ${organization.orgUid}), something is wrong and the sync for this organization will be paused until this is resolved.`,
);
return;
}

let lastRootSaved;
let lastRootSavedToAuditTable;

if (CONFIG.USE_SIMULATOR) {
console.log('USING MOCK ROOT HISTORY');
lastRootSaved = rootHistory[0];
lastRootSaved.rootHash = lastRootSaved.root_hash;
lastRootSaved.generation = 0;
lastRootSavedToAuditTable = rootHistory[0];
lastRootSavedToAuditTable.rootHash = lastRootSavedToAuditTable.root_hash;
lastRootSavedToAuditTable.generation = 0;
} else {
logger.debug(
`querying audit table for last root of ${organization.name}`,
);
lastRootSaved = await Audit.findOne({
lastRootSavedToAuditTable = await Audit.findOne({
where: { registryId: organization.registryId },
order: [['generation', 'DESC']],
raw: true,
});

if (lastRootSaved) {
if (lastRootSavedToAuditTable) {
// There was an oversight in the audit model where we named it onChainConfirmationTimeStamp but
// the RPC result calls in timestamp. This is a temporary fix to ensure that we can still sync
lastRootSaved.timestamp = Number(
lastRootSaved?.onchainConfirmationTimeStamp || 0,
lastRootSavedToAuditTable.timestamp = Number(
lastRootSavedToAuditTable?.onchainConfirmationTimeStamp || 0,
);
lastRootSaved.root_hash = lastRootSaved.rootHash;
lastRootSavedToAuditTable.root_hash =
lastRootSavedToAuditTable.rootHash;
}
}

let currentGeneration = _.get(rootHistory, '[0]');
const highestStoreGeneration = _.get(rootHistory, '[0]');

if (!lastRootSaved) {
if (!lastRootSavedToAuditTable) {
logger.info(
`Syncing new registry ${organization.name} (store ${organization.orgUid})`,
);
Expand All @@ -223,12 +224,13 @@ const syncOrganizationAudit = async (organization) => {
await Audit.create({
orgUid: organization.orgUid,
registryId: organization.registryId,
rootHash: currentGeneration.root_hash,
rootHash: highestStoreGeneration.root_hash,
type: 'CREATE REGISTRY',
generation: 0,
change: null,
table: null,
onchainConfirmationTimeStamp: currentGeneration.timestamp.toString(),
onchainConfirmationTimeStamp:
highestStoreGeneration.timestamp.toString(),
});

// Destroy existing records for this singleton
Expand All @@ -238,7 +240,7 @@ const syncOrganizationAudit = async (organization) => {
await Promise.all(
Object.keys(ModelKeys).map(async (modelKey) => {
logger.debug(
`peforming destroy operation on home organization data in model ${modelKey}`,
`performing destroy operation on home organization data in model ${modelKey}`,
);
ModelKeys[modelKey].destroy({
where: {
Expand All @@ -249,11 +251,9 @@ const syncOrganizationAudit = async (organization) => {
);

return;
} else {
currentGeneration = lastRootSaved;
}

const lastProcessedIndex = currentGeneration.generation;
const lastProcessedIndex = lastRootSavedToAuditTable.generation - 1;
logger.debug(
`1 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
);
Expand All @@ -262,7 +262,7 @@ const syncOrganizationAudit = async (organization) => {
const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex;
const isSynced = syncRemaining === 0;
logger.debug(
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation is ${lastProcessedIndex}`,
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${lastRootSavedToAuditTable.generation}`,
);
logger.debug(
`2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
Expand All @@ -287,16 +287,16 @@ const syncOrganizationAudit = async (organization) => {

const toBeProcessedIndex = lastProcessedIndex + 1;
logger.debug(
`3 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`3 Last processed generation index of ${organization.name}: ${lastProcessedIndex}`,
);
logger.debug(
`4 To be processed index of ${organization.name}: ${toBeProcessedIndex}`,
`4 To be processed generation index of ${organization.name}: ${toBeProcessedIndex}`,
);

// Organization not synced, sync it
logger.info(' ');
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedIndex} (store ${organization.orgUid})`,
`Syncing ${organization.name} generation index ${toBeProcessedIndex} (store ${organization.orgUid})`,
);
logger.info(
`${organization.name} is ${syncRemaining} DataLayer generations away from being fully synced (store ${organization.orgUid}).`,
Expand Down Expand Up @@ -328,7 +328,7 @@ const syncOrganizationAudit = async (organization) => {
const orgRequiredResetDueToInvalidGenerationIndex =
await orgGenerationMismatchCheck(
organization.orgUid,
lastProcessedIndex,
lastRootSavedToAuditTable.generation,
sync_status.generation,
sync_status.target_generation,
);
Expand Down Expand Up @@ -427,15 +427,13 @@ const syncOrganizationAudit = async (organization) => {
author: '',
};

logger.debug(`optimized kv diff is empty between ${organization.name} generations ${lastProcessedIndex}
and ${toBeProcessedIndex}\n(roots [generation ${lastProcessedIndex}] ${lastProcessedRoot}
and [generation ${toBeProcessedIndex}] ${rootToBeProcessed})`);
logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);
logger.debug(`creating audit entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
} else {
logger.debug(`processing optimized kv diff for ${organization.name} generations ${lastProcessedIndex}
and ${toBeProcessedIndex}\n(roots [generation ${lastProcessedIndex}] ${lastProcessedRoot}
and [generation ${toBeProcessedIndex}] ${rootToBeProcessed})`);
logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);

for (const diff of optimizedKvDiff) {
const key = decodeHex(diff.key);
Expand Down

0 comments on commit a37775c

Please sign in to comment.