diff --git a/constants/index.ts b/constants/index.ts index 2201f762..187b819d 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -272,6 +272,11 @@ export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2 export const CHRONIK_INITIALIZATION_DELAY = 2000 export const MEMPOOL_PROCESS_DELAY = 100 +// Number of tries before failing a chronik call (min 1) +export const CHRONIK_TRIES = 3 +// Initial delay between retries in milliseconds. This is multiplied by 2 for each retry. +export const CHRONIK_RETRY_DELAY_MS = 1000 + /* WIP RENAME ALL THOSE */ // When fetching some address transactions, number of transactions to fetch at a time. // On chronik, the max allowed is 200 diff --git a/services/chronikService.ts b/services/chronikService.ts index 75d222ff..470eb618 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -1,7 +1,7 @@ import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client' import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs' import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes' -import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY } from 'constants/index' +import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY, CHRONIK_TRIES, CHRONIK_RETRY_DELAY_MS } from 'constants/index' import { productionAddresses } from 'prisma-local/seeds/addresses' import prisma from 'prisma-local/clientInstance' import { @@ -237,15 +237,44 @@ export class ChronikBlockchainClient { if (NETWORK_IDS_FROM_SLUGS[networkSlug] !== this.networkId) { throw new Error(RESPONSE_MESSAGES.INVALID_NETWORK_SLUG_400.message) } } + private async chronikCallWithRetry ( + label: string, + fn: () => Promise, + tries = CHRONIK_TRIES, + delayMs = CHRONIK_RETRY_DELAY_MS + ): Promise { + for (let i = 0; i < tries - 1; i++) { + try { + return await fn() + } catch (e: unknown) { + const delay = delayMs * Math.pow(2, i) + const errMsg = e instanceof Error ? e.message : String(e) + console.warn( + `${this.CHRONIK_MSG_PREFIX}: ${label} failed (${errMsg}), ` + + `attempt ${i + 1}/${tries}, retrying in ${(delay / 1000).toFixed(1)}s...` + ) + await new Promise(resolve => setTimeout(resolve, delay)) + } + } + + return await fn() + } + async getBlockchainInfo (networkSlug: string): Promise { this.validateNetwork(networkSlug) - const blockchainInfo = await this.chronik.blockchainInfo() + const blockchainInfo = await this.chronikCallWithRetry( + 'blockchainInfo', + async () => await this.chronik.blockchainInfo() + ) return { height: blockchainInfo.tipHeight, hash: blockchainInfo.tipHash } } async getBlockInfo (networkSlug: string, height: number): Promise { this.validateNetwork(networkSlug) - const blockInfo: BlockInfo = (await this.chronik.block(height)).blockInfo + const blockInfo: BlockInfo = (await this.chronikCallWithRetry( + `block ${height}`, + async () => await this.chronik.block(height) + )).blockInfo return { hash: blockInfo.hash, height: blockInfo.height, timestamp: blockInfo.timestamp } } @@ -315,7 +344,10 @@ export class ChronikBlockchainClient { public async getPaginatedTxs (addressString: string, page: number, pageSize: number): Promise { const { type, hash160 } = toHash160(addressString) - const txsPage = (await this.chronik.script(type, hash160).history(page, pageSize)) + const txsPage = await this.chronikCallWithRetry( + `script history ${addressString} page ${page}`, + async () => await this.chronik.script(type, hash160).history(page, pageSize) + ) // If there are too many txs, this might be too expensive to sync. Raise an // error to skip this address. @@ -622,7 +654,10 @@ export class ChronikBlockchainClient { private async getUtxos (address: string): Promise { const { type, hash160 } = toHash160(address) - const scriptsUtxos = await this.chronik.script(type, hash160).utxos() + const scriptsUtxos = await this.chronikCallWithRetry( + `script utxos ${address}`, + async () => await this.chronik.script(type, hash160).utxos() + ) return scriptsUtxos.utxos } @@ -632,7 +667,10 @@ export class ChronikBlockchainClient { } async getTransactionDetails (hash: string): Promise { - const tx = await this.chronik.tx(hash) + const tx = await this.chronikCallWithRetry( + `tx ${hash}`, + async () => await this.chronik.tx(hash) + ) const details: TransactionDetails = { hash: tx.txid, @@ -755,20 +793,17 @@ export class ChronikBlockchainClient { } } - private async fetchTxWithRetry (txid: string, tries = 3, delayMs = 1000): Promise { - for (let i = 0; i < tries; i++) { - try { - return await this.chronik.tx(txid) - } catch (e: any) { - const msg = String(e?.message ?? e) - const is404 = /not found in the index|404/.test(msg) - if (!is404 || i === tries - 1) throw e - const delay = delayMs * Math.pow(2, i) - console.error(`Got a 404 Error trying to fetch tx ${txid} on the attempt number ${i + 1}, waiting ${(delay / 1000).toFixed(1)}s...`) - await new Promise(resolve => setTimeout(resolve, delay)) - } - } - throw new Error('unreachable') + private async fetchTxWithRetry ( + txid: string, + tries = CHRONIK_TRIES, + delayMs = CHRONIK_RETRY_DELAY_MS + ): Promise { + return await this.chronikCallWithRetry( + `tx ${txid}`, + async () => await this.chronik.tx(txid), + tries, + delayMs + ) } private async processWsMessage (msg: WsMsgClient): Promise { @@ -909,17 +944,29 @@ export class ChronikBlockchainClient { return broadcastTxData } + private async getBlockTxsPage ( + blockHash: string, + page: number, + pageSize: number + ): Promise { + const blockPage = await this.chronikCallWithRetry( + `blockTxs ${blockHash} page ${page}`, + async () => await this.chronik.blockTxs(blockHash, page, pageSize) + ) + return blockPage.txs + } + private async syncBlockTransactions (blockHash: string): Promise { let page = 0 const pageSize = 200 - let blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs + let blockPageTxs = await this.getBlockTxsPage(blockHash, page, pageSize) let blockTxsToSync: Tx[] = [] const confirmedTxHashes = new Set(this.confirmedTxsHashesFromLastBlock) while (blockPageTxs.length > 0 && blockTxsToSync.length < confirmedTxHashes.size) { const thisBlockTxsToSync = blockPageTxs.filter(tx => confirmedTxHashes.has(tx.txid)) blockTxsToSync = [...blockTxsToSync, ...thisBlockTxsToSync] page += 1 - blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs + blockPageTxs = await this.getBlockTxsPage(blockHash, page, pageSize) } for (const transaction of blockTxsToSync) { const addressesWithTransactions = await this.getAddressesForTransaction(transaction) @@ -1396,8 +1443,8 @@ class MultiBlockchainClient { public async syncMissedTransactions (): Promise { await this.waitForStart() - await this.clients.ecash.syncMissedTransactions() await this.clients.bitcoincash.syncMissedTransactions() + await this.clients.ecash.syncMissedTransactions() } public async syncAndSubscribeAddresses (addresses: Address[]): Promise { diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index b6e989bf..08b0ae95 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -1326,6 +1326,33 @@ describe('Regression: mempool + retries + onMessage + cache TTL', () => { expect(txMock).toHaveBeenCalledTimes(2) }) + it('chronikCallWithRetry retries on error then succeeds', async () => { + process.env.WS_AUTH_KEY = 'test-auth-key' + const client = new ChronikBlockchainClient('ecash') + await new Promise(resolve => setImmediate(resolve)) + + const fn = jest.fn() + .mockRejectedValueOnce(new Error('Transaction not found in the index')) + .mockResolvedValueOnce({ txs: [{ txid: 'tx1' }], numTxs: 1 }) + + const result = await (client as any).chronikCallWithRetry('script history', fn, 3, 1) + + expect(result.txs).toHaveLength(1) + expect(fn).toHaveBeenCalledTimes(2) + }) + + it('chronikCallWithRetry retries on any error until tries exhausted', async () => { + process.env.WS_AUTH_KEY = 'test-auth-key' + const client = new ChronikBlockchainClient('ecash') + await new Promise(resolve => setImmediate(resolve)) + + const fn = jest.fn().mockRejectedValue(new Error('connection refused')) + await expect( + (client as any).chronikCallWithRetry('test', fn, 3, 1) + ).rejects.toThrow('connection refused') + expect(fn).toHaveBeenCalledTimes(3) + }) + it('clearOldMessages expires entries by TTL and from the correct maps', () => { process.env.WS_AUTH_KEY = 'test-auth-key' const client = new ChronikBlockchainClient('ecash')