Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor connections rate, rate per minute instead of second #785

Merged
merged 13 commits into from
Jan 9, 2025
Merged
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export INDEXER_INTERVAL=
export ALLOWED_ADMINS=
export DASHBOARD=true
export RATE_DENY_LIST=
export MAX_REQ_PER_SECOND=
export MAX_REQ_PER_MINUTE=
export MAX_CHECKSUM_LENGTH=
export LOG_LEVEL=
export HTTP_API_PORT=
Expand Down
2 changes: 1 addition & 1 deletion docs/dockerDeployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ services:
# INDEXER_INTERVAL: ''
DASHBOARD: 'true'
# RATE_DENY_LIST: ''
# MAX_REQ_PER_SECOND: ''
# MAX_REQ_PER_MINUTE: ''
# MAX_CHECKSUM_LENGTH: ''
# LOG_LEVEL: ''
HTTP_API_PORT: '8000'
Expand Down
3 changes: 2 additions & 1 deletion docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `ALLOWED_ADMINS`: Sets the public address of accounts which have access to admin endpoints e.g. shutting down the node. Example: `"[\"0x967da4048cD07aB37855c090aAF366e4ce1b9F48\",\"0x388C818CA8B9251b393131C08a736A67ccB19297\"]"`
- `DASHBOARD`: If `false` the dashboard will not run. If not set or `true` the dashboard will start with the node. Example: `false`
- `RATE_DENY_LIST`: Blocked list of IPs and peer IDs. Example: `"{ \"peers\": [\"16Uiu2HAkuYfgjXoGcSSLSpRPD6XtUgV71t5RqmTmcqdbmrWY9MJo\"], \"ips\": [\"127.0.0.1\"] }"`
- `MAX_REQ_PER_SECOND`: Number of requests per second allowed by the same client. Example: `3`
- `MAX_REQ_PER_MINUTE`: Number of requests per minute allowed by the same client (IP or Peer id). Example: `30`
- `MAX_CONNECTIONS_PER_MINUTE`: Max number of requests allowed per minute (all clients). Example: `120`
- `MAX_CHECKSUM_LENGTH`: Define the maximum length for a file if checksum is required (Mb). Example: `10`
- `IS_BOOTSTRAP`: Is this node to be used as bootstrap node or not. Default is `false`.

