Skip to content

MindConnect MQTT Broker Certificate Expiration

The current MindConnect MQTT broker certificate needs to be replaced before 20th May 2024. This will require all MindConnect MQTT agents to switch to new certificate after this date. The new certificate will be CA DigiCert, and it can be downloaded from here.

It is important to note that the current and new certificates will not be valid simultaneously. To ensure uninterrupted connectivity to our MindConnect MQTT services, it is essential that you download and install the new certificate in your device.

In order for you to expedite adopting the changes, refer the sample codes below for Python and Java for your perusal. In python example, a combined certificate is created by copying both certificates into one certificate file. The Java example makes use of trust store by storing old and new certificates in it. The approaches provided here will allow seamless connection for your device.

Python Sample

import paho.mqtt.client as mqtt
import time
from paho.mqtt.client import ssl

CERT_PATH = "./mqtt-client-cert.pem"
KEY_PATH = "./mqtt-client-key.pem"
ROOT_CA_PATH_COMBINED = "./server_combined.pem"

MQTT_HOST = "localhost"
MQTT_PORT = 8883
MQTT_TOPIC = "topic"


def on_connect(client, userdata, flags, rc):
    print("Connected!")
    client.subscribe(MQTT_TOPIC, qos=0)


def on_publish(client, userdata, mid):
    print("Published message: " + str(mid))


def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))


def on_message(client, userdata, msg):
    print("Received message: " + msg.topic + " " + str(msg.qos) + " " + str(msg.payload))


def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected MQTT disconnection. Will auto-reconnect")
    else:
        print("MQTT connection closed. No auto-reconnect")


def on_connect_fail(client, userdata, flags, rc):
    print("Connection failed:", rc)


def setup_mqtt_client():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_connect_fail = on_connect_fail
    client.on_subscribe = on_subscribe
    client.on_message = on_message
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect

    client.tls_set(ROOT_CA_PATH_COMBINED, CERT_PATH, KEY_PATH, tls_version=ssl.PROTOCOL_TLSv1_2)
    client.tls_insecure_set(True)

    return client


def main():
    mqtt_client = setup_mqtt_client()
    mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60)
    mqtt_client.loop_start()

    count = 0
    while True:
        temperature = 5
        (rc, mid) = mqtt_client.publish(MQTT_TOPIC, str(temperature), qos=1)
        count += 1

        if count == 10:
            mqtt_client.disconnect()
            mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60)
            count = 0

        time.sleep(5)


main()

Java Sample

public class MqttConnection {

  private static final String SERVER_URL = "ssl://localhost:8883";
  private static final String ROOT_CERT1 = "server.pem";
  private static final String ROOT_CERT2 = "server1.pem";
  private static final String CLIENT_CERT_FILE_PATH = "mqtt-client-cert.pem";
  private static final String CLIENT_KEY_FILE_PATH = "mqtt-client-key-rsa.pem";
  private static final String TOPIC_NAME = "testTopicJava";
  private static final int CONNECTION_TIMEOUT = 60;
  private static final int KEEP_ALIVE_INTERVAL = 60;
  private static final int MESSAGE_PUBLISH_INTERVAL = 5000;

  public static void main(String[] args) {
    connectAndPublish();
  }

  private static void connectAndPublish() {
    MqttClient client;
    try {
      client = new MqttClient(SERVER_URL, "javaClient");
      MqttConnectOptions options = new MqttConnectOptions();

      options.setConnectionTimeout(CONNECTION_TIMEOUT);
      options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
      options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

      SSLSocketFactory socketFactory = getSocketFactory(ROOT_CERT1, ROOT_CERT2,
        CLIENT_CERT_FILE_PATH, CLIENT_KEY_FILE_PATH, "");
      options.setSocketFactory(socketFactory);

      System.out.println("Starting to connect to the server...");
      client.connect(options);
      System.out.println("Connected!");

      setupMqttCallback(client);
      client.subscribe(TOPIC_NAME, 0);

      int count = 0;
      while (true) {
        publishMessage(client);
        count++;
        if (count == 10) {
          client.disconnect();
          System.out.println("Disconnected!");
          connectAndPublish();
        }
        Thread.sleep(MESSAGE_PUBLISH_INTERVAL);
      }
    } catch (Exception e) {
      connectAndPublish();
    }
  }

  private static void setupMqttCallback(MqttClient client) {
    client.setCallback(new MqttCallback() {
      @Override
      public void connectionLost(Throwable cause) {
        System.out.println("Connection lost. Reconnecting...");
      }

      @Override
      public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message on topic: " + topic + " Message is: " + new String(message.getPayload()));
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken token) {
      }
    });
  }

  private static SSLSocketFactory getSocketFactory(final String rootCert1, final String rootCert2,
                                                    final String crtFile, final String keyFile, final String password)
    throws Exception {
    Security.addProvider(new BouncyCastleProvider());
    CertificateFactory cf = CertificateFactory.getInstance("X.509");

    X509Certificate cert;
    try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(crtFile))) {
      cert = (X509Certificate) cf.generateCertificate(bis);
    }

    PEMParser pemParser = new PEMParser(new FileReader(keyFile));
    Object object = pemParser.readObject();
    PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder()
      .build(password.toCharArray());
    JcaPEMKeyConverter converter = new JcaPEMKeyConverter()
      .setProvider("BC");
    KeyPair key;
    if (object instanceof PEMEncryptedKeyPair) {
      System.out.println("Encrypted key - we will use provided password");
      key = converter.getKeyPair(((PEMEncryptedKeyPair) object)
        .decryptKeyPair(decProv));
    } else {
      System.out.println("Unencrypted key - no password needed");
      key = converter.getKeyPair((PEMKeyPair) object);
    }
    pemParser.close();

    FileInputStream cert1File = new FileInputStream(rootCert1);
    FileInputStream cert2File = new FileInputStream(rootCert2);
    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    keyStore.load(null, null);
    keyStore.setCertificateEntry("rootCert1", CertificateFactory.getInstance("X.509").generateCertificate(cert1File));
    keyStore.setCertificateEntry("rootCert2", CertificateFactory.getInstance("X.509").generateCertificate(cert2File));
    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
    trustManagerFactory.init(keyStore);

    KeyStore clientKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    clientKeyStore.load(null, null);
    clientKeyStore.setCertificateEntry("certificate", cert);
    clientKeyStore.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
      new java.security.cert.Certificate[]{cert});
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
      .getDefaultAlgorithm());
    kmf.init(clientKeyStore, password.toCharArray());

    SSLContext context = SSLContext.getInstance("TLSv1.2");
    context.init(kmf.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);

    return context.getSocketFactory();
  }

  private static void publishMessage(MqttClient client) throws MqttException {
    String stringMessage = "TestMessage";
    MqttMessage message = new MqttMessage(stringMessage.getBytes());
    message.setQos(0);
    client.publish(TOPIC_NAME, message);
    System.out.println("Message published: " + stringMessage);
  }
}

Last update: April 2, 2024

Except where otherwise noted, content on this site is licensed under the Development License Agreement.