Hello,
I’m trying to set a connection timeout for a client(blocking/async), to put simply, i just want client throw an exception when it can not connect in a certain amount of time. I’ve tried two approaches so far:

  • Async client
  •  Mqtt5AsyncClient client = MqttClient.builder()
              .useMqttVersion5()
              .serverHost(hivemqHost)
              .serverPort(hivemqPort)
              .identifier(clientIdMqtt5)
              .automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
              .transportConfig()
              .mqttConnectTimeout(5, TimeUnit.SECONDS)
              .socketConnectTimeout(5,TimeUnit.SECONDS)
              .applyTransportConfig()
              .buildAsync();
            log.info("Built mqtt v5 client..");
            client.connectWith()
              .cleanStart(false)
              .noSessionExpiry()
              .send()
              .whenComplete((connAck, throwable) -> {
                  if (connAck != null) {
                      log.info("whenComplete: MQTT connect success with code {}", connAck.getReasonCode());
                  } else {
                      if (throwable instanceof Mqtt5ConnAckException) {
                          log.info("whenComplete: Connect failed because of Negative CONNACK with code {} ",
                            ((Mqtt5ConnAckException) throwable).getMqttMessage().getReasonCode());
                      } else {
                          log.info("whenComplete: MQTT connect failed because of {}", throwable.getMessage());
    

    When hivemq broker is not running this whenComplete() callback is never called. I expect it should be called in connect timeout seconds(5s here).

  • Blocking approach:
  • client = MqttClient.builder() .useMqttVersion5() .identifier(clientIdMqtt5) .automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT) .transportConfig() .serverHost(hivemqHost) .serverPort(hivemqPort) .socketConnectTimeout(5, TimeUnit.SECONDS) .mqttConnectTimeout(5, TimeUnit.SECONDS) .applyTransportConfig() .build(); log.info("Built mqtt v5 client.."); Mqtt5ConnAck connAck = client.toBlocking() .connectWith() .cleanStart(false) .noSessionExpiry() .send();

    When hivemq broker is not running this one hangs indefinetely, ignoring transport timeout parameters.
    So, how can i know a connection attempt failed?

    Thanks

    Hi @mtorak ,

    I see that you are using automatic reconnect strategy. So you can add following two lines in order to know a connection attempt failed or succeeded:

            Mqtt5AsyncClient client = Mqtt5Client.builder()
                    .serverHost("localhost")
                    .serverPort(1883)
                    .identifier("my-java-client")
                    .automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
                        .addConnectedListener(context -> System.out.println("connected " + LocalTime.now()))
                        .addDisconnectedListener(context -> System.out.println("disconnected " + LocalTime.now()))
                    .transportConfig()
                    .mqttConnectTimeout(5, TimeUnit.SECONDS)
                    .socketConnectTimeout(5,TimeUnit.SECONDS)
                    .applyTransportConfig()
                    .buildAsync();
    

    When the broker is not running, the client will print:

    disconnected 15:17:38.346839
    disconnected 15:17:41.747628
    disconnected 15:17:51.353332
    disconnected 15:18:07.575575
    disconnected 15:18:42.680786
    

    You can find this and more examples here: https://github.com/hivemq/hivemq-mqtt-client/tree/master/examples/src/main/java/com/hivemq/client/mqtt/examples

    I hope this helps,
    Kind regards,
    Dasha from HiveMQ team

    Hi Daria,
    Meanwhile I’ve also discovered those connect/disconnect listeners.They are handy.
    Also tried the autoReconnect option, which allows frequent checking if the server is back online.

            MqttClientAutoReconnect mqttClientAutoReconnect = MqttClientAutoReconnect.builder()
              .initialDelay(1, TimeUnit.SECONDS)
              .maxDelay(5, TimeUnit.SECONDS)
              .build();
    

    But, still i couldn’t grasp the purpose of these two options:

     .mqttConnectTimeout(5, TimeUnit.SECONDS)
     .socketConnectTimeout(5,TimeUnit.SECONDS)
    

    Any idea?

    And also, discovered a scenario like this:

  • Started async client while HiveMQ broker is offline
  • Sent several messages via async client like this:
  •         client.publishWith()
              .topic(topic)
              .messageExpiryInterval(messageExpiryIntervalMs)
              .payload(payload)
              .userProperties(userPropertiesBuilder.build())
              .qos(MqttQos.AT_LEAST_ONCE)
              .send()
              .whenComplete((mqtt5Publish, throwable) -> {
                  if (throwable != null) {
                      log.warn("Publish failed!! Details: " + throwable.getMessage());
                  } else {
                      log.info("Published successfully to hivemq topic: " + topic);
    
  • It didn’t trigger whenComplete() callback, i expected it should… Why wasn’t it triggered?
  • After HiveMQ broker went online, client auto reconnected and sent those messages to broker.
    So hivemq-mqtt-client must be buffering those messages locally. Is this correct? If so what is limit of this behaviour(i.e. how much data can be buffered?)
  • And also thank you for pointing out additional example api usages.

    Hi @mtorak ,

    mqttConnectTimeout(long timeout, @NotNull TimeUnit timeUnit)
    Sets the timeout between sending the Connect and receiving the ConnAck message .

    socketConnectTimeout(long timeout, @NotNull TimeUnit timeUnit)
    Sets the timeout for connecting the socket to the server .

    Your code I could make work like this:

    String topic = "topic/1";
            client.connectWith()
                    .cleanStart(false)
                    .noSessionExpiry()
                    .keepAlive(2)
                    .send()
                    .thenAccept(connAck -> System.out.println("connected " + connAck))
                    .thenCompose(v -> client.publishWith()
                            .topic(topic)
                            .payload("PAYLOAD".getBytes(StandardCharsets.UTF_8))
                            .send()
                            .whenComplete((mqtt5Publish, throwable) -> {
                                if (throwable != null) {
                                    System.out.println("Publish failed!! Details: " + throwable.getMessage());
                                } else {
                                    System.out.println("Published successfully to hivemq topic: " + topic);
                    .thenAccept(someResult -> System.out.println("Result " + someResult));
    

    I hope this helps,
    Dasha from HiveMQ team

    Hi Daria,
    Thank you for sample usage and pointing out javadoc; but i need example usage of these two methods:

              .mqttConnectTimeout(MQTT_CONNECT_TIMEOUT_S, TimeUnit.SECONDS)
              .socketConnectTimeout(SOCKET_CONNECT_TIMEOUT_S, TimeUnit.SECONDS)
    

    As far as i can see, there are also no usages for these options in mqtt-cli code base.
    So i wonder purpose/usage of them.

    Thank you,

    Hi @mtorak ,

    True, these two methods aren’t in the MQTT CLI. This means that MQTT CLI is reusing whatever is set in the MQTT Client library, take a look here:

        private int socketConnectTimeoutMs = MqttClientTransportConfigImpl.DEFAULT_SOCKET_CONNECT_TIMEOUT_MS;
        private int mqttConnectTimeoutMs = MqttClientTransportConfigImpl.DEFAULT_MQTT_CONNECT_TIMEOUT_MS;
    

    I hope this helps,
    Dasha from HiveMQ team