Expand Down
3 changes: 2 additions & 1 deletion scripts/ocean-node-quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ services:
# INDEXER_INTERVAL: ''
DASHBOARD: 'true'
# RATE_DENY_LIST: ''
# MAX_REQ_PER_SECOND: ''
# MAX_REQ_PER_MINUTE: ''
# MAX_CONNECTIONS_PER_MINUTE: ''
# MAX_CHECKSUM_LENGTH: ''
# LOG_LEVEL: ''
HTTP_API_PORT: '$HTTP_API_PORT'
Expand Down
3 changes: 2 additions & 1 deletion src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ export interface OceanNodeConfig {
assetPurgatoryUrl: string
allowedAdmins?: string[]
codeHash?: string
rateLimit?: number
rateLimit?: number // per request ip or peer
maxConnections?: number // global, regardless of client address(es)
denyList?: DenyList
unsafeURLs?: string[]
isBootstrap?: boolean
Expand Down
21 changes: 21 additions & 0 deletions src/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import { pipe } from 'it-pipe'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from './utils/logging/Logger.js'
import { Handler } from './components/core/handler/handler.js'
import { C2DEngines } from './components/c2d/compute_engines.js'

export interface RequestLimiter {
requester: string | string[] // IP address or peer ID
lastRequestTime: number // time of the last request done (in miliseconds)
numRequests: number // number of requests done in the specific time period
}

export interface RequestDataCheck {
valid: boolean
updatedRequestData: RequestLimiter
}
export class OceanNode {
// eslint-disable-next-line no-use-before-define
private static instance: OceanNode
Expand All @@ -20,6 +31,7 @@ export class OceanNode {
private c2dEngines: C2DEngines
// requester
private remoteCaller: string | string[]
private requestMap: Map<string, RequestLimiter>
// eslint-disable-next-line no-useless-constructor
private constructor(
private db?: Database,
Expand All @@ -28,6 +40,7 @@ export class OceanNode {
private indexer?: OceanIndexer
) {
this.coreHandlers = CoreHandlersRegistry.getInstance(this)
this.requestMap = new Map<string, RequestLimiter>()
if (node) {
node.setCoreHandlers(this.coreHandlers)
}
Expand Down Expand Up @@ -95,6 +108,14 @@ export class OceanNode {
return this.remoteCaller
}

public getRequestMapSize(): number {
return this.requestMap.size
}

public getRequestMap(): Map<string, RequestLimiter> {
return this.requestMap
}

/**
* Use this method to direct calls to the node as node cannot dial into itself
* @param message command message
Expand Down
49 changes: 47 additions & 2 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import StreamConcat from 'stream-concat'
import { Handler } from '../core/handler/handler.js'
import { getConfiguration } from '../../utils/index.js'
import { checkConnectionsRateLimit } from '../httpRoutes/requestValidator.js'
import { CONNECTIONS_RATE_INTERVAL } from '../../utils/constants.js'
import { RequestLimiter } from '../../OceanNode.js'

// hold data about last request made
const connectionsData: RequestLimiter = {
lastRequestTime: Date.now(),
requester: '',
numRequests: 0
}

export class ReadableString extends Readable {
private sent = false
Expand Down Expand Up @@ -60,10 +70,14 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
return status
}

const denyList = await (await getConfiguration()).denyList
const configuration = await getConfiguration()
// check deny list configs
const { denyList } = configuration
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (peer its on deny list)`
)

if (connectionStatus === 'open') {
statusStream = new ReadableString(
Expand All @@ -79,6 +93,37 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
return
}
}
// check connections rate limit
const requestTime = Date.now()
if (requestTime - connectionsData.lastRequestTime > CONNECTIONS_RATE_INTERVAL) {
// last one was more than 1 minute ago? reset counter
connectionsData.numRequests = 0
}
// always increment counter
connectionsData.numRequests += 1
// update time and requester information
connectionsData.lastRequestTime = requestTime
connectionsData.requester = remoteAddr

// check global rate limits (not ip related)
const requestRateValidation = checkConnectionsRateLimit(configuration, connectionsData)
if (!requestRateValidation.valid) {
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (rate limit exceeded)`
)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Rate limit exceeded'))
)
try {
await pipe(statusStream, otherPeerConnection.stream.sink)
} catch (e) {
P2P_LOGGER.error(e)
}
}
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}

try {
// eslint-disable-next-line no-unreachable-loop
Expand Down
57 changes: 27 additions & 30 deletions src/components/core/handler/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { OceanNode } from '../../../OceanNode.js'
import { OceanNode, RequestDataCheck, RequestLimiter } from '../../../OceanNode.js'
import { Command, ICommandHandler } from '../../../@types/commands.js'
import {
ValidateParams,
Expand All @@ -9,23 +9,12 @@ import {
import { getConfiguration } from '../../../utils/index.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { ReadableString } from '../../P2P/handlers.js'
import { CONNECTION_HISTORY_DELETE_THRESHOLD } from '../../../utils/constants.js'

export interface RequestLimiter {
requester: string | string[] // IP address or peer ID
lastRequestTime: number // time of the last request done (in miliseconds)
numRequests: number // number of requests done in the specific time period
}

export interface RequestDataCheck {
valid: boolean
updatedRequestData: RequestLimiter
}
export abstract class Handler implements ICommandHandler {
private nodeInstance?: OceanNode
private requestMap: Map<string, RequestLimiter>
private nodeInstance: OceanNode
public constructor(oceanNode: OceanNode) {
this.nodeInstance = oceanNode
this.requestMap = new Map<string, RequestLimiter>()
}

abstract validate(command: Command): ValidateParams
Expand All @@ -38,62 +27,69 @@ export abstract class Handler implements ICommandHandler {

// TODO LOG, implement all handlers
async checkRateLimit(): Promise<boolean> {
const ratePerSecond = (await getConfiguration()).rateLimit
const requestMap = this.getOceanNode().getRequestMap()
const ratePerMinute = (await getConfiguration()).rateLimit
const caller: string | string[] = this.getOceanNode().getRemoteCaller()
const requestTime = new Date().getTime()
let isOK = true

// we have to clear this from time to time, so it does not grow forever
if (requestMap.size > CONNECTION_HISTORY_DELETE_THRESHOLD) {
CORE_LOGGER.info('Request history reached threeshold, cleaning cache...')
requestMap.clear()
}

const self = this
// common stuff
const updateRequestData = function (remoteCaller: string) {
const updatedRequestData = self.checkRequestData(
remoteCaller,
requestTime,
ratePerSecond
ratePerMinute
)
isOK = updatedRequestData.valid
self.requestMap.set(remoteCaller, updatedRequestData.updatedRequestData)
requestMap.set(remoteCaller, updatedRequestData.updatedRequestData)
}

let data: RequestLimiter = null
if (Array.isArray(caller)) {
for (const remote of caller) {
if (!this.requestMap.has(remote)) {
if (!requestMap.has(remote)) {
data = {
requester: remote,
lastRequestTime: requestTime,
numRequests: 1
}
this.requestMap.set(remote, data)
requestMap.set(remote, data)
} else {
updateRequestData(remote)
}
// do not proceed any further
if (!isOK) {
CORE_LOGGER.warn(
`Request denied (rate limit exceeded) for remote caller ${remote}. Current request map: ${JSON.stringify(
this.requestMap.get(remote)
requestMap.get(remote)
)}`
)
return false
}
}
} else {
if (!this.requestMap.has(caller)) {
if (!requestMap.has(caller)) {
data = {
requester: caller,
lastRequestTime: requestTime,
numRequests: 1
}
this.requestMap.set(caller, data)
requestMap.set(caller, data)
return true
} else {
updateRequestData(caller)
// log if request was denied
if (!isOK) {
CORE_LOGGER.warn(
`Request denied (rate limit exceeded) for remote caller ${caller}. Current request map: ${JSON.stringify(
this.requestMap.get(caller)
requestMap.get(caller)
)}`
)
}
Expand All @@ -105,18 +101,19 @@ export abstract class Handler implements ICommandHandler {
/**
* Checks if the request is within the rate limit defined
* @param remote remote endpoint (ip or peer identifier)
* @param ratePerSecond number of calls per second allowed
* @param ratePerMinute number of calls per minute allowed (per ip or peer identifier)
* @returns updated request data
*/
checkRequestData(
remote: string,
currentTime: number,
ratePerSecond: number
ratePerMinute: number
): RequestDataCheck {
const requestData: RequestLimiter = this.requestMap.get(remote)
const diffSeconds = (currentTime - requestData.lastRequestTime) / 1000
// more than 1 sec difference means no problem
if (diffSeconds >= 1) {
const requestMap = this.getOceanNode().getRequestMap()
const requestData: RequestLimiter = requestMap.get(remote)
const diffMinutes = ((currentTime - requestData.lastRequestTime) / 1000) * 60
// more than 1 minute difference means no problem
if (diffMinutes >= 1) {
// its fine
requestData.lastRequestTime = currentTime
requestData.numRequests = 1
Expand All @@ -128,7 +125,7 @@ export abstract class Handler implements ICommandHandler {
// requests in the same interval of 1 second
requestData.numRequests++
return {
valid: requestData.numRequests <= ratePerSecond,
valid: requestData.numRequests <= ratePerMinute,
updatedRequestData: requestData
}
}
Expand Down
Loading
Loading