From c7520581489123e4105f703402c7508b43808bff Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Tue, 28 Apr 2026 14:52:06 -0300 Subject: [PATCH] feat: add support to auto inject context data into workers --- configurer/worker | 16 -- package-lock.json | 4 +- package.json | 2 +- src/drivers/AwsSqsDriver.ts | 5 +- src/drivers/DatabaseDriver.ts | 4 +- src/drivers/Driver.ts | 37 ++++- src/drivers/FakeDriver.ts | 50 ++++-- src/drivers/MemoryDriver.ts | 4 +- src/helpers/QueueJobPropagationHelper.ts | 154 +++++++++++++++++++ src/kernels/WorkerKernel.ts | 23 --- src/queue/QueueImpl.ts | 23 ++- src/worker/QueueExecutionScope.ts | 146 +++++++++++++++--- src/worker/WorkerImpl.ts | 15 -- src/worker/WorkerTaskBuilder.ts | 13 +- tests/unit/drivers/AwsSqsDriverTest.ts | 3 +- tests/unit/drivers/DatabaseDriverTest.ts | 3 +- tests/unit/drivers/FakeDriverTest.ts | 3 +- tests/unit/drivers/MemoryDriverTest.ts | 3 +- tests/unit/kernels/WorkerKernelTest.ts | 43 +----- tests/unit/worker/QueueExecutionScopeTest.ts | 3 +- tests/unit/worker/WorkerImplTest.ts | 1 - 21 files changed, 402 insertions(+), 153 deletions(-) create mode 100644 src/helpers/QueueJobPropagationHelper.ts diff --git a/configurer/worker b/configurer/worker index 9fdf802..65824f5 100644 --- a/configurer/worker +++ b/configurer/worker @@ -1,20 +1,4 @@ export default { - /* - |-------------------------------------------------------------------------- - | Configurations for cls-rtracer plugin. - |-------------------------------------------------------------------------- - | - | This values defines all the configurations for cls-rtracer plugins. Check - | the documentation for more information: - | - | https://github.com/puzpuzpuz/cls-rtracer - | - */ - - rTracer: { - enabled: true - }, - /* |-------------------------------------------------------------------------- | Log worker tasks diff --git a/package-lock.json b/package-lock.json index 4f8b864..cc6597a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@athenna/queue", - "version": "5.29.0", + "version": "5.31.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/queue", - "version": "5.29.0", + "version": "5.31.0", "license": "MIT", "dependencies": { "@aws-sdk/client-sqs": "^3.1019.0" diff --git a/package.json b/package.json index 0c7ff71..4d13fd2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/queue", - "version": "5.30.0", + "version": "5.31.0", "description": "The Athenna queue handler.", "license": "MIT", "author": "João Lenon ", diff --git a/src/drivers/AwsSqsDriver.ts b/src/drivers/AwsSqsDriver.ts index f7cd5bd..584a5b2 100644 --- a/src/drivers/AwsSqsDriver.ts +++ b/src/drivers/AwsSqsDriver.ts @@ -23,6 +23,7 @@ import { Is, Options, Uuid } from '@athenna/common' import type { ConnectionOptions } from '#src/types' import { ConnectionFactory } from '#src/factories/ConnectionFactory' import { QueueExecutionScope } from '#src/worker/QueueExecutionScope' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' import { AwsSqsDriverExceptionHandler } from '#src/handlers/AwsSqsDriverExceptionHandler' import { NotFifoSqsQueueTypeException } from '#src/exceptions/NotFifoSqsQueueTypeException' @@ -433,6 +434,8 @@ export class AwsSqsDriver extends Driver { data: job.data } + const executionJob = QueueJobPropagationHelper.getJob(workerJob) + await this.runScopedQueueProcessor( processor, workerJob, @@ -440,7 +443,7 @@ export class AwsSqsDriver extends Driver { try { startHeartbeat() - await processor(workerJob) + await processor(executionJob) stopHeartbeat() diff --git a/src/drivers/DatabaseDriver.ts b/src/drivers/DatabaseDriver.ts index 306504e..17d91a4 100644 --- a/src/drivers/DatabaseDriver.ts +++ b/src/drivers/DatabaseDriver.ts @@ -13,6 +13,7 @@ import { Is, Options } from '@athenna/common' import type { ConnectionOptions } from '#src/types' import type { DatabaseImpl } from '@athenna/database' import { ConnectionFactory } from '#src/factories/ConnectionFactory' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' import { DatabaseDriverExceptionHandler } from '#src/handlers/DatabaseDriverExceptionHandler' export class DatabaseDriver extends Driver { @@ -331,10 +332,11 @@ export class DatabaseDriver extends Driver { attempts: job.attempts, data: job.data } + const executionJob = QueueJobPropagationHelper.getJob(workerJob) await this.runScopedQueueProcessor(processor, workerJob, async () => { try { - await processor(workerJob) + await processor(executionJob) /** * If the job still exists after processing, it means that the job was diff --git a/src/drivers/Driver.ts b/src/drivers/Driver.ts index fe2b4f2..18993e2 100644 --- a/src/drivers/Driver.ts +++ b/src/drivers/Driver.ts @@ -12,6 +12,7 @@ import { Is } from '@athenna/common' import { Config } from '@athenna/config' import type { Job, ConnectionOptions } from '#src/types' import { QueueExecutionScope } from '#src/worker/QueueExecutionScope' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' export const RUN_WITH_WORKER_CONTEXT = Symbol.for( '@athenna/queue.runWithWorkerContext' @@ -199,13 +200,19 @@ export abstract class Driver { return runner(data, callback, captureScope) } - const scope = new QueueExecutionScope({ - name: this.queueName, - connection: this.connection, - options: this.options, - traceId: null, - job: this.createContextJob(data) - }) + const scope = new QueueExecutionScope( + { + name: this.queueName, + connection: this.connection, + options: this.options, + traceId: null, + job: QueueJobPropagationHelper.getJob(this.createContextJob(data)) + }, + { + carrier: this.getJobCarrier(data), + currentContextValues: this.getJobCurrentContextValues(data) + } + ) captureScope?.(scope) @@ -224,6 +231,22 @@ export abstract class Driver { } as Job } + private getJobCarrier(data: T) { + if (!this.isJob(data)) { + return {} + } + + return QueueJobPropagationHelper.getCarrier(data.data) + } + + private getJobCurrentContextValues(data: T) { + if (!this.isJob(data)) { + return {} + } + + return QueueJobPropagationHelper.getCurrentContextValues(data.data) + } + private isJob(data: unknown): data is Job { if (!data || !Is.Object(data)) { return false diff --git a/src/drivers/FakeDriver.ts b/src/drivers/FakeDriver.ts index 9101049..04b5aa1 100644 --- a/src/drivers/FakeDriver.ts +++ b/src/drivers/FakeDriver.ts @@ -12,6 +12,7 @@ import { Is, Json, Options } from '@athenna/common' import type { Job, ConnectionOptions } from '#src/types' import { ConnectionFactory } from '#src/factories/ConnectionFactory' import { QueueExecutionScope } from '#src/worker/QueueExecutionScope' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' import { RUN_WITH_WORKER_CONTEXT, type ScopedQueueProcessor @@ -171,8 +172,8 @@ export class FakeDriver { * const user = await Queue.pop() * ``` */ - public static async pop(): Promise { - return {} + public static async pop(): Promise { + return null as T } /** @@ -185,8 +186,8 @@ export class FakeDriver { * const user = await Queue.pop() * ``` */ - public static async peek(): Promise { - return {} + public static async peek(): Promise { + return null as T } /** @@ -282,13 +283,19 @@ export class FakeDriver { return runner(data, callback, captureScope) } - const scope = new QueueExecutionScope({ - name: this.queueName, - connection: this.connection, - options: this.options, - traceId: null, - job: this.createContextJob(data) - }) + const scope = new QueueExecutionScope( + { + name: this.queueName, + connection: this.connection, + options: this.options, + traceId: null, + job: QueueJobPropagationHelper.getJob(this.createContextJob(data)) + }, + { + carrier: this.getJobCarrier(data), + currentContextValues: this.getJobCurrentContextValues(data) + } + ) captureScope?.(scope) @@ -311,10 +318,13 @@ export class FakeDriver { processor: (data: unknown) => any | Promise ) { const data = await this.pop() + const executionData = this.isJob(data) + ? QueueJobPropagationHelper.getJob(data) + : data await this.runScopedQueueProcessor(processor, data, async () => { try { - await processor(data) + await processor(executionData) } catch (err) { Log.channelOrVanilla('exception').error({ msg: `failed to process job: ${err.message}`, @@ -331,4 +341,20 @@ export class FakeDriver { } }) } + + private static getJobCarrier(data: T) { + if (!this.isJob(data)) { + return {} + } + + return QueueJobPropagationHelper.getCarrier(data.data) + } + + private static getJobCurrentContextValues(data: T) { + if (!this.isJob(data)) { + return {} + } + + return QueueJobPropagationHelper.getCurrentContextValues(data.data) + } } diff --git a/src/drivers/MemoryDriver.ts b/src/drivers/MemoryDriver.ts index 9678263..0c61cbd 100644 --- a/src/drivers/MemoryDriver.ts +++ b/src/drivers/MemoryDriver.ts @@ -11,6 +11,7 @@ import { Driver } from '#src/drivers/Driver' import { Options, Uuid } from '@athenna/common' import type { ConnectionOptions } from '#src/types' import { ConnectionFactory } from '#src/factories/ConnectionFactory' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' import { MemoryDriverExceptionHandler } from '#src/handlers/MemoryDriverExceptionHandler' export class MemoryDriver extends Driver { @@ -269,10 +270,11 @@ export class MemoryDriver extends Driver { attempts: job.attempts, data: job.data } + const executionJob = QueueJobPropagationHelper.getJob(workerJob) await this.runScopedQueueProcessor(processor, workerJob, async () => { try { - await processor(workerJob) + await processor(executionJob) /** * If the job still exists after processing, it means that the job was diff --git a/src/helpers/QueueJobPropagationHelper.ts b/src/helpers/QueueJobPropagationHelper.ts new file mode 100644 index 0000000..391643d --- /dev/null +++ b/src/helpers/QueueJobPropagationHelper.ts @@ -0,0 +1,154 @@ +/** + * @athenna/queue + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { Job } from '#src/types' +import { Is, Module } from '@athenna/common' +import { context, propagation } from '@opentelemetry/api' + +const otelModule = await Module.safeImport('@athenna/otel') +const QUEUE_ENVELOPE_KEY = '__athenna_queue__' +const QUEUE_ENVELOPE_VERSION = 1 + +type QueueTraceCarrier = Record + +type QueueEnvelopeMetadata = { + version: number + carrier: QueueTraceCarrier + currentContextValues: Record +} + +type QueueEnvelope = { + [QUEUE_ENVELOPE_KEY]: QueueEnvelopeMetadata + payload: T +} + +export class QueueJobPropagationHelper { + public static createEnvelope(data: T) { + if (QueueJobPropagationHelper.isEnvelope(data)) { + return data + } + + if (!otelModule?.Otel?.isEnabled()) { + return data + } + + const carrier = QueueJobPropagationHelper.injectContext({}) + const currentContextValues = + QueueJobPropagationHelper.captureCurrentContextValues() + + if ( + !Object.keys(carrier).length && + !Object.keys(currentContextValues).length + ) { + return data + } + + return { + [QUEUE_ENVELOPE_KEY]: { + version: QUEUE_ENVELOPE_VERSION, + carrier, + currentContextValues + }, + payload: data + } satisfies QueueEnvelope + } + + public static getCarrier(data: unknown): QueueTraceCarrier { + if (!QueueJobPropagationHelper.isEnvelope(data)) { + return {} + } + + return data[QUEUE_ENVELOPE_KEY].carrier || {} + } + + public static getCurrentContextValues(data: unknown) { + if (!QueueJobPropagationHelper.isEnvelope(data)) { + return {} + } + + return data[QUEUE_ENVELOPE_KEY].currentContextValues || {} + } + + public static getPayload(data: T) { + if (!QueueJobPropagationHelper.isEnvelope(data)) { + return data + } + + return data.payload as T + } + + public static getJob(job: Job) { + return { + ...job, + data: QueueJobPropagationHelper.getPayload(job.data) + } + } + + private static isEnvelope(data: unknown): data is QueueEnvelope { + if (!data || !Is.Object(data)) { + return false + } + + const candidate = data as Partial + + if (!(QUEUE_ENVELOPE_KEY in candidate) || !('payload' in candidate)) { + return false + } + + const metadata = candidate[QUEUE_ENVELOPE_KEY] + + return ( + Is.Object(metadata) && + metadata.version === QUEUE_ENVELOPE_VERSION && + Is.Object(metadata.carrier) && + Is.Object(metadata.currentContextValues) + ) + } + + private static injectContext(carrier: Record) { + if (Is.Function(otelModule?.Otel?.injectContext)) { + return otelModule.Otel.injectContext(carrier) + } + + propagation.inject(context.active(), carrier) + + return carrier + } + + private static captureCurrentContextValues() { + if (Is.Function(otelModule?.Otel?.captureCurrentContextValues)) { + return otelModule.Otel.captureCurrentContextValues() + } + + const values: Record = {} + const contextBagSymbol = QueueJobPropagationHelper.getContextBagSymbol() + const store = context.active().getValue(contextBagSymbol as any) as + | Map + | undefined + + if (!(store instanceof Map)) { + return values + } + + for (const [key, value] of store.entries()) { + if (Is.String(key)) { + values[key] = value + } + } + + return values + } + + private static getContextBagSymbol() { + return ( + otelModule?.Otel?.contextBagSymbol || + Symbol.for('athenna.otel.currentContextBag') + ) + } +} diff --git a/src/kernels/WorkerKernel.ts b/src/kernels/WorkerKernel.ts index 131e17d..96eaebc 100644 --- a/src/kernels/WorkerKernel.ts +++ b/src/kernels/WorkerKernel.ts @@ -17,29 +17,6 @@ import { sep, isAbsolute, resolve } from 'node:path' import { Annotation, type ServiceMeta } from '@athenna/ioc' export class WorkerKernel { - /** - * Register the cls-rtracer plugin in the Worker. - */ - public async registerRTracer(): Promise { - const rTracerPlugin = await Module.safeImport('cls-rtracer') - - if (Config.is('worker.rTracer.enabled', false)) { - debug( - 'Not able to register rTracer plugin. Set the worker.rTracer.enabled configuration as true.' - ) - - return - } - - if (!rTracerPlugin) { - debug('Not able to register tracer plugin. Install cls-rtracer package.') - - return - } - - Queue.worker().setRTracerPlugin(rTracerPlugin) - } - /** * Register the job logger in the Worker. */ diff --git a/src/queue/QueueImpl.ts b/src/queue/QueueImpl.ts index 3a233cb..bf3707e 100644 --- a/src/queue/QueueImpl.ts +++ b/src/queue/QueueImpl.ts @@ -15,6 +15,7 @@ import type { AwsSqsDriver } from '#src/drivers/AwsSqsDriver' import type { MemoryDriver } from '#src/drivers/MemoryDriver' import type { DatabaseDriver } from '#src/drivers/DatabaseDriver' import { ConnectionFactory } from '#src/factories/ConnectionFactory' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' export class QueueImpl extends Macroable { /** @@ -181,7 +182,7 @@ export class QueueImpl extends Macroable { * ``` */ public async add(item: unknown) { - await this.driver.add(item) + await this.driver.add(QueueJobPropagationHelper.createEnvelope(item)) } /** @@ -194,8 +195,14 @@ export class QueueImpl extends Macroable { * const user = await Queue.pop() * ``` */ - public async pop() { - return this.driver.pop() + public async pop>() { + const job = await this.driver.pop() + + if (!job) { + return job + } + + return QueueJobPropagationHelper.getJob(job) } /** @@ -208,8 +215,14 @@ export class QueueImpl extends Macroable { * const user = await Queue.peek() * ``` */ - public async peek() { - return this.driver.peek() + public async peek>() { + const job = await this.driver.peek() + + if (!job) { + return job + } + + return QueueJobPropagationHelper.getJob(job) } /** diff --git a/src/worker/QueueExecutionScope.ts b/src/worker/QueueExecutionScope.ts index cc61c2a..3de7424 100644 --- a/src/worker/QueueExecutionScope.ts +++ b/src/worker/QueueExecutionScope.ts @@ -7,8 +7,16 @@ * file that was distributed with this source code. */ +import { + trace, + context, + propagation, + SpanStatusCode, + type Context as OtelContext +} from '@opentelemetry/api' + import { Config } from '@athenna/config' -import { Module } from '@athenna/common' +import { Is, Module } from '@athenna/common' import type { Job, Context, ConnectionOptions } from '#src/types' const otelModule = await Module.safeImport('@athenna/otel') @@ -24,10 +32,8 @@ export type QueueExecutionScopeContext = { type QueueExecutionScopeOptions = { beforeRun?: () => void afterRun?: () => void - rTracerPlugin?: { - id?: () => string - runWithId?: (callback: () => R) => R - } + carrier?: Record + currentContextValues?: Record resolveBinding?: (binding: any, context: QueueExecutionScopeContext) => any } @@ -41,7 +47,7 @@ export class QueueExecutionScope { this.options.beforeRun?.() try { - const result = this.runInTracingContext(callback) + const result = this.runInOtelContext(callback) if (result instanceof Promise) { return result.finally(() => this.options.afterRun?.()) as R @@ -62,32 +68,124 @@ export class QueueExecutionScope { this.run(() => handler(...args))) as TCallback } - private runInTracingContext(callback: () => R): R { - const execute = () => { - this.context.traceId = - this.options.rTracerPlugin?.id?.() || this.context.traceId || null - - return this.runInOtelContext(callback) + private runInOtelContext(callback: () => R): R { + if (!Config.is('worker.otel.contextEnabled', true) || !otelModule) { + return callback() } - if (this.options.rTracerPlugin?.runWithId) { - return this.options.rTracerPlugin.runWithId(execute) + let parentContext = this.extractContext(this.options.carrier) + + if (Object.keys(this.options.currentContextValues || {}).length) { + parentContext = this.restoreCurrentContextValues( + this.options.currentContextValues, + parentContext + ) } - return execute() + return otelModule.Otel.withContext( + () => { + return this.runInsideSpan(callback) + }, + { + ctx: parentContext, + bindings: Config.get('worker.otel.contextBindings', []), + resolveBinding: (binding: any) => + this.options.resolveBinding + ? this.options.resolveBinding(binding, this.context) + : binding.resolve(this.context as Context) + } + ) } - private runInOtelContext(callback: () => R): R { - if (!Config.is('worker.otel.contextEnabled', true) || !otelModule) { - return callback() + private getSpanName() { + if (this.context.name) { + return `queue.process.${this.context.name}` + } + + if (this.context.connection) { + return `queue.process.${this.context.connection}` } - return otelModule.Otel.withContext(callback, { - bindings: Config.get('worker.otel.contextBindings', []), - resolveBinding: (binding: any) => - this.options.resolveBinding - ? this.options.resolveBinding(binding, this.context) - : binding.resolve(this.context as Context) + return 'queue.process' + } + + private runInsideSpan(callback: () => R): R { + const tracer = trace.getTracer('@athenna/queue') + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + return tracer.startActiveSpan(this.getSpanName(), span => { + this.context.traceId = span.spanContext().traceId + + try { + const result = callback() + + if (result instanceof Promise) { + return result + .then(value => { + span.end() + + return value + }) + .catch(error => { + throw this.handleSpanError(error, span) + }) as R + } + + span.end() + + return result + } catch (error) { + throw this.handleSpanError(error, span) + } }) } + + private handleSpanError(error: any, span: any) { + span.recordException(error) + span.setStatus({ code: SpanStatusCode.ERROR, message: error?.message }) + span.end() + + return error + } + + private extractContext( + carrier?: Record + ) { + if (!carrier || !Object.keys(carrier).length) { + return otelModule.Otel.context.active() + } + + if (Is.Function(otelModule.Otel.extractContext)) { + return otelModule.Otel.extractContext(carrier) + } + + return propagation.extract(context.active(), carrier) + } + + private restoreCurrentContextValues( + values: Record, + parentContext: OtelContext + ) { + if (Is.Function(otelModule.Otel.restoreCurrentContextValues)) { + return otelModule.Otel.restoreCurrentContextValues(values, parentContext) + } + + let nextContext = parentContext + const store = new Map() + + for (const [key, value] of Object.entries(values)) { + store.set(key, value) + nextContext = nextContext.setValue(key as any, value) + } + + return nextContext.setValue(this.getContextBagSymbol(), store) + } + + private getContextBagSymbol() { + return ( + otelModule?.Otel?.contextBagSymbol || + Symbol.for('athenna.otel.currentContextBag') + ) + } } diff --git a/src/worker/WorkerImpl.ts b/src/worker/WorkerImpl.ts index 3d3f5b2..aa76f7c 100644 --- a/src/worker/WorkerImpl.ts +++ b/src/worker/WorkerImpl.ts @@ -12,7 +12,6 @@ import { NotFoundWorkerTaskException } from '#src/exceptions/NotFoundWorkerTaskE export class WorkerImpl { public static loggerIsSet = false - public static rTracerPlugin: any public static tasks: WorkerTaskBuilder[] = [] /** @@ -44,20 +43,6 @@ export class WorkerImpl { return this } - /** - * Set if the rTracer plugin should be set or not. - * - * @example - * ```ts - * Worker.setRTracerPlugin(true) - * ``` - */ - public setRTracerPlugin(rTracerPlugin: any) { - WorkerImpl.rTracerPlugin = rTracerPlugin - - return this - } - /** * Returns a map with all worker tasks that has been registered. * diff --git a/src/worker/WorkerTaskBuilder.ts b/src/worker/WorkerTaskBuilder.ts index 96020cb..5e7cee3 100644 --- a/src/worker/WorkerTaskBuilder.ts +++ b/src/worker/WorkerTaskBuilder.ts @@ -16,6 +16,7 @@ import type { WorkerHandler } from '#src/types/WorkerHandler' import { RUN_WITH_WORKER_CONTEXT } from '#src/drivers/Driver' import { QueueExecutionScope } from '#src/worker/QueueExecutionScope' import { WorkerTimeoutException } from '#src/exceptions/WorkerTimeoutException' +import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper' export class WorkerTaskBuilder { public worker: { @@ -103,7 +104,10 @@ export class WorkerTaskBuilder { this.worker.handler = async ctx => { return new QueueExecutionScope(ctx, { - rTracerPlugin: WorkerImpl.rTracerPlugin + carrier: QueueJobPropagationHelper.getCarrier(ctx.job.data), + currentContextValues: QueueJobPropagationHelper.getCurrentContextValues( + ctx.job.data + ) }).run(() => this.executeHandler(ctx)) } @@ -343,7 +347,10 @@ export class WorkerTaskBuilder { afterRun: () => { currentCtx = null }, - rTracerPlugin: WorkerImpl.rTracerPlugin + carrier: QueueJobPropagationHelper.getCarrier(job.data), + currentContextValues: QueueJobPropagationHelper.getCurrentContextValues( + job.data + ) }) captureScope?.(scope) @@ -360,7 +367,7 @@ export class WorkerTaskBuilder { traceId: null, connection: this.worker.connection, options: this.worker.options, - job + job: QueueJobPropagationHelper.getJob(job) } } diff --git a/tests/unit/drivers/AwsSqsDriverTest.ts b/tests/unit/drivers/AwsSqsDriverTest.ts index 29783a4..e97d37d 100644 --- a/tests/unit/drivers/AwsSqsDriverTest.ts +++ b/tests/unit/drivers/AwsSqsDriverTest.ts @@ -9,7 +9,7 @@ import { Is, Path } from '@athenna/common' import { EnvHelper } from '@athenna/config' -import { OtelProvider } from '@athenna/otel' +import { Otel, OtelProvider } from '@athenna/otel' import { BaseTest } from '#tests/helpers/BaseTest' import { Log, LoggerProvider } from '@athenna/logger' import { Queue, WorkerProvider, QueueProvider } from '#src' @@ -25,6 +25,7 @@ export class AwsSqsDriverTest extends BaseTest { await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() new QueueProvider().register() new WorkerProvider().register() new LoggerProvider().register() diff --git a/tests/unit/drivers/DatabaseDriverTest.ts b/tests/unit/drivers/DatabaseDriverTest.ts index 482c448..c1166fc 100644 --- a/tests/unit/drivers/DatabaseDriverTest.ts +++ b/tests/unit/drivers/DatabaseDriverTest.ts @@ -9,7 +9,7 @@ import { Queue, QueueProvider } from '#src' import { Path, Sleep } from '@athenna/common' -import { OtelProvider } from '@athenna/otel' +import { Otel, OtelProvider } from '@athenna/otel' import { Log, LoggerProvider } from '@athenna/logger' import { context, createContextKey } from '@opentelemetry/api' import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks' @@ -23,6 +23,7 @@ export class DatabaseDriverTest { await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() new DatabaseProvider().register() new QueueProvider().register() new LoggerProvider().register() diff --git a/tests/unit/drivers/FakeDriverTest.ts b/tests/unit/drivers/FakeDriverTest.ts index 3dcc25b..4b3eb92 100644 --- a/tests/unit/drivers/FakeDriverTest.ts +++ b/tests/unit/drivers/FakeDriverTest.ts @@ -9,7 +9,7 @@ import { Path } from '@athenna/common' import { Queue, QueueProvider } from '#src' -import { OtelProvider } from '@athenna/otel' +import { Otel, OtelProvider } from '@athenna/otel' import { Log, LoggerProvider } from '@athenna/logger' import { context, createContextKey } from '@opentelemetry/api' import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks' @@ -22,6 +22,7 @@ export class FakeDriverTest { await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() new QueueProvider().register() new LoggerProvider().register() } diff --git a/tests/unit/drivers/MemoryDriverTest.ts b/tests/unit/drivers/MemoryDriverTest.ts index 16bc53c..e42d8ca 100644 --- a/tests/unit/drivers/MemoryDriverTest.ts +++ b/tests/unit/drivers/MemoryDriverTest.ts @@ -9,7 +9,7 @@ import { Queue, QueueProvider } from '#src' import { Path, Sleep } from '@athenna/common' -import { OtelProvider } from '@athenna/otel' +import { Otel, OtelProvider } from '@athenna/otel' import { Log, LoggerProvider } from '@athenna/logger' import { context, createContextKey } from '@opentelemetry/api' import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks' @@ -22,6 +22,7 @@ export class MemoryDriverTest { await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() new QueueProvider().register() new LoggerProvider().register() } diff --git a/tests/unit/kernels/WorkerKernelTest.ts b/tests/unit/kernels/WorkerKernelTest.ts index 993b448..7781d95 100644 --- a/tests/unit/kernels/WorkerKernelTest.ts +++ b/tests/unit/kernels/WorkerKernelTest.ts @@ -9,10 +9,10 @@ import { Queue } from '#src/facades/Queue' import { Worker } from '#src/facades/Worker' -import { OtelProvider } from '@athenna/otel' import { Path, Sleep } from '@athenna/common' -import { Log, LoggerProvider } from '@athenna/logger' +import { Otel, OtelProvider } from '@athenna/otel' import { WorkerImpl } from '#src/worker/WorkerImpl' +import { Log, LoggerProvider } from '@athenna/logger' import { WorkerKernel } from '#src/kernels/WorkerKernel' import { constants } from '#tests/fixtures/constants/index' import { QueueProvider } from '#src/providers/QueueProvider' @@ -28,10 +28,10 @@ export class WorkerKernelTest { context.setGlobalContextManager(new AsyncLocalStorageContextManager().enable()) WorkerImpl.loggerIsSet = false - WorkerImpl.rTracerPlugin = undefined await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() new LoggerProvider().register() new QueueProvider().register() new WorkerProvider().register() @@ -52,35 +52,11 @@ export class WorkerKernelTest { } @Test() - public async shouldBeAbleToRegisterRTracerPluginInWorkerHandler({ assert }: Context) { - const kernel = new WorkerKernel() - - await kernel.registerRTracer() - - assert.isDefined(WorkerImpl.rTracerPlugin) - } - - @Test() - public async shouldNotRegisterRTracerPluginInWorkerHandlerIfRTracerConfigIsDisabled({ assert }: Context) { - Config.set('worker.rTracer.enabled', false) - - const kernel = new WorkerKernel() - - await kernel.registerRTracer() - - assert.isUndefined(WorkerImpl.rTracerPlugin) - } - - @Test() - public async shouldBeAbleToGetTraceIdInHandlerWhenRTracerPluginIsEnabled({ assert }: Context) { - const kernel = new WorkerKernel() - - await kernel.registerRTracer() - + public async shouldBeAbleToGetTraceIdInHandlerFromTheActiveOtelSpan({ assert }: Context) { let traceId = null Worker.task() - .name('r_tracer') + .name('otel_trace') .connection('memory') .handler(ctx => { traceId = ctx.traceId @@ -100,7 +76,6 @@ export class WorkerKernelTest { Config.set('worker.otel.contextBindings', []) }) public async shouldBeAbleToRunWorkerHandlersInsideConfiguredOtelContext({ assert }: Context) { - const kernel = new WorkerKernel() const workerNameKey = createContextKey('worker.name') const workerConnectionKey = createContextKey('worker.connection') let values: any = {} @@ -111,8 +86,6 @@ export class WorkerKernelTest { { key: workerConnectionKey, resolve: ctx => ctx.connection } ]) - await kernel.registerRTracer() - Worker.task() .name('otel_worker') .connection('memory') @@ -255,10 +228,9 @@ export class WorkerKernelTest { } @Test() - public async shouldBeAbleToRegisterRTracerPluginInWorkerHandlerAndRunAWorker({ assert }: Context) { + public async shouldBeAbleToRegisterWorkersAndRunAWorker({ assert }: Context) { const kernel = new WorkerKernel() - await kernel.registerRTracer() await kernel.registerWorkers() await Queue.add({ test: 1 }) @@ -283,11 +255,10 @@ export class WorkerKernelTest { } @Test() - public async shouldBeAbleToRegisterLoggerAndRTracerPluginInWorkerHandlerAndRunAWorker({ assert }: Context) { + public async shouldBeAbleToRegisterLoggerAndWorkersAndRunAWorker({ assert }: Context) { const kernel = new WorkerKernel() await kernel.registerLogger() - await kernel.registerRTracer() await kernel.registerWorkers() await Queue.add({ test: 1 }) diff --git a/tests/unit/worker/QueueExecutionScopeTest.ts b/tests/unit/worker/QueueExecutionScopeTest.ts index 0cefd6d..aa45d80 100644 --- a/tests/unit/worker/QueueExecutionScopeTest.ts +++ b/tests/unit/worker/QueueExecutionScopeTest.ts @@ -8,7 +8,7 @@ */ import { Path, Sleep } from '@athenna/common' -import { OtelProvider } from '@athenna/otel' +import { Otel, OtelProvider } from '@athenna/otel' import { QueueExecutionScope } from '#src/worker/QueueExecutionScope' import { context, createContextKey } from '@opentelemetry/api' import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks' @@ -21,6 +21,7 @@ export class QueueExecutionScopeTest { await Config.loadAll(Path.fixtures('config')) new OtelProvider().register() + Otel.start() } @AfterEach() diff --git a/tests/unit/worker/WorkerImplTest.ts b/tests/unit/worker/WorkerImplTest.ts index be2f762..b8186e5 100644 --- a/tests/unit/worker/WorkerImplTest.ts +++ b/tests/unit/worker/WorkerImplTest.ts @@ -29,7 +29,6 @@ export class WorkerImplTest { @AfterEach() public async afterEach() { WorkerImpl.loggerIsSet = false - WorkerImpl.rTracerPlugin = undefined await new QueueProvider().shutdown() await new WorkerProvider().shutdown()