Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add findPeer & dht refactor #793

Merged
merged 6 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,6 @@ jobs:
- run: docker image ls
- name: Delete default runner images
run: |
docker image rm node:20
docker image rm node:20-alpine
docker image rm node:18
docker image rm node:18-alpine
docker image rm debian:10
docker image rm debian:11
docker image rm ubuntu:22.04
docker image rm ubuntu:20.04
docker image rm moby/buildkit:latest
rm -rf /usr/share/swift/
- name: Wait for contracts deployment and C2D cluster to be ready
working-directory: ${{ github.workspace }}/barge
Expand Down Expand Up @@ -226,15 +217,6 @@ jobs:
- run: docker image ls
- name: Delete default runner images
run: |
docker image rm node:20
docker image rm node:20-alpine
docker image rm node:18
docker image rm node:18-alpine
docker image rm debian:10
docker image rm debian:11
docker image rm ubuntu:22.04
docker image rm ubuntu:20.04
docker image rm moby/buildkit:latest
rm -rf /usr/share/swift/

- name: Wait for contracts deployment and C2D cluster to be ready
Expand Down
32 changes: 32 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,38 @@ returns P2P peer

---

## find peer multiaddress

### `HTTP` GET /findPeer/?

#### Description

returns P2P peer multiaddresses if found in DHT

#### Query Parameters

| name | type | required | description |
| ------- | ------ | -------- | ----------- |
| peerId | string | v | peer id |
| timeout | int | optional | timeout |

#### Response

```
{
"id": "16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP",
"multiaddrs": [
"/ip4/127.0.0.1/tcp/9000",
"/ip4/127.0.0.1/tcp/9001/ws",
"/ip4/172.18.0.2/tcp/9000",
"/ip4/172.18.0.2/tcp/9001/ws",
"/ip6/::1/tcp/9002"
]
}
```

---

## Get P2P Peers

