Skip to content

Commit

Permalink
comms arraybuffer from scene (#5822)
Browse files Browse the repository at this point in the history
* wip comms arraybuffer from scene

* add sender to binary comms message

* add msgType to avoid sending uint8array messages to old MessageBus (string)

* remove lgos

* update versions to next

* fix versions
  • Loading branch information
gonpombo8 authored Dec 7, 2023
1 parent 1c973e2 commit 85907e6
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 57 deletions.
28 changes: 14 additions & 14 deletions browser-interface/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions browser-interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@
"@dcl/hashing": "^1.1.3",
"@dcl/kernel-interface": "^2.0.0-20230512115658.commit-b582e05",
"@dcl/legacy-ecs": "^6.11.11",
"@dcl/protocol": "^1.0.0-6314457636.commit-a9a962a",
"@dcl/protocol": "1.0.0-7117237552.commit-82dc93b",
"@dcl/rpc": "^1.1.1",
"@dcl/scene-runtime": "7.0.6-20230915161611.commit-59578a0",
"@dcl/scene-runtime": "7.0.6-20231206162622.commit-9ff48a9",
"@dcl/schemas": "^9.1.1",
"@dcl/single-sign-on-client": "^0.1.0",
"@dcl/urn-resolver": "^2.2.0",
Expand Down
73 changes: 40 additions & 33 deletions browser-interface/packages/shared/apis/host/CommsAPI.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import type { RpcServerPort } from '@dcl/rpc'
import * as codegen from '@dcl/rpc/dist/codegen'
import type { PortContext } from './context'
import { VideoTracksActiveStreamsRequest, VideoTracksActiveStreamsData, CommsApiServiceDefinition, VideoTrackSourceType } from 'shared/protocol/decentraland/kernel/apis/comms_api.gen'
import {
VideoTracksActiveStreamsRequest,
VideoTracksActiveStreamsData,
CommsApiServiceDefinition,
VideoTrackSourceType
} from 'shared/protocol/decentraland/kernel/apis/comms_api.gen'
import { isWorldLoaderActive } from 'shared/realm/selectors'
import { store } from 'shared/store/isolatedStore'
import { getLivekitActiveVideoStreams } from 'shared/comms/selectors'
Expand All @@ -10,41 +15,43 @@ import { ActiveVideoStreams } from 'shared/comms/adapters/types'
import { Track } from 'livekit-client'

export function registerCommsApiServiceServerImplementation(port: RpcServerPort<PortContext>) {
codegen.registerService(port, CommsApiServiceDefinition, async () => ({
async getActiveVideoStreams(req: VideoTracksActiveStreamsRequest, ctx: PortContext) {
codegen.registerService(port, CommsApiServiceDefinition, async () => ({
async getActiveVideoStreams(req: VideoTracksActiveStreamsRequest, ctx: PortContext) {
const realmAdapter = await ensureRealmAdapter()
const isWorld = isWorldLoaderActive(realmAdapter)
if (!isWorld) {
ctx.logger.error('API only available for Worlds')
return { streams: [] }
}

const realmAdapter = await ensureRealmAdapter()
const isWorld = isWorldLoaderActive(realmAdapter)
if (!isWorld) {
ctx.logger.error('API only available for Worlds')
return { streams: [] }
}

const activeVideoStreams: Map<string, ActiveVideoStreams> | undefined = getLivekitActiveVideoStreams(store.getState())
const activeVideoStreams: Map<string, ActiveVideoStreams> | undefined = getLivekitActiveVideoStreams(
store.getState()
)

if (!activeVideoStreams)
return { streams: [] }
if (!activeVideoStreams) return { streams: [] }

let streams: VideoTracksActiveStreamsData[] = []
const streams: VideoTracksActiveStreamsData[] = []

for (const [sid, videoStreamData] of activeVideoStreams) {
if (videoStreamData.videoTracks.size > 0) {
for (const [videoSid, trackData] of videoStreamData.videoTracks) {
if (!!trackData.source) {
streams.push({
identity: videoStreamData.identity,
trackSid: `livekit-video://${sid}/${videoSid}`,
sourceType: trackData.source === Track.Source.Camera ? VideoTrackSourceType.VTST_CAMERA
: trackData.source === Track.Source.ScreenShare ? VideoTrackSourceType.VTST_SCREEN_SHARE
: VideoTrackSourceType.VTST_UNKNOWN
})
}
}
}
for (const [sid, videoStreamData] of activeVideoStreams) {
if (videoStreamData.videoTracks.size > 0) {
for (const [videoSid, trackData] of videoStreamData.videoTracks) {
if (!!trackData.source) {
streams.push({
identity: videoStreamData.identity,
trackSid: `livekit-video://${sid}/${videoSid}`,
sourceType:
trackData.source === Track.Source.Camera
? VideoTrackSourceType.VTST_CAMERA
: trackData.source === Track.Source.ScreenShare
? VideoTrackSourceType.VTST_SCREEN_SHARE
: VideoTrackSourceType.VTST_UNKNOWN
})
}

return { streams }
}
}
})
)
}
}

return { streams }
}
}))
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ import { sendParcelSceneCommsMessage } from 'shared/comms'
import type { PortContext } from './context'
import { CommunicationsControllerServiceDefinition } from 'shared/protocol/decentraland/kernel/apis/communications_controller.gen'

/**
* MsgType utils to diff between old string messages, and new uint8Array messages.
*/
enum MsgType {
String = 1,
Uint8Array = 2
}

function decodeMessage(value: Uint8Array): [MsgType, Uint8Array] {
const msgType = value.at(0) as MsgType
const data = value.subarray(1)
return [msgType, data]
}

function encodeMessage(data: Uint8Array, type: MsgType) {
const message = new Uint8Array(data.byteLength + 1)
message.set([type])
message.set(data, 1)
return message
}

/**
* The CommunicationsControllerService connects messages from the comms controller with the scenes of Decentraland,
* particularly the `AvatarScene` that hosts the avatars of other players.
Expand All @@ -20,17 +41,35 @@ import { CommunicationsControllerServiceDefinition } from 'shared/protocol/decen
*/
export function registerCommunicationsControllerServiceServerImplementation(port: RpcServerPort<PortContext>) {
codegen.registerService(port, CommunicationsControllerServiceDefinition, async (port, ctx) => {
const eventsToProcess: Uint8Array[] = []
const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder()
/**
* The `receiveCommsMessage` relays messages in direction: scene -> comms
*/
const commsController: ICommunicationsController = {
cid: ctx.sceneData.id,
receiveCommsMessage(data: Uint8Array, sender: PeerInformation) {
const message = new TextDecoder().decode(data)
ctx.sendSceneEvent('comms', {
message,
sender: sender.ethereumAddress
})
receiveCommsMessage(preData: Uint8Array, sender: PeerInformation) {
const [msgType, data] = decodeMessage(preData)
if (msgType === MsgType.String) {
const message = textDecoder.decode(data)

ctx.sendSceneEvent('comms', {
message,
sender: sender.ethereumAddress
})
} else if (msgType === MsgType.Uint8Array) {
if (!data.byteLength) return
const senderBytes = textEncoder.encode(sender.ethereumAddress)
const messageLength = senderBytes.byteLength + data.byteLength + 1

const serializedMessage = new Uint8Array(messageLength)
serializedMessage.set(new Uint8Array([senderBytes.byteLength]), 0)
serializedMessage.set(senderBytes, 1)
serializedMessage.set(data, senderBytes.byteLength + 1)

eventsToProcess.push(serializedMessage)
}
}
}

Expand All @@ -45,9 +84,21 @@ export function registerCommunicationsControllerServiceServerImplementation(port

return {
async send(req, ctx) {
const message = new TextEncoder().encode(req.message)
sendParcelSceneCommsMessage(ctx.sceneData.id, message)
const message = textEncoder.encode(req.message)
sendParcelSceneCommsMessage(ctx.sceneData.id, encodeMessage(message, MsgType.String))
return {}
},
async sendBinary(req, ctx) {
// Send messages
for (const data of req.data) {
sendParcelSceneCommsMessage(ctx.sceneData.id, encodeMessage(data, MsgType.Uint8Array))
}

// Process received messages
const messages = [...eventsToProcess]
// clean messages
eventsToProcess.length = 0
return { data: messages }
}
}
})
Expand Down

0 comments on commit 85907e6

Please sign in to comment.