Fix MQTT close() blocking indefinitely on pending publishes#1650
Fix MQTT close() blocking indefinitely on pending publishes#1650robjarawan wants to merge 6 commits into
Conversation
The while loop waiting for pending_publishes to drain has no upper bound. If the broker never acks, close() hangs forever with EBO capped at 64s. Add a max_wait bound (uses timeout setting, default 300s) so close() eventually gives up and proceeds with disconnect.
petersilva
left a comment
There was a problem hiding this comment.
This one I am not sure about... Sometimes it is a good idea to loop forever.
I do not see any good reason to give up. I don't see what problem ignoring a failure and moving on solves for us.
|
@robjarawan another example... I'd like to see the MQTT tests run on this branch. |
|
MQTT flow tests are running on my fork but the sr_insects workflow keeps timing out on the "Add and Remove configs" step connecting to hpfx.collab.science.gc.ca. Unit tests pass. Is there a way to run the MQTT flow tests locally without the sr_insects infra? |
|
what do you mean "sr_insects" infra? all it does is run tests using local brokers. the local brokers are setup using scripts in sr3/travis/flow_autoconfig*.sh ... the actual set up of the brokers is done by stuff from sr3/travis. on a vanilla ubuntu linux, you run sr3/travis/flow_autoconfig.sh which will install and set up a rabbitmq broker, and credentials for it. then you can run flow_autoconfig_add_mosquitto.sh to do the same for a mosquitto broker to run mqtt traffic. You can run any flow you want on those brokers. It's not sr_insects specific. just need the credentials (which should be written to ~/.config/sr3/credentials.conf by the autoconfig scripts. |
…inite-block # Please enter a commit message to explain why this merge is necessary, # especially if it merges an updated upstream into a topic branch. # # Lines starting with '#' will be ignored, and an empty message aborts # the commit.
…se-indefinite-block
|
Ran the MQTT tests locally on this branch. Unit tests: 32/32 passed Flow test (dynamic_flow with MQP=mqtt): Could not complete. `shovel/t_dd1_f00` connects to `amqps://hpfx.collab.science.gc.ca/` as its message source, which is not reachable from this server (same reason GitHub Actions timed out on the "Add and Remove configs" step). Static_flow with MQP=mqtt is also broken for a different reason: the post components run to completion before the MQTT subscribers connect, and MQTT does not queue messages for late subscribers the way AMQP does, so subscribers receive nothing. If you know a way to run the MQTT flow test that does not require hpfx connectivity, I am happy to try it. Otherwise the unit tests are the best coverage I can provide from this environment. |
What
close()waits in a while loop forpending_publishesto drain. If the broker never acks (disconnect, crash, network partition), the loop runs forever with EBO capped at 64s. Process hangs on shutdown.Change
Add a
max_waitbound (usestimeoutsetting, defaults to 300s). If exceeded, logs a warning and proceeds with disconnect instead of hanging.