### `HTTP` GET /getP2PPeers
Expand Down
2 changes: 1 addition & 1 deletion docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `P2P_pubsubPeerDiscoveryInterval`: Interval (in ms) for discovery using pubsub. Defaults to `10000` (three seconds). Example: `10000`
- `P2P_dhtMaxInboundStreams`: Maximum number of DHT inbound streams. Defaults to `500`. Example: `500`
- `P2P_dhtMaxOutboundStreams`: Maximum number of DHT outbound streams. Defaults to `500`. Example: `500`
- `P2P_ENABLE_DHT_SERVER`: Enable DHT server mode. This should be enabled for bootstrapers & well established nodes. Default: `false`
- `P2P_DHT_FILTER`: Filter address in DHT. 0 = (Default) No filter 1. Filter private ddresses. 2. Filter public addresses
- `P2P_mDNSInterval`: Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds). Example: `20000`
- `P2P_connectionsMaxParallelDials`: Maximum number of parallel dials. Defaults to `150`. Example: `150`
- `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds). Example: `10000`
Expand Down
9 changes: 7 additions & 2 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ export interface OceanNodeKeys {
privateKey: any
ethAddress: string
}

/* eslint-disable no-unused-vars */
export enum dhtFilterMethod {
filterPrivate = 'filterPrivate', // default, remove all private addresses from DHT
filterPublic = 'filterPublic', // remove all public addresses from DHT
filterNone = 'filterNone' // do not remove all any addresses from DHT
}
export interface OceanNodeP2PConfig {
bootstrapNodes: string[]
bootstrapTimeout: number
Expand All @@ -41,7 +46,7 @@ export interface OceanNodeP2PConfig {
pubsubPeerDiscoveryInterval: number
dhtMaxInboundStreams: number
dhtMaxOutboundStreams: number
enableDHTServer: boolean
dhtFilter: dhtFilterMethod
mDNSInterval: number
connectionsMaxParallelDials: number
connectionsDialTimeout: number
Expand Down
69 changes: 42 additions & 27 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,22 @@ import { autoNAT } from '@libp2p/autonat'
import { uPnPNAT } from '@libp2p/upnp-nat'
import { ping } from '@libp2p/ping'
import { dcutr } from '@libp2p/dcutr'
import { kadDHT, passthroughMapper } from '@libp2p/kad-dht'
import {
kadDHT,
passthroughMapper,
removePrivateAddressesMapper,
removePublicAddressesMapper
} from '@libp2p/kad-dht'
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'

import { EVENTS, cidFromRawString } from '../../utils/index.js'
import { Transform } from 'stream'
import { Database } from '../database'
import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode'
import {
OceanNodeConfig,
FindDDOResponse,
dhtFilterMethod
} from '../../@types/OceanNode.js'
// eslint-disable-next-line camelcase
import is_ip_private from 'private-ip'
import ip from 'ip'
Expand Down Expand Up @@ -275,8 +284,22 @@ export class OceanP2P extends EventEmitter {
multiaddrs.filter((m) => this.shouldAnnounce(m))
}
}
const dhtOptions = {
allowQueryWithZeroPeers: false,
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,
clientMode: false, // always be a server
kBucketSize: 20,
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
peerInfoMapper: passthroughMapper // see below
}
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPrivate)
dhtOptions.peerInfoMapper = removePrivateAddressesMapper
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPublic)
dhtOptions.peerInfoMapper = removePublicAddressesMapper
let servicesConfig = {
identify: identify(),
dht: kadDHT(dhtOptions),
identifyPush: identifyPush(),
/*
pubsub: gossipsub({
Expand All @@ -292,27 +315,10 @@ export class OceanP2P extends EventEmitter {
// enabled: true
allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol']
}), */
dht: kadDHT({
// this is necessary because this node is not connected to the public network
// it can be removed if, for example bootstrappers are configured
allowQueryWithZeroPeers: true,
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,

clientMode: false,
kBucketSize: 20,
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
peerInfoMapper: passthroughMapper
// protocolPrefix: '/ocean/nodes/1.0.0'
// randomWalk: {
// enabled: true, // Allows to disable discovery (enabled by default)
// interval: 300e3,
// timeout: 10e3
// }
}),
ping: ping(),
dcutr: dcutr()
}

// eslint-disable-next-line no-constant-condition, no-self-compare
if (config.p2pConfig.enableCircuitRelayServer) {
P2P_LOGGER.info('Enabling Circuit Relay Server')
Expand Down Expand Up @@ -420,13 +426,6 @@ export class OceanP2P extends EventEmitter {
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
}

if (config.p2pConfig.enableDHTServer) {
try {
await node.services.dht.setMode('server')
} catch (e) {
P2P_LOGGER.warn(`Failed to set mode server for DHT`)
}
}
return node
} catch (e) {
P2P_LOGGER.logMessageWithEmoji(
Expand Down Expand Up @@ -598,6 +597,22 @@ export class OceanP2P extends EventEmitter {
return finalmultiaddrs
}

async findPeerInDht(peerName: string, timeout?: number) {
try {
const peer = peerIdFromString(peerName)
const data = await this._libp2p.peerRouting.findPeer(peer, {
signal:
isNaN(timeout) || timeout === 0
? AbortSignal.timeout(5000)
: AbortSignal.timeout(timeout),
useCache: true,
useNetwork: true
})
return data
} catch (e) {}
return null
}

async sendTo(
peerName: string,
message: string,
Expand Down
20 changes: 20 additions & 0 deletions src/components/httpRoutes/getOceanPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ getOceanPeersRoute.get(
}
}
)

getOceanPeersRoute.get(
'/findPeer',
express.urlencoded({ extended: true }),
async (req: Request, res: Response): Promise<void> => {
if (!req.query.peerId) {
res.sendStatus(400)
return
}
if (hasP2PInterface) {
const peers = await req.oceanNode
.getP2PNode()
.findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout)))
if (peers) res.json(peers)
else res.sendStatus(404).send('Cannot find peer')
} else {
sendMissingP2PResponse(res)
}
}
)
getOceanPeersRoute.get(
'/getOceanPeers',
async (req: Request, res: Response): Promise<void> => {
Expand Down
15 changes: 14 additions & 1 deletion src/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
OceanNodeKeys,
OceanNodeDockerConfig
} from '../@types/OceanNode'
import { dhtFilterMethod } from '../@types/OceanNode.js'
import type { C2DClusterInfo } from '../@types/C2D.js'
import { C2DClusterType } from '../@types/C2D.js'
import { createFromPrivKey } from '@libp2p/peer-id-factory'
Expand Down Expand Up @@ -548,6 +549,18 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
const interfaces = getNodeInterfaces(isStartup)
let bootstrapTtl = getIntEnvValue(process.env.P2P_BOOTSTRAP_TTL, 120000)
if (bootstrapTtl === 0) bootstrapTtl = Infinity
let dhtFilterOption
switch (getIntEnvValue(process.env.P2P_DHT_FILTER, 0)) {
case 1:
dhtFilterOption = dhtFilterMethod.filterPrivate
break
case 2:
dhtFilterOption = dhtFilterMethod.filterPublic
break
default:
dhtFilterOption = dhtFilterMethod.filterNone
}

const config: OceanNodeConfig = {
authorizedDecrypters: getAuthorizedDecrypters(isStartup),
allowedValidators: getAllowedValidators(isStartup),
Expand Down Expand Up @@ -584,7 +597,7 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
),
dhtMaxInboundStreams: getIntEnvValue(process.env.P2P_dhtMaxInboundStreams, 500),
dhtMaxOutboundStreams: getIntEnvValue(process.env.P2P_dhtMaxOutboundStreams, 500),
enableDHTServer: getBoolEnvValue('P2P_ENABLE_DHT_SERVER', false),
dhtFilter: dhtFilterOption,
mDNSInterval: getIntEnvValue(process.env.P2P_mDNSInterval, 20e3), // 20 seconds
connectionsMaxParallelDials: getIntEnvValue(
process.env.P2P_connectionsMaxParallelDials,
Expand Down
Loading