diff --git a/src/internal/concurrency/index.ts b/src/internal/concurrency/index.ts index c2cbc510..86792b45 100644 --- a/src/internal/concurrency/index.ts +++ b/src/internal/concurrency/index.ts @@ -1,3 +1,2 @@ export * from './mutex' -export * from './stream' export * from './async-abort-controller' diff --git a/src/internal/monitoring/otel.ts b/src/internal/monitoring/otel.ts index e801808a..37d1ac91 100644 --- a/src/internal/monitoring/otel.ts +++ b/src/internal/monitoring/otel.ts @@ -34,6 +34,7 @@ import { S3Store } from '@tus/s3-store' import { Upload } from '@aws-sdk/lib-storage' import { StreamSplitter } from '@tus/server' import { PgLock } from '@storage/protocols/tus' +import { Semaphore, Permit } from '@shopify/semaphore' const tracingEnabled = process.env.TRACING_ENABLED === 'true' const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || '' @@ -265,6 +266,16 @@ const sdk = new NodeSDK({ enabled: true, methodsToInstrument: ['lock', 'unlock', 'acquireLock'], }), + new ClassInstrumentation({ + targetClass: Semaphore, + enabled: true, + methodsToInstrument: ['acquire'], + }), + new ClassInstrumentation({ + targetClass: Permit, + enabled: true, + methodsToInstrument: ['release'], + }), new ClassInstrumentation({ targetClass: S3Client, enabled: true, diff --git a/src/internal/concurrency/stream.ts b/src/internal/streams/byte-counter.ts similarity index 85% rename from src/internal/concurrency/stream.ts rename to src/internal/streams/byte-counter.ts index d3551736..0200c364 100644 --- a/src/internal/concurrency/stream.ts +++ b/src/internal/streams/byte-counter.ts @@ -1,4 +1,4 @@ -import { Transform, TransformCallback } from 'stream' +import { Transform, TransformCallback } from 'node:stream' export const createByteCounterStream = () => { let bytes = 0 diff --git a/src/internal/streams/index.ts b/src/internal/streams/index.ts new file mode 100644 index 00000000..4fc27fe1 --- /dev/null +++ b/src/internal/streams/index.ts @@ -0,0 +1,3 @@ +export * from './stream-speed' +export * from './byte-counter' +export * from './monitor' diff --git a/src/internal/streams/monitor.ts b/src/internal/streams/monitor.ts new file mode 100644 index 00000000..e6088499 --- /dev/null +++ b/src/internal/streams/monitor.ts @@ -0,0 +1,42 @@ +import { createByteCounterStream } from './byte-counter' +import { monitorStreamSpeed } from './stream-speed' +import { trace } from '@opentelemetry/api' +import { Readable } from 'node:stream' + +/** + * Monitor readable streams by tracking their speed and bytes read + * @param dataStream + */ +export function monitorStream(dataStream: Readable) { + const speedMonitor = monitorStreamSpeed(dataStream) + const byteCounter = createByteCounterStream() + + let measures: number[] = [] + + // Handle the 'speed' event to collect speed measurements + speedMonitor.on('speed', (bps) => { + measures.push(bps) + const span = trace.getActiveSpan() + span?.setAttributes({ 'stream.speed': measures, bytesRead: byteCounter.bytes }) + }) + + speedMonitor.on('close', () => { + measures = [] + const span = trace.getActiveSpan() + span?.setAttributes({ uploadRead: byteCounter.bytes }) + }) + + // Handle errors by cleaning up and destroying the downstream stream + speedMonitor.on('error', (err) => { + // Destroy the byte counter stream with the error + byteCounter.transformStream.destroy(err) + }) + + // Ensure the byteCounter stream ends when speedMonitor ends + speedMonitor.on('end', () => { + byteCounter.transformStream.end() + }) + + // Return the piped stream + return speedMonitor.pipe(byteCounter.transformStream) +} diff --git a/src/internal/streams/stream-speed.ts b/src/internal/streams/stream-speed.ts new file mode 100644 index 00000000..19df4dc0 --- /dev/null +++ b/src/internal/streams/stream-speed.ts @@ -0,0 +1,51 @@ +import { Readable } from 'stream' +import { PassThrough } from 'node:stream' + +/** + * Keep track of a stream's speed + * @param stream + * @param frequency + */ +/** + * Keep track of a stream's speed + * @param stream + * @param frequency + */ +export function monitorStreamSpeed(stream: Readable, frequency = 1000) { + let totalBytes = 0 + const startTime = Date.now() + + const passThrough = new PassThrough() + + const interval = setInterval(() => { + const currentTime = Date.now() + const elapsedTime = (currentTime - startTime) / 1000 + const currentSpeedBytesPerSecond = totalBytes / elapsedTime + + passThrough.emit('speed', currentSpeedBytesPerSecond) + }, frequency) + + passThrough.on('data', (chunk) => { + totalBytes += chunk.length + }) + + const cleanup = () => { + clearInterval(interval) + passThrough.removeAllListeners('speed') + } + + // Handle close event to ensure cleanup + passThrough.on('close', cleanup) + + // Propagate errors from the source stream to the passThrough + stream.on('error', (err) => { + passThrough.destroy(err) + }) + + // Ensure the passThrough ends when the source stream ends + stream.on('end', () => { + passThrough.end() + }) + + return stream.pipe(passThrough) +} diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts index 681048f7..3e61573c 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3.ts @@ -27,27 +27,12 @@ import { import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { ERRORS, StorageBackendError } from '@internal/errors' import { getConfig } from '../../config' -import { addAbortSignal, PassThrough, Readable } from 'node:stream' -import { trace } from '@opentelemetry/api' -import { createByteCounterStream } from '@internal/concurrency' -import { AgentStats, createAgent, gatherHttpAgentStats, InstrumentedAgent } from '@internal/http' +import { Readable } from 'node:stream' +import { createAgent, InstrumentedAgent } from '@internal/http' +import { monitorStream } from '@internal/streams' const { tracingFeatures, storageS3MaxSockets, tracingEnabled } = getConfig() -interface StreamStatus { - time: Date - bytesUploaded: number - progress: Progress[] - dataStream: { - closed: boolean - paused: boolean - errored: boolean - writable: boolean - byteRead: number - } - httpAgentStats: AgentStats -} - export interface S3ClientOptions { endpoint?: string region?: string @@ -154,22 +139,19 @@ export class S3Backend implements StorageBackendAdapter { throw ERRORS.Aborted('Upload was aborted') } - const streamWatcher = tracingFeatures?.upload ? this.watchUploadStream(body, signal) : undefined - const uploadStream = streamWatcher ? streamWatcher.dataStream : body + const dataStream = tracingFeatures?.upload ? monitorStream(body) : body const upload = new Upload({ client: this.client, params: { Bucket: bucketName, Key: withOptionalVersion(key, version), - Body: uploadStream, + Body: dataStream, ContentType: contentType, CacheControl: cacheControl, }, }) - streamWatcher?.watchUpload(upload) - signal?.addEventListener( 'abort', () => { @@ -178,6 +160,12 @@ export class S3Backend implements StorageBackendAdapter { { once: true } ) + if (tracingFeatures?.upload) { + upload.on('httpUploadProgress', (progress: Progress) => { + dataStream.emit('s3_progress', JSON.stringify(progress)) + }) + } + try { const data = await upload.done() const metadata = await this.headObject(bucketName, key, version) @@ -194,26 +182,9 @@ export class S3Backend implements StorageBackendAdapter { } } catch (err) { if (err instanceof Error && err.name === 'AbortError') { - const span = trace.getActiveSpan() - if (span) { - // Print how far we got uploading the file - const lastSeenStatus = streamWatcher?.lastSeenStreamStatus - const lastStreamStatus = streamWatcher?.getStreamStatus() - - if (lastSeenStatus && lastStreamStatus) { - const { progress, ...lastSeenStream } = lastSeenStatus - span.setAttributes({ - lastStreamStatus: JSON.stringify(lastStreamStatus), - lastSeenStatus: JSON.stringify(lastSeenStream), - }) - } - } - throw ERRORS.AbortedTerminate('Upload was aborted', err) } throw StorageBackendError.fromError(err) - } finally { - streamWatcher?.stop() } } @@ -493,92 +464,6 @@ export class S3Backend implements StorageBackendAdapter { this.agent.close() } - protected watchUploadStream(body: Readable, signal?: AbortSignal) { - const passThrough = new PassThrough() - - if (signal) { - addAbortSignal(signal, passThrough) - } - - passThrough.on('error', () => { - body.unpipe(passThrough) - }) - - body.on('error', (err) => { - if (!passThrough.closed) { - passThrough.destroy(err) - } - }) - - const byteReader = createByteCounterStream() - const bodyStream = body.pipe(passThrough) - - // Upload stats - const uploadProgress: Progress[] = [] - const getStreamStatus = (): StreamStatus => ({ - time: new Date(), - bytesUploaded: uploadProgress[uploadProgress.length - 1]?.loaded || 0, - dataStream: { - closed: bodyStream.closed, - paused: bodyStream.isPaused(), - errored: Boolean(bodyStream.errored), - writable: bodyStream.writable, - byteRead: byteReader.bytes, - }, - httpAgentStats: gatherHttpAgentStats(this.agent.httpsAgent.getCurrentStatus()), - progress: uploadProgress, - }) - - let streamStatus = getStreamStatus() - - const streamWatcher = setInterval(() => { - streamStatus = getStreamStatus() - }, 1000) - - const dataStream = passThrough.pipe(byteReader.transformStream) - - body.on('error', (err) => { - passThrough.destroy(err) - }) - - passThrough.on('error', (err) => { - body.destroy(err) - }) - - passThrough.on('close', () => { - body.unpipe(passThrough) - }) - - function watchUpload(upload: Upload) { - upload.on('httpUploadProgress', (progress) => { - uploadProgress.push({ - total: progress.total, - part: progress.part, - loaded: progress.loaded, - }) - if (uploadProgress.length > 100) { - uploadProgress.shift() - } - }) - } - - return { - dataStream, - byteReader, - get uploadProgress() { - return uploadProgress - }, - get lastSeenStreamStatus() { - return streamStatus - }, - getStreamStatus, - stop() { - clearInterval(streamWatcher) - }, - watchUpload, - } - } - protected createS3Client(options: S3ClientOptions & { name: string }) { const params: S3ClientConfig = { region: options.region,