From ead772002e4e711baf5cd6955120cb615874a857 Mon Sep 17 00:00:00 2001 From: Chen Mingliang Date: Tue, 3 Dec 2024 16:10:36 +0800 Subject: [PATCH] Implement rtmp play --- README.md | 3 +- src/protocol/flv.js | 25 +++++++--------- src/protocol/rtmp.js | 53 ++++++++++++++++++++++++++-------- src/server/broadcast_server.js | 28 +++++++++++++++++- 4 files changed, 80 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index fa2a7c6f..272da582 100644 --- a/README.md +++ b/README.md @@ -19,10 +19,9 @@ npx node-media-server ## Features * HTTP/HTTP2-flv Push/Play -* RTMP Push +* RTMP Push/Play ## Roadmap -* RTMP Play * HTTP-API * Authentication * Notification diff --git a/src/protocol/flv.js b/src/protocol/flv.js index b7dd2ffb..b378c296 100644 --- a/src/protocol/flv.js +++ b/src/protocol/flv.js @@ -190,22 +190,19 @@ export default class Flv { }; /** - * @param {number} type - * @param {number} time - * @param {number} size - * @param {Buffer} data + * @param {AVPacket} avpacket * @returns {Buffer} */ - static createMessage = (type, time, size, data) => { - const buffer = Buffer.alloc(11 + size + 4); - buffer[0] = type; - buffer.writeUintBE(size, 1, 3); - buffer[4] = (time >> 16) & 0xFF; - buffer[5] = (time >> 8) & 0xFF; - buffer[6] = time & 0xFF; - buffer[7] = (time >> 24) & 0xFF; - data.copy(buffer, 11, 0, size); - buffer.writeUint32BE(11 + size, 11 + size); + static createMessage = (avpacket) => { + const buffer = Buffer.alloc(11 + avpacket.size + 4); + buffer[0] = avpacket.codec_type; + buffer.writeUintBE(avpacket.size, 1, 3); + buffer[4] = (avpacket.dts >> 16) & 0xFF; + buffer[5] = (avpacket.dts >> 8) & 0xFF; + buffer[6] = avpacket.dts & 0xFF; + buffer[7] = (avpacket.dts >> 24) & 0xFF; + avpacket.data.copy(buffer, 11, 0, avpacket.size); + buffer.writeUint32BE(11 + avpacket.size, 11 + avpacket.size); return buffer; }; diff --git a/src/protocol/rtmp.js b/src/protocol/rtmp.js index f94e098d..551e7058 100644 --- a/src/protocol/rtmp.js +++ b/src/protocol/rtmp.js @@ -231,7 +231,6 @@ class RtmpPacket { stream_id: 0 }; this.clock = 0; - /**@type {Buffer} */ this.payload = Buffer.alloc(0); this.capacity = 0; this.bytes = 0; @@ -350,8 +349,33 @@ export default class Rtmp { return null; }; + /** + * @param {AVPacket} avpacket + * @returns {Buffer} + */ + static createMessage = (avpacket) => { + let rtmpPacket = new RtmpPacket(); + rtmpPacket.header.fmt = MESSAGE_FORMAT_0; + switch (avpacket.codec_type) { + case 8: + rtmpPacket.header.cid = RTMP_CHANNEL_AUDIO; + break; + case 9: + rtmpPacket.header.cid = RTMP_CHANNEL_VIDEO; + break; + case 18: + rtmpPacket.header.cid = RTMP_CHANNEL_DATA; + break; + } + rtmpPacket.header.length = avpacket.size; + rtmpPacket.header.type = avpacket.codec_type; + rtmpPacket.header.timestamp = avpacket.dts; + rtmpPacket.clock = avpacket.dts; + rtmpPacket.payload = avpacket.data; + return Rtmp.chunksCreate(rtmpPacket); + }; - chunkBasicHeaderCreate = (fmt, cid) => { + static chunkBasicHeaderCreate = (fmt, cid) => { let out; if (cid >= 64 + 255) { out = Buffer.alloc(3); @@ -369,7 +393,7 @@ export default class Rtmp { return out; }; - chunkMessageHeaderCreate = (header) => { + static chunkMessageHeaderCreate = (header) => { let out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4]); if (header.fmt <= RTMP_CHUNK_TYPE_2) { out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3); @@ -386,16 +410,21 @@ export default class Rtmp { return out; }; - chunksCreate = (packet) => { + /** + * + * @param {RtmpPacket} packet + * @returns {Buffer} + */ + static chunksCreate = (packet) => { let header = packet.header; let payload = packet.payload; let payloadSize = header.length; - let chunkSize = this.outChunkSize; + let chunkSize = RTMP_MAX_CHUNK_SIZE; let chunksOffset = 0; let payloadOffset = 0; - let chunkBasicHeader = this.chunkBasicHeaderCreate(header.fmt, header.cid); - let chunkBasicHeader3 = this.chunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid); - let chunkMessageHeader = this.chunkMessageHeaderCreate(header); + let chunkBasicHeader = Rtmp.chunkBasicHeaderCreate(header.fmt, header.cid); + let chunkBasicHeader3 = Rtmp.chunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid); + let chunkMessageHeader = Rtmp.chunkMessageHeaderCreate(header); let useExtendedTimestamp = header.timestamp >= 0xffffff; let headerSize = chunkBasicHeader.length + chunkMessageHeader.length + (useExtendedTimestamp ? 4 : 0); let n = headerSize + payloadSize + Math.floor(payloadSize / chunkSize); @@ -686,18 +715,18 @@ export default class Rtmp { } this.streamName = invokeMessage.streamName.split("?")[0]; this.streamId = this.parserPacket.header.stream_id; + this.respondPublish(); this.onConnectCallback(this.streamApp, this.streamName); this.onPushCallback(); - this.respondPublish(); }; onPlay = (invokeMessage) => { this.streamName = invokeMessage.streamName.split("?")[0]; this.streamPath = "/" + this.streamApp + "/" + this.streamName; this.streamId = this.parserPacket.header.stream_id; + this.respondPlay(); this.onConnectCallback(this.streamApp, this.streamName); this.onPlayCallback(); - this.respondPlay(); }; onDeleteStream = (invokeMessage) => { @@ -744,7 +773,7 @@ export default class Rtmp { packet.header.stream_id = sid; packet.payload = AMF.encodeAmf0Cmd(opt); packet.header.length = packet.payload.length; - let chunks = this.chunksCreate(packet); + let chunks = Rtmp.chunksCreate(packet); this.onOutputCallback(chunks); }; @@ -756,7 +785,7 @@ export default class Rtmp { packet.payload = AMF.encodeAmf0Data(opt); packet.header.length = packet.payload.length; packet.header.stream_id = sid; - let chunks = this.chunksCreate(packet); + let chunks = Rtmp.chunksCreate(packet); this.onOutputCallback(chunks); } diff --git a/src/server/broadcast_server.js b/src/server/broadcast_server.js index 08fd5169..ff6b6f68 100644 --- a/src/server/broadcast_server.js +++ b/src/server/broadcast_server.js @@ -7,6 +7,7 @@ import AVPacket from "../core/avpacket.js"; import Flv from "../protocol/flv.js"; +import Rtmp from "../protocol/rtmp.js"; import BaseSession from "../session/base_session.js"; export default class BroadcastServer { @@ -28,6 +29,15 @@ export default class BroadcastServer { /** @type {Buffer | null} */ this.flvVideoHeader = null; + + /** @type {Buffer | null} */ + this.rtmpMetaData = null; + + /** @type {Buffer | null} */ + this.rtmpAudioHeader = null; + + /** @type {Buffer | null} */ + this.rtmpVideoHeader = null; } /** @@ -47,6 +57,16 @@ export default class BroadcastServer { session.sendBuffer(this.flvVideoHeader); } break; + case "rtmp": + if (this.rtmpMetaData != null) { + session.sendBuffer(this.rtmpMetaData); + } + if (this.rtmpAudioHeader != null) { + session.sendBuffer(this.rtmpAudioHeader); + } + if (this.rtmpVideoHeader != null) { + session.sendBuffer(this.rtmpVideoHeader); + } } this.subscribers.set(session.id, session); @@ -88,16 +108,20 @@ export default class BroadcastServer { * @param {AVPacket} packet */ broadcastMessage = (packet) => { - const flvMessage = Flv.createMessage(packet.codec_type, packet.dts, packet.size, packet.data); + const flvMessage = Flv.createMessage(packet); + const rtmpMessage = Rtmp.createMessage(packet); switch (packet.flags) { case 0: this.flvAudioHeader = Buffer.from(flvMessage); + this.rtmpAudioHeader = Buffer.from(rtmpMessage); break; case 2: this.flvVideoHeader = Buffer.from(flvMessage); + this.rtmpVideoHeader = Buffer.from(rtmpMessage); break; case 5: this.flvMetaData = Buffer.from(flvMessage); + this.rtmpMetaData = Buffer.from(rtmpMessage); break; } @@ -106,6 +130,8 @@ export default class BroadcastServer { case "flv": v.sendBuffer(flvMessage); break; + case "rtmp": + v.sendBuffer(rtmpMessage); } }); };