[Bug] Producer warns of send timeout when no messages queued up #576

@meisly

Search before asking

  • I searched in the issues and found nothing similar.

Version

We're using pulsar-client-node v1.17.0, which uses pulsar-client-cpp v4.0.1, on Ubuntu 20.04.6.

Minimal reproduce step

Create a producer and leave it connected without sending any messages.

What did you expect to see?

I expect it to connect and do nothing.

What did you see instead?

While idle, the producer logs a send timeout warning every 30 seconds (the default sendTimeoutMs value for the producer). It works without issues when there are messages to be produced. Here is an example of the log warning:

[Wed May 13 2026 20:49:08] WARN: ProducerImpl-874() => [persistent://redacted, clusterName] Send timeout due to queueing delay, connection: [redacted -> redacted] , pending messages: 0, queue size: 0

Anything else?

I suspect that the code block that logs the timeout (shown below) should not run when pendingMessagesQueue_ is empty.

    auto cnx = getCnx().lock();
    if (cnx) {
        LOG_WARN(getName() << "Send timeout due to queueing delay, connection: " << cnx->cnxString()
                           << ", pending messages: " << pendingMessages.size()
                           << ", queue size: " << pendingMessagesQueue_.size());
    } else {
        LOG_WARN(getName() << "Send timeout due to queueing delay, no connection, pending messages: "
                           << pendingMessages.size() << ", queue size: " << pendingMessagesQueue_.size());
    }
    for (const auto& op : pendingMessages) {
        op->complete(ResultTimeout, {});
    }
}

Maybe it could be moved into the else block immediately above it (shown below), in the branch where the front message has expired?

    if (pendingMessagesQueue_.empty()) {
        // If there are no pending messages, reset the timeout to the configured value.
        LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
        asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
    } else {
        // If there is at least one message, calculate the diff between the message timeout and
        // the current time.
        auto diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now();
        if (toMillis(diff) <= 0) {
            // The diff is less than or equal to zero, meaning that the message has been expired.
            LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
            pendingMessages = getPendingCallbacksWhenFailed();
            // Since the pending queue is cleared now, set timer to expire after configured value.
            asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
        } else {
            // The diff is greater than zero, set the timeout to the diff value
            LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff.count());
            asyncWaitSendTimeout(diff);
        }
    }
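To make the suggestion concrete, here is a minimal, self-contained sketch of the guard I have in mind. It is not the real ProducerImpl: `PendingOp`, `ProducerSketch`, and the `warnings` vector are hypothetical stand-ins so the logic can be compiled and checked without a broker, and the timer re-arming is left out. The only point it illustrates is that the warning is skipped when nothing was actually taken off the queue.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for one queued send operation.
struct PendingOp {
    Clock::time_point timeout;  // deadline after which the send has timed out
};

// Hypothetical stand-in for the producer state used by the timeout handler.
struct ProducerSketch {
    std::deque<PendingOp> pendingMessagesQueue;
    std::vector<std::string> warnings;  // collected here instead of calling LOG_WARN

    // Called each time the send-timeout timer fires.
    // Returns the number of operations that were failed with a timeout.
    std::size_t handleSendTimeout(Clock::time_point now) {
        std::deque<PendingOp> pendingMessages;
        if (!pendingMessagesQueue.empty() && pendingMessagesQueue.front().timeout <= now) {
            // The oldest message has expired: take the whole queue, as the
            // "pending queue is cleared now" comment in the excerpt suggests.
            pendingMessages.swap(pendingMessagesQueue);
        }
        // (The real handler would re-arm the timer here in every case.)

        if (pendingMessages.empty()) {
            return 0;  // idle or not yet due: nothing timed out, so log nothing
        }
        warnings.push_back("Send timeout due to queueing delay, pending messages: " +
                           std::to_string(pendingMessages.size()));
        // The real code would call op->complete(ResultTimeout, {}) for each entry.
        return pendingMessages.size();
    }
};
```

With this guard, a timer tick on an idle producer returns early and records no warning, while a tick after the front message's deadline still warns once and drains the queue.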

Are you willing to submit a PR?

  • I'm willing to submit a PR!
