Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2
export const CHRONIK_INITIALIZATION_DELAY = 2000
export const MEMPOOL_PROCESS_DELAY = 100

// Number of tries before failing a chronik call (min 1)
export const CHRONIK_TRIES = 3
// Initial delay between retries in milliseconds. This is multiplied by 2 for each retry.
export const CHRONIK_RETRY_DELAY_MS = 1000

/* WIP RENAME ALL THOSE */
// When fetching some address transactions, number of transactions to fetch at a time.
// On chronik, the max allowed is 200
Expand Down
93 changes: 70 additions & 23 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY } from 'constants/index'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY, CHRONIK_TRIES, CHRONIK_RETRY_DELAY_MS } from 'constants/index'
import { productionAddresses } from 'prisma-local/seeds/addresses'
import prisma from 'prisma-local/clientInstance'
import {
Expand Down Expand Up @@ -237,15 +237,44 @@ export class ChronikBlockchainClient {
if (NETWORK_IDS_FROM_SLUGS[networkSlug] !== this.networkId) { throw new Error(RESPONSE_MESSAGES.INVALID_NETWORK_SLUG_400.message) }
}

private async chronikCallWithRetry<T> (
label: string,
fn: () => Promise<T>,
tries = CHRONIK_TRIES,
delayMs = CHRONIK_RETRY_DELAY_MS
): Promise<T> {
for (let i = 0; i < tries - 1; i++) {
try {
return await fn()
} catch (e: unknown) {
const delay = delayMs * Math.pow(2, i)
const errMsg = e instanceof Error ? e.message : String(e)
console.warn(
`${this.CHRONIK_MSG_PREFIX}: ${label} failed (${errMsg}), ` +
`attempt ${i + 1}/${tries}, retrying in ${(delay / 1000).toFixed(1)}s...`
)
await new Promise(resolve => setTimeout(resolve, delay))
}
}

return await fn()
}

async getBlockchainInfo (networkSlug: string): Promise<BlockchainInfo> {
this.validateNetwork(networkSlug)
const blockchainInfo = await this.chronik.blockchainInfo()
const blockchainInfo = await this.chronikCallWithRetry(
'blockchainInfo',
async () => await this.chronik.blockchainInfo()
)
return { height: blockchainInfo.tipHeight, hash: blockchainInfo.tipHash }
}

async getBlockInfo (networkSlug: string, height: number): Promise<SimpleBlockInfo> {
this.validateNetwork(networkSlug)
const blockInfo: BlockInfo = (await this.chronik.block(height)).blockInfo
const blockInfo: BlockInfo = (await this.chronikCallWithRetry(
`block ${height}`,
async () => await this.chronik.block(height)
)).blockInfo
return { hash: blockInfo.hash, height: blockInfo.height, timestamp: blockInfo.timestamp }
}

Expand Down Expand Up @@ -315,7 +344,10 @@ export class ChronikBlockchainClient {

public async getPaginatedTxs (addressString: string, page: number, pageSize: number): Promise<Tx[]> {
const { type, hash160 } = toHash160(addressString)
const txsPage = (await this.chronik.script(type, hash160).history(page, pageSize))
const txsPage = await this.chronikCallWithRetry(
`script history ${addressString} page ${page}`,
async () => await this.chronik.script(type, hash160).history(page, pageSize)
)

// If there are too many txs, this might be too expensive to sync. Raise an
// error to skip this address.
Expand Down Expand Up @@ -622,7 +654,10 @@ export class ChronikBlockchainClient {

private async getUtxos (address: string): Promise<ScriptUtxo[]> {
const { type, hash160 } = toHash160(address)
const scriptsUtxos = await this.chronik.script(type, hash160).utxos()
const scriptsUtxos = await this.chronikCallWithRetry(
`script utxos ${address}`,
async () => await this.chronik.script(type, hash160).utxos()
)
return scriptsUtxos.utxos
}

Expand All @@ -632,7 +667,10 @@ export class ChronikBlockchainClient {
}

async getTransactionDetails (hash: string): Promise<TransactionDetails> {
const tx = await this.chronik.tx(hash)
const tx = await this.chronikCallWithRetry(
`tx ${hash}`,
async () => await this.chronik.tx(hash)
)

const details: TransactionDetails = {
hash: tx.txid,
Expand Down Expand Up @@ -755,20 +793,17 @@ export class ChronikBlockchainClient {
}
}

private async fetchTxWithRetry (txid: string, tries = 3, delayMs = 1000): Promise<Tx> {
for (let i = 0; i < tries; i++) {
try {
return await this.chronik.tx(txid)
} catch (e: any) {
const msg = String(e?.message ?? e)
const is404 = /not found in the index|404/.test(msg)
if (!is404 || i === tries - 1) throw e
const delay = delayMs * Math.pow(2, i)
console.error(`Got a 404 Error trying to fetch tx ${txid} on the attempt number ${i + 1}, waiting ${(delay / 1000).toFixed(1)}s...`)
await new Promise(resolve => setTimeout(resolve, delay))
}
}
throw new Error('unreachable')
private async fetchTxWithRetry (
txid: string,
tries = CHRONIK_TRIES,
delayMs = CHRONIK_RETRY_DELAY_MS
): Promise<Tx> {
return await this.chronikCallWithRetry(
`tx ${txid}`,
async () => await this.chronik.tx(txid),
tries,
delayMs
)
}

private async processWsMessage (msg: WsMsgClient): Promise<void> {
Expand Down Expand Up @@ -909,17 +944,29 @@ export class ChronikBlockchainClient {
return broadcastTxData
}

private async getBlockTxsPage (
blockHash: string,
page: number,
pageSize: number
): Promise<Tx[]> {
const blockPage = await this.chronikCallWithRetry(
`blockTxs ${blockHash} page ${page}`,
async () => await this.chronik.blockTxs(blockHash, page, pageSize)
)
return blockPage.txs
}

private async syncBlockTransactions (blockHash: string): Promise<void> {
let page = 0
const pageSize = 200
let blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs
let blockPageTxs = await this.getBlockTxsPage(blockHash, page, pageSize)
let blockTxsToSync: Tx[] = []
const confirmedTxHashes = new Set(this.confirmedTxsHashesFromLastBlock)
while (blockPageTxs.length > 0 && blockTxsToSync.length < confirmedTxHashes.size) {
const thisBlockTxsToSync = blockPageTxs.filter(tx => confirmedTxHashes.has(tx.txid))
blockTxsToSync = [...blockTxsToSync, ...thisBlockTxsToSync]
page += 1
blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs
blockPageTxs = await this.getBlockTxsPage(blockHash, page, pageSize)
}
for (const transaction of blockTxsToSync) {
const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
Expand Down Expand Up @@ -1396,8 +1443,8 @@ class MultiBlockchainClient {

public async syncMissedTransactions (): Promise<void> {
await this.waitForStart()
await this.clients.ecash.syncMissedTransactions()
await this.clients.bitcoincash.syncMissedTransactions()
await this.clients.ecash.syncMissedTransactions()
}

public async syncAndSubscribeAddresses (addresses: Address[]): Promise<SyncAndSubscriptionReturn> {
Expand Down
27 changes: 27 additions & 0 deletions tests/unittests/chronikService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,33 @@ describe('Regression: mempool + retries + onMessage + cache TTL', () => {
expect(txMock).toHaveBeenCalledTimes(2)
})

it('chronikCallWithRetry retries on error then succeeds', async () => {
process.env.WS_AUTH_KEY = 'test-auth-key'
const client = new ChronikBlockchainClient('ecash')
await new Promise(resolve => setImmediate(resolve))

const fn = jest.fn()
.mockRejectedValueOnce(new Error('Transaction not found in the index'))
.mockResolvedValueOnce({ txs: [{ txid: 'tx1' }], numTxs: 1 })

const result = await (client as any).chronikCallWithRetry('script history', fn, 3, 1)

expect(result.txs).toHaveLength(1)
expect(fn).toHaveBeenCalledTimes(2)
})

it('chronikCallWithRetry retries on any error until tries exhausted', async () => {
process.env.WS_AUTH_KEY = 'test-auth-key'
const client = new ChronikBlockchainClient('ecash')
await new Promise(resolve => setImmediate(resolve))

const fn = jest.fn().mockRejectedValue(new Error('connection refused'))
await expect(
(client as any).chronikCallWithRetry('test', fn, 3, 1)
).rejects.toThrow('connection refused')
expect(fn).toHaveBeenCalledTimes(3)
})

it('clearOldMessages expires entries by TTL and from the correct maps', () => {
process.env.WS_AUTH_KEY = 'test-auth-key'
const client = new ChronikBlockchainClient('ecash')
Expand Down
Loading