Skip to content

Commit

Permalink
fix: duplicate bids publication (#1747)
Browse files Browse the repository at this point in the history
* chore: update development dump

* fix: fetch current bid number each time on creation

* fix: run publishBids with lock

* fix: change withLock signature and tests to accept job context

* chore: address pr comments

* chore: update snapshot testnet url

* chore: publish bids every 5 minutes
  • Loading branch information
1emu authored Apr 4, 2024
1 parent 63d29d6 commit c994b71
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GATSBY_VESTING_DASHBOARD_URL=https://vesting.decentraland.org/%23/

# Snapshot integration
GATSBY_SNAPSHOT_API=https://testnet.hub.snapshot.org/
GATSBY_SNAPSHOT_URL=https://demo.snapshot.org/
GATSBY_SNAPSHOT_URL=https://testnet.snapshot.org/
GATSBY_SNAPSHOT_QUERY_ENDPOINT=https://api.thegraph.com/subgraphs/name/snapshot-labs/snapshot
GATSBY_SNAPSHOT_SPACE=daotest.dcl.eth
GATSBY_SNAPSHOT_DURATION=600
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Then follow instructions on [Snapshot](https://docs.snapshot.org/spaces/create)

- [erc20-balance-of](https://snapshot.org/#/playground/erc20-balance-of)

- [delegation](https://demo.snapshot.org/#/strategy/delegation)
- [delegation](https://testnet.snapshot.org/#/strategy/delegation)

```json
{
Expand Down
Binary file modified scripts/development.dump
Binary file not shown.
73 changes: 73 additions & 0 deletions src/back/jobs/jobLocks.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import JobContext from 'decentraland-gatsby/dist/entities/Job/context'

import { JOB_LOCKS, isLockAcquired, withLock } from './jobLocks'

describe('Job Locking Mechanism', () => {
const exampleContext: JobContext = new JobContext(
'id',
'handler',
{},
async () => {},
async () => {}
)

const immediateJob = jest.fn().mockResolvedValue(0)

const delayedJob = jest
.fn()
.mockImplementation(
() => new Promise((resolve, reject) => setTimeout(() => reject(new Error('Simulated job error')), 100))
)

beforeEach(() => {
JOB_LOCKS.clear()
jest.clearAllMocks()
})

it('should acquire and release lock for a job that completes successfully', async () => {
const jobName = 'immediateJob'
const lockedJob = withLock(jobName, immediateJob)

expect(isLockAcquired(jobName)).toBe(false)
await lockedJob(exampleContext)

expect(immediateJob).toHaveBeenCalledTimes(1)
expect(isLockAcquired(jobName)).toBe(false)
})

it('should acquire and release lock for a job that fails', async () => {
const jobName = 'delayedJob'
const lockedJob = withLock(jobName, delayedJob)

try {
await lockedJob(exampleContext)
} catch (error) {
expect(delayedJob).toHaveBeenCalledTimes(1)
expect(isLockAcquired(jobName)).toBe(false) // Lock should be released even on error
expect(error).toBeDefined() // Ensure error is thrown
}
})

it('should not execute job if it is already running', async () => {
const jobName = 'concurrentJob'
const firstJob = withLock(jobName, delayedJob)
const secondJob = withLock(jobName, immediateJob)

const firstPromise = firstJob(exampleContext)
const secondPromise = secondJob(exampleContext)
try {
await Promise.all([firstPromise, secondPromise])
} catch (error) {
expect((error as Error).message).toBe('Simulated job error')
}

expect(delayedJob).toHaveBeenCalledTimes(1)
expect(immediateJob).not.toHaveBeenCalled()

// Try again after lock is released
await secondJob(exampleContext)
expect(immediateJob).toHaveBeenCalledTimes(1) // Second job should execute now

expect(isLockAcquired(jobName)).toBe(false)
})
})
30 changes: 30 additions & 0 deletions src/back/jobs/jobLocks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import JobContext from 'decentraland-gatsby/dist/entities/Job/context'

type JobFunction = (context: JobContext) => Promise<void>

export const JOB_LOCKS = new Map<string, boolean>()

export const isLockAcquired = (jobName: string): boolean => {
return JOB_LOCKS.has(jobName)
}

const acquireLock = (jobName: string): void => {
JOB_LOCKS.set(jobName, true)
}

const releaseLock = (jobName: string): void => {
JOB_LOCKS.delete(jobName)
}

export const withLock = (jobName: string, jobFunction: JobFunction) => async (context: JobContext) => {
if (isLockAcquired(jobName)) {
console.log(`${jobName} is already running.`)
return
}
acquireLock(jobName)
try {
await jobFunction(context)
} finally {
releaseLock(jobName)
}
}
2 changes: 1 addition & 1 deletion src/config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"GATSBY_SNAPSHOT_API": "https://testnet.hub.snapshot.org/",
"GATSBY_SNAPSHOT_DURATION": "300",
"GATSBY_SNAPSHOT_SPACE": "daotest.dcl.eth",
"GATSBY_SNAPSHOT_URL": "https://demo.snapshot.org/",
"GATSBY_SNAPSHOT_URL": "https://testnet.snapshot.org/",
"GATSBY_SNAPSHOT_QUERY_ENDPOINT": "https://api.thegraph.com/subgraphs/name/snapshot-labs/snapshot",
"SNAPSHOT_STATUS_ENABLED": "true",
"GATSBY_SUBMISSION_THRESHOLD_DRAFT": "100",
Expand Down
2 changes: 1 addition & 1 deletion src/config/env/local.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"GATSBY_DEFAULT_CHAIN_ID": "11155111",
"GATSBY_GOVERNANCE_API": "https://localhost:8000/api",
"GATSBY_SNAPSHOT_API": "https://testnet.hub.snapshot.org/",
"GATSBY_SNAPSHOT_URL": "https://demo.snapshot.org/",
"GATSBY_SNAPSHOT_URL": "https://testnet.snapshot.org/",
"GATSBY_SNAPSHOT_QUERY_ENDPOINT": "https://api.thegraph.com/subgraphs/name/snapshot-labs/snapshot",
"GATSBY_SNAPSHOT_SPACE": "daotest.dcl.eth",
"GATSBY_SNAPSHOT_DURATION": "600",
Expand Down
3 changes: 2 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import YAML from 'yaml'

import { giveTopVoterBadges, runAirdropJobs } from './back/jobs/BadgeAirdrop'
import { pingSnapshot } from './back/jobs/PingSnapshot'
import { withLock } from './back/jobs/jobLocks'
import airdrops from './back/routes/airdrop'
import badges from './back/routes/badges'
import bid from './back/routes/bid'
Expand Down Expand Up @@ -62,7 +63,7 @@ import { GOVERNANCE_URL } from './constants'
const jobs = manager()
jobs.cron('@eachMinute', finishProposal)
jobs.cron('@eachMinute', activateProposals)
jobs.cron('@eachMinute', publishBids)
jobs.cron('@each5Minute', withLock('publishBids', publishBids))
jobs.cron('@each10Second', pingSnapshot)
jobs.cron('@daily', updateGovernanceBudgets)
jobs.cron('@daily', runAirdropJobs)
Expand Down
14 changes: 8 additions & 6 deletions src/services/BidService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,8 @@ export default class BidService {
tendersWithBidsToReject.push(tenderId)
continue
}

const totalBids = await ProposalModel.getProposalTotal({
type: ProposalType.Bid,
})
let bid_number = totalBids + 1
for (const bid of bids) {
const bid_number = await this.getCurrentBidNumber()
const { author_address, bid_proposal_data, linked_proposal_id, publish_at, created_at } = bid
const finish_at = Time.utc(publish_at).add(Number(process.env.DURATION_BID), 'seconds').toDate()
try {
Expand All @@ -119,7 +115,6 @@ export default class BidService {
required_to_pass,
finish_at,
})
bid_number++
await UnpublishedBidModel.removePendingBid(author_address, linked_proposal_id)
context.log(`Bid from ${author_address} for tender ${linked_proposal_id} published`)
} catch (error) {
Expand Down Expand Up @@ -148,6 +143,13 @@ export default class BidService {
}
}

private static async getCurrentBidNumber() {
const totalBids = await ProposalModel.getProposalTotal({
type: ProposalType.Bid,
})
return totalBids + 1
}

static async getUserBidOnTender(user: string, tenderId: string) {
const bids = await UnpublishedBidModel.getBidsInfoByTender(tenderId)
return bids.find((bid) => bid.author_address === user) || null
Expand Down

0 comments on commit c994b71

Please sign in to comment.