diff --git a/src/__tests__/api/master/asyncJobWorker.test.ts b/src/__tests__/api/master/asyncJobWorker.test.ts new file mode 100644 index 00000000..a4e7b203 --- /dev/null +++ b/src/__tests__/api/master/asyncJobWorker.test.ts @@ -0,0 +1,393 @@ +import 'should'; +import nock from 'nock'; +import sinon from 'sinon'; +import { BitGoAPI } from '@bitgo-beta/sdk-api'; +import { Environments } from '@bitgo-beta/sdk-core'; +import { OsoBridgeClient } from '../../../masterBitgoExpress/clients/bridgeClient'; +import { BridgeJobResponse } from '../../../masterBitgoExpress/clients/bridgeClient.types'; +import { + startAsyncJobWorker, + processPendingJobs, + handleKeyGenerationOperation, +} from '../../../masterBitgoExpress/workers/asyncJobWorker'; +import { AppMode, MasterExpressConfig, TlsMode } from '../../../shared/types'; +import { DEFAULT_ASYNC_MODE_CONFIG } from './testUtils'; + +const BRIDGE_URL = 'http://bridge.invalid'; +const BITGO_API_URL = Environments.test.uri; +const COIN = 'tbtc'; +const POLL_INTERVAL_MS = 1000; + +function makeUserKeychain() { + return { + id: 'user-key-id', + pub: 'xpub_user', + encryptedPrv: 'encrypted-user-prv', + type: 'independent' as const, + source: 'user' as const, + coin: COIN, + }; +} + +function makeBackupKeychain() { + return { + id: 'backup-key-id', + pub: 'xpub_backup', + encryptedPrv: 'encrypted-backup-prv', + type: 'independent' as const, + source: 'backup' as const, + coin: COIN, + }; +} + +function awmOk(body: Record) { + return { status: 200, body }; +} + +function makeJob(overrides: Partial = {}): BridgeJobResponse { + return { + jobId: 'job-123', + status: 'awaiting_bitgo', + version: 1, + coin: COIN, + operationType: 'multisig_keygen', + awmResponse: awmOk({ ...makeUserKeychain() }), + awmBackupResponse: awmOk({ ...makeBackupKeychain() }), + request: { body: { label: 'test-wallet', enterprise: 'test-enterprise' } }, + createdAt: '2026-06-10T00:00:00.000Z', + updatedAt: '2026-06-10T00:00:00.000Z', + ...overrides, + }; +} + +function makeConfig(overrides: Partial = {}): MasterExpressConfig { + return { + appMode: AppMode.MASTER_EXPRESS, + port: 0, + bind: 'localhost', + timeout: 60000, + httpLoggerFile: '', + env: 'test', + disableEnvCheck: true, + authVersion: 2, + advancedWalletManagerUrl: 'http://awm.invalid', + awmServerCaCert: 'dummy-cert', + tlsMode: TlsMode.DISABLED, + clientCertAllowSelfSigned: true, + bitgoAccessToken: 'test-access-token', + asyncModeConfig: { + ...DEFAULT_ASYNC_MODE_CONFIG, + enabled: true, + awmAsyncUrl: BRIDGE_URL, + pollIntervalInMs: POLL_INTERVAL_MS, + }, + ...overrides, + }; +} + +function nockBitgoKeychainRegistration(options: { + pub: string; + source: 'user' | 'backup'; + keyId: string; +}) { + return nock(BITGO_API_URL) + .post( + `/api/v2/${COIN}/key`, + (body) => body.pub === options.pub && body.source === options.source, + ) + .matchHeader('any', () => true) + .reply(200, { id: options.keyId, pub: options.pub, source: options.source }); +} + +function nockBitgoKeyCreate(keyId: string) { + return nock(BITGO_API_URL) + .post(`/api/v2/${COIN}/key`, (body) => body.source === 'bitgo') + .matchHeader('any', () => true) + .reply(200, { id: keyId, pub: 'xpub_bitgo', source: 'bitgo' }); +} + +function nockWalletAdd(walletId: string) { + return nock(BITGO_API_URL) + .post(`/api/v2/${COIN}/wallet/add`) + .matchHeader('any', () => true) + .reply(200, { + id: walletId, + coin: COIN, + label: 'test-wallet', + keys: ['user-key-id', 'backup-key-id', 'bitgo-key-id'], + }); +} + +function nockUpdateJobFailed(jobId: string) { + return nock(BRIDGE_URL) + .patch(`/job/${jobId}`, (body) => body.status === 'failed') + .reply(204); +} + +function nockUpdateJobComplete(jobId: string, walletId: string) { + return nock(BRIDGE_URL) + .patch( + `/job/${jobId}`, + (body) => body.status === 'complete' && body.result?.walletId === walletId, + ) + .reply(204); +} + +describe('asyncJobWorker', () => { + let bitgo: BitGoAPI; + let bridge: OsoBridgeClient; + + before(() => { + nock.disableNetConnect(); + }); + + after(() => { + nock.enableNetConnect(); + }); + + beforeEach(() => { + bitgo = new BitGoAPI({ env: 'test', accessToken: 'test-access-token' }); + bridge = new OsoBridgeClient(BRIDGE_URL, 60000); + }); + + afterEach(() => { + nock.cleanAll(); + sinon.restore(); + }); + + describe('startAsyncJobWorker()', () => { + it('starts polling at the configured interval', async () => { + const clock = sinon.useFakeTimers(); + + const listJobsNock = nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .reply(200, { jobs: [] }); + + startAsyncJobWorker(makeConfig()); + + await clock.tickAsync(POLL_INTERVAL_MS); + + listJobsNock.done(); + clock.restore(); + }); + + it('does not fire a second handler job while the first is still running', async () => { + const clock = sinon.useFakeTimers(); + let callCount = 0; + + nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .times(1) + .reply(200, () => { + callCount++; + return { jobs: [] }; + }); + + startAsyncJobWorker(makeConfig()); + + await clock.tickAsync(POLL_INTERVAL_MS * 3); + + callCount.should.equal(1); + clock.restore(); + }); + }); + + describe('processPendingJobs()', () => { + it('returns early when no awaiting_bitgo jobs exist', async () => { + const n = nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .reply(200, { jobs: [] }); + + await processPendingJobs(bridge, bitgo).should.be.fulfilled(); + + n.done(); + }); + + it('processes all returned jobs', async () => { + const job1 = makeJob({ jobId: 'job-1' }); + const job2 = makeJob({ jobId: 'job-2' }); + + nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .reply(200, { jobs: [job1, job2] }); + + nockBitgoKeychainRegistration({ pub: 'xpub_user', source: 'user', keyId: 'user-key-id' }); + nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + nockBitgoKeyCreate('bitgo-key-id'); + nockWalletAdd('wallet-1'); + nockUpdateJobComplete('job-1', 'wallet-1'); + + nockBitgoKeychainRegistration({ pub: 'xpub_user', source: 'user', keyId: 'user-key-id' }); + nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + nockBitgoKeyCreate('bitgo-key-id'); + nockWalletAdd('wallet-2'); + nockUpdateJobComplete('job-2', 'wallet-2'); + + await processPendingJobs(bridge, bitgo).should.be.fulfilled(); + + nock.pendingMocks().should.have.length(0); + }); + + it('continues processing remaining jobs when one fails', async () => { + const badJob = makeJob({ + jobId: 'job-bad', + awmResponse: { status: 200, body: {} }, + }); + const goodJob = makeJob({ jobId: 'job-good' }); + + nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .reply(200, { jobs: [badJob, goodJob] }); + + nockUpdateJobFailed('job-bad'); + + nockBitgoKeychainRegistration({ pub: 'xpub_user', source: 'user', keyId: 'user-key-id' }); + nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + nockBitgoKeyCreate('bitgo-key-id'); + nockWalletAdd('wallet-good'); + nockUpdateJobComplete('job-good', 'wallet-good'); + + await processPendingJobs(bridge, bitgo).should.be.fulfilled(); + + nock.pendingMocks().should.have.length(0); + }); + + it('skips jobs with unknown operationType', async () => { + const job = makeJob({ operationType: 'multisig_sign' }); + + const n = nock(BRIDGE_URL) + .get('/jobs') + .query({ status: 'awaiting_bitgo' }) + .reply(200, { jobs: [job] }); + + await processPendingJobs(bridge, bitgo).should.be.fulfilled(); + + n.done(); + }); + }); + + describe('handleKeyGenerationOperation()', () => { + it('registers keychains, creates wallet, and PATCHes job complete', async () => { + const job = makeJob(); + const walletId = 'new-wallet-id'; + + const userKeyNock = nockBitgoKeychainRegistration({ + pub: 'xpub_user', + source: 'user', + keyId: 'user-key-id', + }); + const backupKeyNock = nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + const bitgoKeyNock = nockBitgoKeyCreate('bitgo-key-id'); + const walletNock = nockWalletAdd(walletId); + const updateNock = nockUpdateJobComplete(job.jobId, walletId); + + await handleKeyGenerationOperation(job, bridge, bitgo); + + userKeyNock.done(); + backupKeyNock.done(); + bitgoKeyNock.done(); + walletNock.done(); + updateNock.done(); + }); + + it('throws when awmResponse is missing', async () => { + const job = makeJob({ awmResponse: undefined }); + + await handleKeyGenerationOperation(job, bridge, bitgo).should.be.rejected(); + }); + + it('throws when awmBackupResponse is missing', async () => { + const job = makeJob({ awmBackupResponse: undefined }); + + await handleKeyGenerationOperation(job, bridge, bitgo).should.be.rejected(); + }); + + it('throws when awmResponse is not a valid AwmResponse envelope', async () => { + const job = makeJob({ + awmResponse: { unexpected: 'shape' } as unknown as BridgeJobResponse['awmResponse'], + }); + + await handleKeyGenerationOperation(job, bridge, bitgo).should.be.rejected(); + }); + + it('throws when WP keychain registration fails', async () => { + const job = makeJob(); + + nock(BITGO_API_URL) + .post(`/api/v2/${COIN}/key`) + .matchHeader('any', () => true) + .reply(500, { message: 'internal server error' }); + + await handleKeyGenerationOperation(job, bridge, bitgo).should.be.rejected(); + }); + + it('throws when wallet creation fails', async () => { + const job = makeJob(); + + nockBitgoKeychainRegistration({ pub: 'xpub_user', source: 'user', keyId: 'user-key-id' }); + nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + nockBitgoKeyCreate('bitgo-key-id'); + + nock(BITGO_API_URL) + .post(`/api/v2/${COIN}/wallet/add`) + .matchHeader('any', () => true) + .reply(500, { message: 'wallet creation failed' }); + + await handleKeyGenerationOperation(job, bridge, bitgo).should.be.rejected(); + }); + + it('uses enterprise from request body when provided', async () => { + const job = makeJob({ + request: { body: { label: 'ent-wallet', enterprise: 'my-enterprise' } }, + }); + const walletId = 'ent-wallet-id'; + + nockBitgoKeychainRegistration({ pub: 'xpub_user', source: 'user', keyId: 'user-key-id' }); + nockBitgoKeychainRegistration({ + pub: 'xpub_backup', + source: 'backup', + keyId: 'backup-key-id', + }); + + nock(BITGO_API_URL) + .post( + `/api/v2/${COIN}/key`, + (body) => body.source === 'bitgo' && body.enterprise === 'my-enterprise', + ) + .matchHeader('any', () => true) + .reply(200, { id: 'bitgo-key-id', pub: 'xpub_bitgo', source: 'bitgo' }); + + nockWalletAdd(walletId); + nockUpdateJobComplete(job.jobId, walletId); + + await handleKeyGenerationOperation(job, bridge, bitgo); + + nock.pendingMocks().should.have.length(0); + }); + }); +}); diff --git a/src/__tests__/api/master/bridgeClient.test.ts b/src/__tests__/api/master/bridgeClient.test.ts index 64e5d0e9..72daf9fe 100644 --- a/src/__tests__/api/master/bridgeClient.test.ts +++ b/src/__tests__/api/master/bridgeClient.test.ts @@ -25,6 +25,10 @@ describe('OsoBridgeClient', () => { client = new OsoBridgeClient(BASE_URL, TIMEOUT); }); + after(() => { + nock.enableNetConnect(); + }); + afterEach(() => nock.cleanAll()); describe('constructor', () => { diff --git a/src/__tests__/config.test.ts b/src/__tests__/config.test.ts index 4e2e3b98..f57b8e08 100644 --- a/src/__tests__/config.test.ts +++ b/src/__tests__/config.test.ts @@ -673,6 +673,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; process.env.MBE_POLL_INTERVAL_MS = '5000'; process.env.MBE_JOB_TTL_S = '1800'; process.env.MBE_JOB_TTL_MPC_S = '3600'; @@ -691,6 +692,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; const cfg = initConfig(); isMasterExpressConfig(cfg).should.be.true(); if (isMasterExpressConfig(cfg)) { @@ -710,6 +712,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; delete process.env.ADVANCED_WALLET_MANAGER_URL; (() => initConfig()).should.not.throw(); }); @@ -718,6 +721,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; process.env.MBE_POLL_INTERVAL_MS = '-1'; (() => initConfig()).should.throw('MBE_POLL_INTERVAL_MS must be a positive number, got -1'); }); @@ -726,6 +730,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; process.env.MBE_JOB_TTL_S = '-1'; (() => initConfig()).should.throw('MBE_JOB_TTL_S must be a positive number, got -1'); }); @@ -734,6 +739,7 @@ describe('Configuration', () => { process.env.TLS_MODE = 'disabled'; process.env.ASYNC_MODE = 'true'; process.env.AWM_ASYNC_URL = 'http://awm-async:8080'; + process.env.BITGO_ACCESS_TOKEN = 'test-token'; process.env.MBE_JOB_TTL_MPC_S = '-1'; (() => initConfig()).should.throw('MBE_JOB_TTL_MPC_S must be a positive number, got -1'); }); diff --git a/src/__tests__/integration/asyncJobWorker.integ.test.ts b/src/__tests__/integration/asyncJobWorker.integ.test.ts new file mode 100644 index 00000000..e64fc2bb --- /dev/null +++ b/src/__tests__/integration/asyncJobWorker.integ.test.ts @@ -0,0 +1,124 @@ +import 'should'; +import assert from 'assert'; +import { startServices, IntegServices } from './helpers/setup'; +import { BridgeJobResponse } from '../../masterBitgoExpress/clients/bridgeClient.types'; +import { MockBridgeServer } from './helpers/mockBridgeServer'; + +const COIN = 'tbtc'; + +async function waitForJobCompletion( + bridge: MockBridgeServer, + jobId: string, + maxWaitMs: number, +): Promise { + const startTime = Date.now(); + const pollIntervalMs = 50; + + while (Date.now() - startTime < maxWaitMs) { + const patchCall = bridge.calls.find((c) => c.method === 'PATCH' && c.path === `/job/${jobId}`); + if (patchCall) return; + await new Promise((resolve) => setTimeout(resolve, pollIntervalMs)); + } + + throw new Error( + `Job ${jobId} did not complete within ${maxWaitMs}ms. Recorded calls: ${JSON.stringify( + bridge.calls, + null, + 2, + )}`, + ); +} + +function makeAwaitingBitgoJob(overrides: Partial = {}): BridgeJobResponse { + return { + jobId: 'integ-job-123', + status: 'awaiting_bitgo', + version: 1, + coin: COIN, + operationType: 'multisig_keygen', + awmResponse: { + status: 200, + body: { + id: 'user-key-id', + pub: 'xpub_user', + type: 'independent', + source: 'user', + coin: COIN, + }, + }, + awmBackupResponse: { + status: 200, + body: { + id: 'backup-key-id', + pub: 'xpub_backup', + type: 'independent', + source: 'backup', + coin: COIN, + }, + }, + request: { body: { label: 'integ-test-wallet', enterprise: 'test-enterprise' } }, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + ...overrides, + }; +} + +describe('asyncJobWorker: end-to-end polling', () => { + let services: IntegServices; + + before(async () => { + services = await startServices({ asyncMode: true }); + }); + + after(async () => { + await services.teardown(); + }); + + beforeEach(() => { + services.bitgo.calls.length = 0; + assert(services.bridge, 'bridge service should be defined'); + services.bridge.calls.length = 0; + }); + + it('picks up an awaiting_bitgo keygen job, registers keychains, creates wallet, and PATCHes complete', async () => { + const jobId = 'integ-job-123'; + assert(services.bridge, 'bridge service should be defined'); + services.bridge.setPendingJobs([makeAwaitingBitgoJob()]); + + await waitForJobCompletion(services.bridge, jobId, 5000); + + const keyCalls = services.bitgo.calls.filter( + (c) => c.method === 'POST' && c.path.endsWith('/key'), + ); + keyCalls.should.have.length(3); + + const walletAddCalls = services.bitgo.calls.filter((c) => c.path.endsWith('/wallet/add')); + walletAddCalls.should.have.length(1); + + const patchCall = services.bridge.calls.find( + (c) => c.method === 'PATCH' && c.path === `/job/${jobId}`, + ); + assert(patchCall !== undefined, `expected PATCH /job/${jobId} to be called`); + const patchBody = patchCall.body as { status: string; result: { walletId: string } }; + patchBody.status.should.equal('complete'); + patchBody.result.should.have.property('walletId', 'test-wallet-id'); + }); + + it('PATCHes job failed when awmResponse.body is not a valid keychain', async () => { + const jobId = 'integ-job-123'; + assert(services.bridge, 'bridge service should be defined'); + services.bridge.setPendingJobs([ + makeAwaitingBitgoJob({ + awmResponse: { status: 200, body: { bad: 'shape' } }, + }), + ]); + + await waitForJobCompletion(services.bridge, jobId, 5000); + + const patchCall = services.bridge.calls.find( + (c) => c.method === 'PATCH' && c.path === `/job/${jobId}`, + ); + assert(patchCall !== undefined, `expected PATCH /job/${jobId} to be called`); + (patchCall.body as { status: string }).status.should.equal('failed'); + }); +}); diff --git a/src/__tests__/integration/helpers/mockBridgeServer.ts b/src/__tests__/integration/helpers/mockBridgeServer.ts index 41c53246..d3050d83 100644 --- a/src/__tests__/integration/helpers/mockBridgeServer.ts +++ b/src/__tests__/integration/helpers/mockBridgeServer.ts @@ -1,6 +1,7 @@ import * as http from 'http'; import express from 'express'; import { listen, close } from './servers'; +import { BridgeJobResponse } from '../../../masterBitgoExpress/clients/bridgeClient.types'; export interface MockBridgeCall { method: string; @@ -11,11 +12,14 @@ export interface MockBridgeCall { export interface MockBridgeServer { port: number; calls: MockBridgeCall[]; + /** Load jobs that GET /jobs?status=awaiting_bitgo will return on the next poll (one-shot). */ + setPendingJobs(jobs: BridgeJobResponse[]): void; close(): Promise; } export async function startMockBridgeServer(): Promise { const calls: MockBridgeCall[] = []; + let pendingJobs: BridgeJobResponse[] = []; const app = express(); app.use(express.json()); @@ -25,6 +29,18 @@ export async function startMockBridgeServer(): Promise { next(); }); + /** Worker polls this — returns pending jobs once, then empty so the worker doesn't loop forever */ + app.get('/jobs', (_req, res) => { + const jobs = pendingJobs; + pendingJobs = []; + res.json({ jobs }); + }); + + app.patch('/job/:jobId', (_req, res) => { + res.status(204).send(); + }); + + /** Async submit from MBE request handlers */ app.post('*', (_req, res) => { res.status(202).json({ jobId: 'test-job-id' }); }); @@ -32,5 +48,12 @@ export async function startMockBridgeServer(): Promise { const server = http.createServer(app); const port = await listen(server); - return { port, calls, close: () => close(server) }; + return { + port, + calls, + setPendingJobs(jobs: BridgeJobResponse[]) { + pendingJobs = jobs; + }, + close: () => close(server), + }; } diff --git a/src/__tests__/integration/helpers/setup.ts b/src/__tests__/integration/helpers/setup.ts index 392d8ac7..3dd388c4 100644 --- a/src/__tests__/integration/helpers/setup.ts +++ b/src/__tests__/integration/helpers/setup.ts @@ -1,12 +1,13 @@ import * as http from 'http'; import { app as awmApp } from '../../../advancedWalletManagerApp'; import { app as mbeApp } from '../../../masterBitGoExpressApp'; -import { AppMode, TlsMode, SigningMode } from '../../../shared/types'; +import { AppMode, MasterExpressConfig, TlsMode, SigningMode } from '../../../shared/types'; import { DEFAULT_ASYNC_MODE_CONFIG } from '../../api/master/testUtils'; import { listen, close, LOCALHOST } from './servers'; import { startMockKeyProviderServer, MockKeyProviderServer } from './mockKeyProviderServer'; import { startMockBitgoServer, MockBitgoServer } from './mockBitgoServer'; import { startMockBridgeServer, MockBridgeServer } from './mockBridgeServer'; +import { startAsyncJobWorker } from '../../../masterBitgoExpress/workers/asyncJobWorker'; export interface IntegServices { mbePort: number; @@ -45,39 +46,43 @@ export async function startServices(opts: StartServicesOptions = {}): Promise { clientCertAllowSelfSigned, recoveryMode: readEnvVar('RECOVERY_MODE') === 'true', asyncModeConfig: readAsyncModeConfig(isAsyncMode), + bitgoAccessToken: readEnvVar('BITGO_ACCESS_TOKEN'), }; } @@ -492,6 +496,7 @@ function mergeMasterExpressConfigs( clientCertAllowSelfSigned: get('clientCertAllowSelfSigned'), recoveryMode: get('recoveryMode'), asyncModeConfig: get('asyncModeConfig'), + bitgoAccessToken: get('bitgoAccessToken'), }; } diff --git a/src/masterBitGoExpressApp.ts b/src/masterBitGoExpressApp.ts index 476e7543..91a7b816 100644 --- a/src/masterBitGoExpressApp.ts +++ b/src/masterBitGoExpressApp.ts @@ -16,6 +16,7 @@ import { } from './shared/appUtils'; import logger from './shared/logger'; import { setupRoutes } from './masterBitgoExpress/routers/masterBitGoExpress'; +import { startAsyncJobWorker } from './masterBitgoExpress/workers/asyncJobWorker'; /** * Create a startup function which will be run upon server initialization @@ -143,4 +144,8 @@ export async function init(): Promise { } else { server.listen(port, bind, startup(cfg, baseUri)); } + + if (cfg.asyncModeConfig.enabled) { + startAsyncJobWorker(cfg); + } } diff --git a/src/masterBitgoExpress/clients/advancedWalletManagerClient.ts b/src/masterBitgoExpress/clients/advancedWalletManagerClient.ts index ff39ca61..fcd709c6 100644 --- a/src/masterBitgoExpress/clients/advancedWalletManagerClient.ts +++ b/src/masterBitgoExpress/clients/advancedWalletManagerClient.ts @@ -1,3 +1,4 @@ +import { z } from 'zod'; import assert from 'assert'; import https from 'https'; import superagent from 'superagent'; @@ -71,14 +72,16 @@ interface CreateIndependentKeychainParams { seed?: string; } -export interface IndependentKeychainResponse { - id: string; - pub: string; - encryptedPrv?: string; - type: 'independent'; - source: 'user' | 'backup' | 'bitgo'; - coin: string; -} +export const IndependentKeychainResponseSchema = z.object({ + id: z.string(), + pub: z.string(), + encryptedPrv: z.string().optional(), + type: z.literal('independent'), + source: z.enum(['user', 'backup', 'bitgo']), + coin: z.string(), +}); + +export type IndependentKeychainResponse = z.infer; interface SignMultisigOptions { txPrebuild: TransactionPrebuild; diff --git a/src/masterBitgoExpress/clients/bridgeClient.types.ts b/src/masterBitgoExpress/clients/bridgeClient.types.ts index 402af437..becec292 100644 --- a/src/masterBitgoExpress/clients/bridgeClient.types.ts +++ b/src/masterBitgoExpress/clients/bridgeClient.types.ts @@ -20,8 +20,17 @@ export const JobStatusSchema = z.enum([ ]); export type JobStatus = z.infer; +export const AwmResponseSchema = z.object({ + status: z.number(), + body: z.record(z.unknown()), + error: z.string().optional(), +}); +export type AwmResponse = z.infer; + const unknownOptional = z.unknown().optional(); +const bridgeTimestampSchema = z.union([z.number(), z.string()]); + export const SubmitResponseSchema = z.object({ jobId: z.string(), }); @@ -33,15 +42,24 @@ export const BridgeJobResponseSchema = z.object({ version: z.number(), coin: z.string(), operationType: OperationTypeSchema, - awmResponse: unknownOptional, - awmBackupResponse: unknownOptional, + request: z + .object({ + endpoint: z.string().optional(), + method: z.string().optional(), + body: z.record(z.unknown()).optional(), + headers: z.record(z.string()).optional(), + }) + .optional(), + awmResponse: AwmResponseSchema.optional(), + awmBackupResponse: AwmResponseSchema.optional(), result: unknownOptional, error: z.string().optional(), currentRound: z.number().optional(), totalRounds: z.number().optional(), sessionState: unknownOptional, - createdAt: z.string(), - updatedAt: z.string(), + createdAt: bridgeTimestampSchema, + updatedAt: bridgeTimestampSchema, + ttl: z.number().optional(), }); export type BridgeJobResponse = z.infer; diff --git a/src/masterBitgoExpress/handlers/handleGenerateWallet.ts b/src/masterBitgoExpress/handlers/handleGenerateWallet.ts index da6d866c..6cd56afd 100644 --- a/src/masterBitgoExpress/handlers/handleGenerateWallet.ts +++ b/src/masterBitgoExpress/handlers/handleGenerateWallet.ts @@ -13,6 +13,7 @@ import { BadRequestError } from '../../shared/errors'; import { KeySource } from '../../shared/types'; import { submitJobViaBridgeClient } from './utils/asyncUtils'; import { createOnchainKeyGenCallback } from './walletGenerationCallbacks'; +import { getBaseWalletParams } from './utils/walletCreationUtils'; /** * Request handler for generating an advanced wallet. @@ -63,7 +64,6 @@ async function handleGenerateOnChainWallet( multisigType: 'onchain', createKeychainCallback, }); - return { ...result, wallet: result.wallet.toJSON() }; } @@ -97,12 +97,8 @@ async function handleGenerateMpcWallet( const walletParams: SupplementGenerateWalletOptions = { ...req.decoded, - label: label, - m: 2, - n: 3, - keys: [], - type: 'advanced', - multisigType: 'tss', + label, + ...getBaseWalletParams('tss'), }; if (!_.isUndefined(enterprise)) { diff --git a/src/masterBitgoExpress/handlers/utils/walletCreationUtils.ts b/src/masterBitgoExpress/handlers/utils/walletCreationUtils.ts new file mode 100644 index 00000000..6cfa6e19 --- /dev/null +++ b/src/masterBitgoExpress/handlers/utils/walletCreationUtils.ts @@ -0,0 +1,85 @@ +import { + Keychain, + KeychainsTriplet, + promiseProps, + RequestTracer, + SupplementGenerateWalletOptions, + Wallet, + WalletWithKeychains, +} from '@bitgo-beta/sdk-core'; +import { BitGoAPI } from '@bitgo-beta/sdk-api'; +import _ from 'lodash'; +import { IndependentKeychainResponse } from '../../clients/advancedWalletManagerClient'; +import coinFactory from '../../../shared/coinFactory'; + +export function getBaseWalletParams(multisigType: 'onchain' | 'tss') { + return { m: 2, n: 3, keys: [] as string[], type: 'advanced', multisigType } as const; +} + +export interface RegisterKeychainsAndCreateWalletParams { + coin: string; + bitgo: BitGoAPI; + userKeychain: IndependentKeychainResponse; + backupKeychain: IndependentKeychainResponse; + walletParams: SupplementGenerateWalletOptions; + isDistributedCustody?: boolean; +} + +export async function registerKeychainsAndCreateWallet({ + bitgo, + coin, + walletParams, + userKeychain, + backupKeychain, + isDistributedCustody, +}: RegisterKeychainsAndCreateWalletParams): Promise { + const baseCoin = await coinFactory.getCoin(coin, bitgo); + const reqId = new RequestTracer(); + + const registerKeychain = async (keyChain: IndependentKeychainResponse): Promise => { + const registered = await baseCoin.keychains().add({ + pub: keyChain.pub, + keyType: keyChain.type, + source: keyChain.source, + reqId, + }); + return _.extend({}, registered, keyChain); + }; + + const { + userKeychain: registeredUser, + backupKeychain: registeredBackup, + bitgoKeychain, + }: KeychainsTriplet = await promiseProps({ + userKeychain: registerKeychain(userKeychain), + backupKeychain: registerKeychain(backupKeychain), + bitgoKeychain: baseCoin.keychains().createBitGo({ + enterprise: walletParams.enterprise, + keyType: 'independent', + reqId, + isDistributedCustody, + }), + }); + + const keychains: KeychainsTriplet = { + userKeychain: registeredUser, + backupKeychain: registeredBackup, + bitgoKeychain, + }; + + const finalWalletParams = await baseCoin.supplementGenerateWallet( + { ...walletParams, keys: [registeredUser.id, registeredBackup.id, bitgoKeychain.id] }, + keychains, + ); + + bitgo.setRequestTracer(reqId); + const newWallet = await bitgo.post(baseCoin.url('/wallet/add')).send(finalWalletParams).result(); + + return { + wallet: new Wallet(bitgo, baseCoin, newWallet), + userKeychain: registeredUser, + backupKeychain: registeredBackup, + bitgoKeychain, + responseType: 'WalletWithKeychains', + }; +} diff --git a/src/masterBitgoExpress/workers/asyncJobWorker.ts b/src/masterBitgoExpress/workers/asyncJobWorker.ts new file mode 100644 index 00000000..4c59cae0 --- /dev/null +++ b/src/masterBitgoExpress/workers/asyncJobWorker.ts @@ -0,0 +1,146 @@ +import { CreateBitGoOptions, SupplementGenerateWalletOptions } from '@bitgo-beta/sdk-core'; +import { BitGoAPI } from '@bitgo-beta/sdk-api'; +import { OsoBridgeClient } from '../clients/bridgeClient'; +import { AwmResponseSchema, BridgeJobResponse } from '../clients/bridgeClient.types'; +import { + IndependentKeychainResponseSchema, + type IndependentKeychainResponse, +} from '../clients/advancedWalletManagerClient'; +import { + getBaseWalletParams, + registerKeychainsAndCreateWallet, +} from '../handlers/utils/walletCreationUtils'; +import { MasterExpressConfig } from '../../shared/types'; +import logger from '../../shared/logger'; + +const ASYNC_OPERATIONS_TO_HANDLERS: Partial< + Record< + BridgeJobResponse['operationType'], + (job: BridgeJobResponse, bridge: OsoBridgeClient, bitgo: BitGoAPI) => Promise + > +> = { + multisig_keygen: handleKeyGenerationOperation, +}; + +function parseKeychainFromAwmResponse( + awmResponse: BridgeJobResponse['awmResponse'], + field: 'awmResponse' | 'awmBackupResponse', +): IndependentKeychainResponse { + if (awmResponse === undefined) { + throw new Error(`job missing ${field}`); + } + const envelope = AwmResponseSchema.safeParse(awmResponse); + if (!envelope.success) { + throw new Error(`job ${field} is not a valid AwmResponse (expected { status, body })`); + } + const r = envelope.data; + if (r.status >= 400 || r.error) { + throw new Error(r.error ?? `AWM ${field} returned status ${r.status}`); + } + return IndependentKeychainResponseSchema.parse(r.body); +} + +export function startAsyncJobWorker(cfg: MasterExpressConfig): () => void { + const logPrefix = '[asyncJobWorker:startAsyncJobWorker]'; + const bridge = new OsoBridgeClient(cfg.asyncModeConfig.awmAsyncUrl, cfg.timeout); + const bitgo = new BitGoAPI({ + env: cfg.env, + customRootURI: cfg.customRootUri, + accessToken: cfg.bitgoAccessToken, + }); + + let isWorkerRunning = false; + const handle = setInterval(async () => { + if (isWorkerRunning) { + logger.warn(`${logPrefix} previous job still running, skipping this interval`); + return; + } + isWorkerRunning = true; + try { + await processPendingJobs(bridge, bitgo); + } catch (err) { + logger.error(`${logPrefix} unhandled error: ${JSON.stringify(err)}`); + } finally { + isWorkerRunning = false; + } + }, cfg.asyncModeConfig.pollIntervalInMs); + + logger.info(`${logPrefix} started, polling every ${cfg.asyncModeConfig.pollIntervalInMs}ms`); + return () => clearInterval(handle); +} + +export async function processPendingJobs(bridge: OsoBridgeClient, bitgo: BitGoAPI): Promise { + const { jobs } = await bridge.listJobs({ status: 'awaiting_bitgo' }); + if (jobs.length === 0) { + logger.info('[asyncJobWorker:processPendingJobs] no awaiting_bitgo jobs found'); + return; + } + + const logPrefix = '[asyncJobWorker:processPendingJobs]'; + logger.info(`${logPrefix} found ${jobs.length} awaiting_bitgo jobs`); + + await Promise.allSettled(jobs.map((job) => processJob(job, bridge, bitgo))); +} + +async function processJob( + job: BridgeJobResponse, + bridge: OsoBridgeClient, + bitgo: BitGoAPI, +): Promise { + const logPrefix = '[asyncJobWorker:processJob]'; + const handler = ASYNC_OPERATIONS_TO_HANDLERS[job.operationType]; + if (!handler) { + logger.debug(`${logPrefix} no handler for operationType ${job.operationType}, skipping`); + return; + } + try { + await handler(job, bridge, bitgo); + } catch (err) { + logger.error(`${logPrefix} job ${job.jobId} failed: ${JSON.stringify(err)}`); + await bridge.updateJob({ + jobId: job.jobId, + version: job.version, + status: 'failed', + error: (err as Error).message, + }); + } +} + +export async function handleKeyGenerationOperation( + job: BridgeJobResponse, + bridge: OsoBridgeClient, + bitgo: BitGoAPI, +): Promise { + const logPrefix = '[asyncJobWorker:handleKeyGenerationOperation]'; + const userKeychain = parseKeychainFromAwmResponse(job.awmResponse, 'awmResponse'); + const backupKeychain = parseKeychainFromAwmResponse(job.awmBackupResponse, 'awmBackupResponse'); + const { jobId, coin, version } = job; + + const requestBody = (job.request?.body ?? {}) as unknown as SupplementGenerateWalletOptions & + Pick; + + const walletParams: SupplementGenerateWalletOptions = { + ...requestBody, + ...getBaseWalletParams('onchain'), + }; + + const result = await registerKeychainsAndCreateWallet({ + bitgo, + walletParams, + userKeychain, + backupKeychain, + coin, + isDistributedCustody: requestBody.isDistributedCustody, + }); + + logger.info(`${logPrefix} job ${jobId} created wallet - updating job status to complete`); + + await bridge.updateJob({ + jobId, + version, + status: 'complete', + result: { walletId: result.wallet.id() }, + }); + + logger.info(`${logPrefix} job ${jobId} complete, walletId ${result.wallet.id()}`); +} diff --git a/src/shared/types/index.ts b/src/shared/types/index.ts index 31b08e29..b1b4ce66 100644 --- a/src/shared/types/index.ts +++ b/src/shared/types/index.ts @@ -113,6 +113,7 @@ export interface MasterExpressConfig extends BaseConfig { clientCertAllowSelfSigned?: boolean; recoveryMode?: boolean; asyncModeConfig: AsyncModeConfig; + bitgoAccessToken?: string; } // Union type for the configuration