1. Anuncie Aqui ! Entre em contato fdantas@4each.com.br

[Python] MQTT Broker publish keeps failing until we don't restart client service

Discussão em 'Python' iniciado por Stack, Outubro 3, 2024 às 21:12.

  1. Stack

    Stack Membro Participativo

    We are running a MQTT broker in kubernetes.

    After sometime publish starts failing, and keeps failing till we don't restart the client service

    MQTT protocol version we are using is MQTTv5

    connection code:

    class Mqtt_Connection:

    def __init__(self, host_name, broker_ip_addr, port, msg_counter, logger):
    self._logger = logger
    self._condition = threading.Condition()
    self._broker_ip_addr = broker_ip_addr
    self._port = port # port

    self.rf_status = ""
    self.id = msg_counter
    self.response_received = False

    self.client = mqtt.Client(protocol=mqtt.MQTTv5)

    def on_message(self, client, userdata, msg):

    with self._condition:

    self._logger.debug("Reply from node...Topic {0}".format(msg.topic))
    self._logger.debug("Reply from node...Topic {0} Payload {1}".format(msg.topic, msg.payload))

    if msg.payload != None or msg.payload != "":
    try:
    payload_dict = json.loads(msg.payload)
    self._logger.debug("Reply: {}".format(payload_dict))

    if "id" in payload_dict:
    self.rf_status = payload_dict["rf_status"]
    status = payload_dict["status"]
    id = payload_dict["id"]
    self._logger.debug("Status : {} rf_status : {} id: {}".format(status, self.rf_status, id))

    if self.id == id:
    self.response_received = True
    self._condition.notify()
    except Exception as e:
    self._logger.exception("Exception: {}".format(e))

    def mqtt_connect(self, topic, payload):
    self.client.on_message = self.on_message
    self._logger.debug("Connecting to the Broker : {}".format(self._broker_ip_addr))

    try:
    connection_result = self.client.connect(host=self._broker_ip_addr, port=self._port, clean_start=True) # connect to broker
    self.client.loop_start() # It starts a new thread, that calls the loop method at regular intervals in
    # which on message will be processed

    if connection_result == 0: # to verify the connection is successful or not. 0-true
    # pub_result=None
    with self._condition:

    # subscription
    topic_list = topic.split("/") # split the topic with "/" and get first two elements to subscribe parent topic
    subscription_topic = topic_list[0] + "/" + topic_list[1] + "/"
    self._logger.debug("Subscription Topic : {} ".format(subscription_topic))

    rc = self.client.subscribe(subscription_topic, 0)
    self._logger.debug("Subscribe Return Code {} ".format(rc[0]))
    sub_result = rc[0] # rc[0] will return 0 if subscription successful

    if sub_result == 0:
    self._logger.debug("Topic {} has been subscribed".format(subscription_topic))
    else:
    raise exceptions.MQTTSubscribeException("Subscription Failed")
    # subscription

    # publishing
    pub_result = self.client.publish(topic, payload)
    pub_result.wait_for_publish(30)
    self._logger.debug(
    "Publish Return Code = {} Is published = {}".format(pub_result.rc, pub_result.is_published()))

    if pub_result.is_published():
    self._logger.debug(" Published : {} to the Topic {}".format(payload, topic))

    self._condition.wait(300)

    if not pub_result.is_published():
    raise exceptions.MQTTPublishException("Publishing failed")

    if self.response_received == False:
    raise exceptions.TimeOutException("Timed out for RF4CNode")

    return self.voice_command_status, self.rf_status
    else:
    raise exceptions.ConnectionException("MQTT Server Connection Failed")

    except Exception as err:
    self._logger.error(" MQTT Broker Connection Failed".format(err))
    raise
    finally:
    self.client.disconnect()
    self.client.loop_stop()
    self._logger.debug(" Disconnected from the Broker : {}".format(self._broker_ip_addr))


    After restarting client it starts working.

    When it fails pub_result.rc value is always 0 but is_published is always false.

    Client is always able to subscribe. only publish starts failing.

    Continue reading...

Compartilhe esta Página