「Qtの基礎 - MQTT」の版間の差分

提供: MochiuWiki : SUSE, EC, PCB

編集の要約なし
 
1,195行目: 1,195行目:


== qmqttのインストール ==
== qmqttのインストール ==
==== ソースコードからインストール (CMake) ====
==== ソースコードのダウンロード ====
まず、GitHubからソースコードをクローンする。<br>
[https://github.com/emqx/qmqtt qmqttのGitHub]にアクセスして、ソースコードをダウンロードする。<br>
ダウンロードしたファイルを解凍する。<br>
<br>
tar xf qmqtt-<バージョン>.tar.gz
cd qmqtt-<バージョン>
<br>
または、<code>git clone</code> コマンドを実行して、qmqttのソースコードをクローンする。<br>
<br>
<br>
  git clone https://github.com/emqx/qmqtt.git
  git clone https://github.com/emqx/qmqtt.git
  cd qmqtt
  cd qmqtt
<br>
<br>
次に、CMakeを使用してビルドおよびインストールする。<br>
==== ソースコードからインストール (CMake) ====
CMakeを使用してビルドおよびインストールする。<br>
<br>
<br>
  mkdir build && cd build
  mkdir build && cd build
1,218行目: 1,225行目:
   
   
  # インストール先を指定する場合
  # インストール先を指定する場合
  cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local
  cmake .. -DCMAKE_INSTALL_PREFIX=<任意のインストールディレクトリ>
  cmake --build .
  cmake --build .
  cmake --install .
  cmake --install .
1,242行目: 1,249行目:
  # 基本ビルド
  # 基本ビルド
  qmake qmqtt.pro
  qmake qmqtt.pro
  make
  make -j $(nproc)
   
   
  # WebSocket対応ビルド
  # WebSocket対応ビルド
  qmake "CONFIG+=QMQTT_WEBSOCKETS" qmqtt.pro
  qmake "CONFIG+=QMQTT_WEBSOCKETS" qmqtt.pro
  make
  make -j $(nproc)
   
   
  # SSL無効ビルド
  # SSL無効ビルド
  qmake "CONFIG+=QT_NO_SSL" qmqtt.pro
  qmake "CONFIG+=QT_NO_SSL" qmqtt.pro
  make
  make -j $(nproc)
<br>
<br>
==== ビルド時の注意事項 ====
==== ビルド時の注意事項 ====

2026年5月11日 (月) 05:44時点における最新版

概要

QtでMQTT通信を実装する場合は、QtMqttモジュールを使用する。
このモジュールは、標準的なMQTTプロトコルの機能をカバーしており、プロジェクトファイルに簡単な設定を追加するだけで利用可能である。

中核となるQMqttClientクラスを使用することにより、MQTTブローカーへの接続、切断、メッセージの送受信等の基本的な操作が可能になる。
接続設定では、ホスト名、ポート番号、必要に応じて認証情報を指定する。

Qt特有のシグナル / スロットメカニズムを使用することで、接続状態の監視やメッセージの非同期処理を効率的に実装できる。
接続完了、切断、エラー発生等のイベントをリアルタイムで検知して、適切な処理を行うことができる。

パブリッシュ / サブスクライブ (メッセージの送受信) の実装も直感的である。
トピック名を指定してメッセージを発行、あるいは、特定のトピックをサブスクライブしてメッセージを受信することができる。
また、ワイルドカードを使用したトピックの購読にも対応している。

セキュリティ面においては、TLS/SSL通信やユーザ認証もサポートされている。
証明書の設定や認証情報の指定により、セキュアな通信を実現できる。
また、Last Will Testamentの設定やQoSレベルの指定といったMQTTプロトコルの高度な機能も利用可能である。

実装時において重要なことは、エラーハンドリングの適切な実装が挙げられる。
ネットワークの切断、再接続、タイムアウト等の状況に適切に対応することにより、安定したアプリケーションを構築することができる。

また、大量のメッセージを送受信する場合は、非同期処理を活用してUIのブロッキングを防ぐことが重要である。


QtMqttモジュールのインストール

パッケージ管理システムからインストール

# RHEL
sudo dnf install qt6-qtmqtt-devel

# RHEL
sudo zypper install qt6-mqtt-devel 


ソースコードからインストール

QtMqttモジュールのソースコードをダウンロードする。

git clone --depth 1 git://code.qt.io/qt/qtmqtt.git -b <Qtのブランチ  例: 6.5.3>
cd qtmqtt


QtMqttモジュールをビルドおよびインストールする。

 mkdir build && cd build
 
 # QtオンラインインストーラからQtライブラリをインストールしている場合
 ## Qt 5の場合
 /<Qtのインストールディレクトリ>/<バージョン>/gcc_64/bin/qmake ../qtmqtt.pro
 make -j $(nproc)
 make install
 
 ## Qt 6 の場合
 /<Qtのインストールディレクトリ>/<バージョン>/gcc_64/bin/qt-cmake \
    -DCMAKE_BUILD_TYPE=Release \
    -DCMAKE_C_COMPILER=<GCC 8以降のGCCのパス>   \
    -DCMAKE_CXX_COMPILER=<GCC 8以降のG++のパス> \
    ..
 make -j $(nproc)
 make install
 
 # パッケージ管理システムからQtライブラリをインストールしている場合
 ## Qt 5の場合
 qmake ../qtmqtt.pro
 make -j $(nproc)
 make install
 
 ## Qt 6 の場合
 cmake -DCMAKE_BUILD_TYPE=Release \
       -DCMAKE_INSTALL_PREFIX="<QtMqttモジュールのインストールディレクトリ>" \
       -DCMAKE_C_COMPILER=<GCC 8以降のGCCのパス>   \
       -DCMAKE_CXX_COMPILER=<GCC 8以降のG++のパス> \
      ..
 make -j $(nproc)
 make install



MQTT関連のクラスとメソッド

QMqttClientクラスのメソッド

基本的な接続に関する設定を以下に示す。

 QMqttClient client;
 
 // MQTTブローカーのホスト名の指定
 client->setHostname("localhost");
 
 // ポート番号の指定
 client->setPort(1883);
 
 // ユーザ名の指定 (認証が必要な場合)
 client->setUsername("<ユーザ名>");
 
 // パスワード (認証が必要な場合)
 client->setPassword("<パスワード>");
 
 // MQTTブローカーの接続
 client->connectToHost();
 
 // MQTTブローカーの切断
 client->disconnectFromHost();
 
 // 現在の接続状態の取得
 // 状態の種類:
 // -> QMqttClient::Disconnected
 // -> QMqttClient::Connecting
 // -> QMqttClient::Connected
 QMqttClient::ClientState state = client->state();


MQTTメッセージの保持 (Retain) フラグとは、MQTTブローカーがそのトピックの最後のメッセージを保存して、
新しいクライアントが購読を開始した時に、最後に発行されたメッセージを自動的に送信するかどうかを制御するフラグである。

publishメソッドでは、以下に示す値を指定することができる。

  • RETAIN (true/1):
    メッセージをMQTTブローカーに保持するように指示する。
    新しいサブスクライバーが接続した時に、最後のRetainメッセージを受信する。
    頻繁に更新されないステータス情報 (デバイスの設定値等) の共有に使用する。

  • NO_RETAIN (false/0)
    メッセージは保持されない。
    購読時に過去のメッセージは送信されない。
    リアルタイムデータやイベント通知等の一時的な情報に適している。


 // 使用例:
 
 QMqttClient client;
 
 // 保持フラグをtrueに設定してメッセージを発行 (QoS 0, Retain true)
 client.publish("sensor/temperature", "25.5", 0, true);
 
 // 保持フラグをfalseに設定してメッセージを発行 (QoS 0, Retain false)
 client.publish("sensor/temperature", "25.5", 0, false);


MQTTブローカーに保持されているメッセージを削除する場合は、同じトピックに空のペイロードを保持フラグtrueで送信する。

 QMqttClient client;
 
 client.publish("sensor/temperature", "", 0, true);


送信 (パブリッシュ側)

 QMqttClient client;
 
 // メッセージのパブリッシュ
 qint32 msgId = client->publish(topic,    // トピック名
                                message,  // メッセージ内容
                                qos,      // QoSレベル
                                retain);  // 保持メッセージフラグ
 
 // 戻り値が-1の場合はパブリッシュ失敗
 if (msgId == -1) {
    // エラー処理
    // ...略
 }


受信 (サブスクライブ側)

 QMqttClient client;
 
 // トピックのサブスクライブ
 // QoS 0 : (最大1回配信)
 // QoS 1 : (最低1回配信)
 // QoS 2 : (正確に1回配信)
 auto subscription = client->subscribe(topic, qos);
 
 // サブスクリプション成功の確認
 if (subscription) {
    QMqttSubscription *sub = subscription;
    // サブスクリプションの状態変更を監視
    connect(sub, &QMqttSubscription::stateChanged, this, [](QMqttSubscription::SubscriptionState state) {
       // 状態の種類:
       // -> QMqttSubscription::Unsubscribed
       // -> QMqttSubscription::SubscriptionPending
       // -> QMqttSubscription::Subscribed
       // -> QMqttSubscription::UnsubscriptionPending
    });
 }


シグナル / スロット接続

  • 接続状態の変更を監視する場合


 connect(client, &QMqttClient::stateChanged, this, [](QMqttClient::ClientState state) {
    // 状態変更時の処理
    // ...略
 });


  • メッセージを受信する場合


 connect(client, &QMqttClient::messageReceived, this, [](const QByteArray &message, const QMqttTopicName &topic) {
    // メッセージの受信
    QString msg = QString::fromUtf8(message);
    QString topicName = topic.name();
 
    // その他のメッセージ処理
    // ...略
 });


 // エラー監視
 connect(client, &QMqttClient::errorChanged, this, [](QMqttClient::ClientError error) {
    // エラーの種類:
    // -> QMqttClient::NoError
    // -> QMqttClient::InvalidProtocolVersion
    // -> QMqttClient::IdRejected
    // -> QMqttClient::ServerUnavailable
    // -> QMqttClient::BadUsernameOrPassword
    // -> QMqttClient::NotAuthorized
    // -> QMqttClient::TransportInvalid
    // -> QMqttClient::ProtocolViolation
    // -> QMqttClient::UnknownError
    // -> QMqttClient::Mqtt5SpecificError
 
    // エラー処理を記述
    // ...略
 });



MQTT通信

技術的な制約

  • メモリ関連の制約
    QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。
    特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。

  • ネットワーク関連の制約
    Qt MQTTはTCP/IP上で動作するため、ネットワークの状態に依存する。
    ファイアウォールや特定のポートがブロックされている環境では動作しない可能性がある。
    SSL/TLS接続を使用する場合、追加の設定と証明書の管理が必要である。


デバッグとトラブルシューティング

開発時は、MQTTクライアントツール (例: MQTT Explorer) を使用することにより、通信の様子を視覚的に確認できる。
また、シリアルモニタを活用して、接続状態やメッセージの送受信を確認する。

CMake / Qtプロジェクトファイル

  • CMakeLists.txtファイルを使用する場合
 # パッケージの検索
 find_package(QT NAMES Qt6 REQUIRED COMPONENTS Core Network Mqtt)
 find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Core Network Mqtt)
 
 # ライブラリのリンク
 target_link_libraries(<プロジェクト名> PRIVATE
    Qt6::Core
    Qt6::Network
    Qt6::Mqtt
 )


  • Qtプロジェクトファイルを使用する場合
 QT += core network mqtt


使用例

以下の例では、MQTT通信でトピックの送受信を行っている。

全てのMQTT操作 (接続、発行、購読) をシグナル/スロットを使用して非同期で実行している。
また、接続が失敗した場合は、自動的に再接続を行う。

送信 (MQTTパブリッシャー側)
 // Publisher.h
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QTimer>
 #include <QDebug>
 
 class MqttPublisher : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString     m_host;
    quint16     m_port;
 
 public:
    explicit MqttPublisher(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttPublisher::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttPublisher::handleError);
    }
 
    ~MqttPublisher()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnect();
       }
    }

    // MQTT接続の開始
    void connectToHost(const QString &host, quint16 port)
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // ユーザ名とパスワードが指定されている場合は設定
       if (!username.isEmpty()) {
          m_client->setUsername(username);
       }
 
       if (!password.isEmpty()) {
          m_client->setPassword(password);
       }
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // メッセージの発行
    void publishMessage(const QString &topic, const QByteArray &message)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // QoS 1でメッセージを発行 (メッセージ到達保証あり)
       auto publish = m_client->publish(topic, message, 1);
       if (!publish) {
          qWarning() << "メッセージの発行に失敗しました : " << topic;
          return;
       }
    }
 
    // 接続の切断
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 signals:
    // 接続確立時のシグナルを追加
    void connectionEstablished();
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             emit connectionEstablished();  // 接続確立時にシグナルを発行
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバーが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザー名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー :" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, this, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port, m_client->username(), m_client->password());
          });
       }
    }
 };


 // Publisher側の使用例
 
 #include "Publisher.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttPublisher publisher;
 
    // タイマの作成
    QTimer *pTimer = new QTimer(&a);

    // タイマーのコールバックを設定
    QObject::connect(pTimer, &QTimer::timeout, pTimer, [&publisher]() {
       publisher.publishMessage("qt/topic", "Hello, MQTT!");
    });

    // Publisher側で接続状態の変化を監視し、接続成功時にタイマーを開始
    QObject::connect(&publisher, &Publisher::connectionEstablished, pTimer, [pTimer]() {
       pTimer->start(3000);  // 3秒おきに送信
    });

    // MQTTブローカーに接続
    publisher.connectToHost("<IPアドレスまたはホスト名  例: localhost>",
                            <MQTT通信するポート番号  : 1883>,
                            "<MQTTユーザ名 (匿名ユーザを許可している場合は空でも可)>",
                            "<MQTTユーザのパスワード (匿名ユーザを許可している場合は空でも可>");
    
    return a.exec();
 }


受信 (MQTTサブスクライバ側)

以下の例では、受信したトピックの購読 (サブスクライブ) している。

 // Subscriber.hファイル
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QtMqtt/QMqttSubscription>
 #include <QTimer>
 #include <QDebug>
 
 class MqttSubscriber : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString     m_host;
    quint16     m_port;
    QMap<QString, QMqttSubscription*> m_subscriptions;
 
 public:
    explicit MqttSubscriber(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttSubscriber::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttSubscriber::handleError);
    }
 
    ~MqttSubscriber()
    {
       // 全ての購読を解除
       for (auto it = m_subscriptions.constBegin(); it != m_subscriptions.constEnd(); it++) {
          it.value()->unsubscribe();
          delete it.value();
       }
 
       m_subscriptions.clear();
 
      if (m_client->state() == QMqttClient::Connected) {
         m_client->disconnect();
      }
   }
 
    // MQTT接続を開始する
    void connectToHost(const QString &host, quint16 port, const QString &username = QString(), const QString &password = QString())
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // ユーザ名とパスワードが指定されている場合は設定
       if (!username.isEmpty()) {
          m_client->setUsername(username);
       }
 
       if (!password.isEmpty()) {
          m_client->setPassword(password);
       }
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // トピックを購読する
    void subscribe(const QString &topic)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // 既に購読済みのトピックは無視
       if (m_subscriptions.contains(topic)) {
          qInfo() << "トピックは既に購読されています:" << topic;
          return;
       }
 
       // QoS 1でトピックを購読 (メッセージ到達保証あり)
       auto subscription = m_client->subscribe(topic, 1);
       if (!subscription) {
          qWarning() << "トピックの購読に失敗しました : " << topic;
          return;
       }
 
       // メッセージ受信時のハンドラを設定
       connect(subscription, &QMqttSubscription::messageReceived, this, &MqttSubscriber::handleMessage);
 
       // 購読リストに追加
       m_subscriptions.insert(topic, subscription);
       qInfo() << "トピックを購読開始しました : " << topic;
    }
 
    // 接続を切断する
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 signals:
    void connected();
    void messageReceived(const QString &topic, const QByteArray &message);
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             emit connected();  // 接続成功時にシグナルを発行
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
 
             // 切断時に再接続を試みる
             QTimer::singleShot(5000, this, [this]() {
                qInfo() << "再接続を試みます...";
                this->connectToHost(m_host, m_port, m_client->username(), m_client->password());
             });
 
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザ名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー:" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, this, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port, m_client->username(), m_client->password());
          });
       }
    }
 
    // メッセージ受信時のハンドラ
    void handleMessage(const QByteArray &message, const QMqttTopicName &topic)
    {
       // シグナルを発行してアプリケーションに通知
       emit messageReceived(topic.name(), message);
    }
 };


 // Subscriber側の使用例
 
 #include "Subscriber.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttSubscriber subscriber;
 
    // メッセージ受信時の処理
    QObject::connect(&subscriber, &Subscriber::messageReceived, [](const QString &topic, const QByteArray &message) {
       qDebug() << "受信 - トピック : " << topic;
       qDebug() << "メッセージ : "  << message;
    });
 
    // 接続成功時の処理を追加
    QObject::connect(&subscriber, &Subscriber::connected, [&subscriber]() {
       subscriber.subscribe("qt/topic");
    });
 
    subscriber.connectToHost("localhost", 1883, "<MQTTユーザ名>", "<MQTTユーザのパスワード>");
 
    return a.exec();
 }


送受信 (MQTTパブリッシャー / MQTTサブスクライバ)

以下の例では、トピックの送受信を行っている。

  • クラス構造
    • MqttWorker
      MQTT通信の実際の処理を担当
    • MqttManager
      アプリケーションとワーカー間のインターフェース

  • スレッド管理
    MQTTの処理を専用スレッドで実行
    シグナル / スロットによる安全な通信

  • エラーハンドリング
    全てのエラーはerrorOccurredシグナルで通知
    接続問題は自動的に処理
    包括的なエラー状態の検出と通知
    デバッグ情報の出力

  • 非同期処理
    全ての操作が非ブロッキング
    メッセージキューイングによる信頼性の確保


 // Mqttworker.h
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QThread>
 #include <QQueue>
 #include <QMutex>
 
 /**
  * @brief MQTTの通信処理を担当するワーカークラス
  * 
  * このクラスは別スレッドで動作して、MQTT通信に関する以下に示す機能を提供する
  * - MQTTブローカーへの接続 / 切断
  * - メッセージの発行 (パブリッシュ)
  * - トピックの購読(サブスクライブ)
  * - メッセージの受信と通知
  * 
  * スレッドセーフな設計となっており、メッセージキューイング機能も備えている
  */
 class MqttWorker : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;                                  // MQTTクライアントインスタンス
    QQueue<QPair<QString, QByteArray>> m_messageQueue;      // 未送信メッセージのキュー
    QMutex m_mutex;                                         // スレッド同期用ミューテックス
    bool m_isConnected;                                     // 現在の接続状態
 
 public:
    /**
     * @brief コンストラクタ
     * @param parent 親オブジェクト(デフォルトはnullptr)
     */
    explicit MqttWorker(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this)), m_isConnected(false)
    {
       // MQTTクライアントのシグナルとワーカーのスロットを接続
       connect(m_client, &QMqttClient::messageReceived, this, &MqttWorker::handleMessage);
       connect(m_client, &QMqttClient::stateChanged, this, &MqttWorker::handleStateChange);
       connect(m_client, &QMqttClient::errorChanged, this, &MqttWorker::handleError);
    }
 
    /**
     * @brief デストラクタ
     * 接続中の場合は切断処理を行います
     */
    ~MqttWorker()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 public slots:
    /**
     * @brief MQTTブローカーへの接続を開始
     * @param host ブローカーのホスト名またはIPアドレス
     * @param port ブローカーのポート番号
     * 
     * このメソッドは非同期で実行され、接続状態の変更はconnectionStateChangedシグナルで通知される
     */
    void connectToHost(const QString &host, quint16 port)
    {
       qDebug() << "ワーカースレッド開始: " << QThread::currentThread();
       m_client->setHostname(host);
       m_client->setPort(port);
       m_client->connectToHost();
    }
 
    /**
     * @brief 指定されたトピックにメッセージを発行
     * @param topic 発行先のトピック
     * @param payload 送信するメッセージ内容
     * 
     * 未接続時はメッセージをキューに保存し、接続時に自動的に送信する
     * QoS 1を使用して、メッセージの到達を保証する
     */
    void publishMessage(const QString &topic, const QByteArray &payload)
    {
       QMutexLocker locker(&m_mutex);
 
       if (!m_isConnected) {
          // 未接続時はメッセージをキューに保存
          m_messageQueue.enqueue(qMakePair(topic, payload));
          qDebug() << "メッセージをキューに保存: " << topic;
          return;
       }
 
       auto publish = m_client->publish(topic, payload, 1);
       if (!publish) {
          emit errorOccurred("メッセージの発行に失敗しました: " + topic);
          qWarning() << "メッセージ発行失敗: " << topic;
       }
       else {
          qDebug() << "メッセージを発行:" << topic;
       }
    }
 
    /**
     * @brief 指定されたトピックを購読
     * @param topic 購読するトピック名
     * 
     * 接続済みの場合のみ購読を開始する
     * QoS 1を使用して、メッセージの到達を保証する
     */
    void subscribe(const QString &topic)
    {
       if (!m_isConnected) {
          emit errorOccurred("購読できません - 接続されていません");
          qWarning() << "購読失敗 - 未接続:" << topic;
          return;
       }
 
       auto subscription = m_client->subscribe(topic, 1);
       if (!subscription) {
          emit errorOccurred("トピックの購読に失敗しました: " + topic);
          qWarning() << "購読失敗:" << topic;
       }
       else {
          qDebug() << "トピックを購読開始:" << topic;
       }
    }
 
 signals:
    /**
     * @brief メッセージ受信時に発行されるシグナル
     * @param topic 受信したメッセージのトピック
     * @param message 受信したメッセージの内容
     */
    void messageReceived(const QString &topic, const QByteArray &message);
 
    /**
     * @brief 接続状態が変化した時に発行されるシグナル
     * @param connected 接続状態 (true: 接続済み, false: 未接続)
     */
    void connectionStateChanged(bool connected);
 
    /**
     * @brief エラー発生時に発行されるシグナル
     * @param error エラーメッセージ
     */
    void errorOccurred(const QString &error);
 
 private slots:
    /**
     * @brief メッセージ受信時の処理
     * @param message 受信したメッセージ
     * @param topic 受信したトピック
     */
    void handleMessage(const QByteArray &message, const QMqttTopicName &topic) {
        emit messageReceived(topic.name(), message);
        qDebug() << "メッセージを受信:" << topic.name() << message;
    }

    /**
     * @brief 接続状態変更時の処理
     * 
     * 接続完了時にキューに保存されたメッセージの送信を試みます
     */
    void handleStateChange()
    {
       m_isConnected = (m_client->state() == QMqttClient::Connected);
       emit connectionStateChanged(m_isConnected);
 
       if (m_isConnected) {
          qDebug() << "MQTT接続完了";
 
          // 接続時にキューのメッセージを処理
          QMutexLocker locker(&m_mutex);
          while (!m_messageQueue.isEmpty()) {
             auto message = m_messageQueue.dequeue();
             publishMessage(message.first, message.second);
          }
       }
       else {
          qDebug() << "MQTT切断";
       }
    }
 
    /**
     * @brief エラー発生時の処理
     * @param error 発生したエラーの種類
     * 
     * エラーの種類に応じて適切なメッセージを生成し通知します
     */
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMessage;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMessage = "無効なプロトコルバージョン";
             break;
          case QMqttClient::IdRejected:
             errorMessage = "クライアントID拒否";
             break;
          case QMqttClient::ServerUnavailable:
             errorMessage = "サーバー利用不可";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMessage = "認証エラー";
             break;
          case QMqttClient::NotAuthorized:
             errorMessage = "認可エラー";
             break;
          case QMqttClient::TransportInvalid:
             errorMessage = "トランスポートエラー";
             break;
          case QMqttClient::ProtocolViolation:
             errorMessage = "プロトコル違反";
             break;
          case QMqttClient::UnknownError:
             errorMessage = "不明なエラー";
             break;
          default:
             errorMessage = "予期せぬエラー";
             break;
       }
 
       emit errorOccurred(errorMessage);
 
       qCritical() << "MQTTエラー:" << errorMessage;
    }
 };


 // Mqttmanager.hファイル
 
 #include <QObject>
 #include <QThread>
 #include "Mqttworker.h"
 
 /**
  * @brief MQTT通信を管理するマネージャークラス
  * 
  * このクラスは以下に示す機能を提供する
  * - MQTTワーカーの生成と管理
  * - ワーカースレッドの制御
  * - MQTT通信操作のインターフェース提供
  * 
  * アプリケーションはこのクラスを通じてMQTT通信を利用する
  * 全ての操作は非同期で実行されて、メインスレッドをブロックしない
  */
 class MqttManager : public QObject
 {
    Q_OBJECT
 
 private:
    QThread m_workerThread;  // ワーカー用スレッド
    MqttWorker *m_worker;    // MQTTワーカーインスタンス
 
 public:
    /**
     * @brief コンストラクタ
     * @param parent 親オブジェクト(デフォルトはnullptr)
     * 
     * ワーカーオブジェクトを生成して、専用スレッドで実行を開始する
     * 必要なシグナル / スロット接続も確立する
     */
    explicit MqttManager(QObject *parent = nullptr) : QObject(parent), m_worker(new MqttWorker)
    {
       // ワーカーを別スレッドに移動
       m_worker->moveToThread(&m_workerThread);
 
       // マネージャーからワーカーへのシグナル接続
       connect(this, &MqttManager::connectRequested, m_worker, &MqttWorker::connectToHost);
       connect(this, &MqttManager::publishRequested, m_worker, &MqttWorker::publishMessage);
       connect(this, &MqttManager::subscribeRequested, m_worker, &MqttWorker::subscribe);
 
       // ワーカーからマネージャーへのシグナル接続
       connect(m_worker, &MqttWorker::messageReceived, this, &MqttManager::messageReceived);
       connect(m_worker, &MqttWorker::connectionStateChanged, this, &MqttManager::connectionStateChanged);
       connect(m_worker, &MqttWorker::errorOccurred, this, &MqttManager::errorOccurred);
 
       // スレッド終了時のクリーンアップ設定
       connect(&m_workerThread, &QThread::finished, m_worker, &MqttWorker::deleteLater);
 
       // ワーカースレッドを開始
       m_workerThread.start();
       qDebug() << "MQTTマネージャー初期化完了";
    }
 
    /**
     * @brief デストラクタ
     * 
     * ワーカースレッドを適切に終了して、リソースを解放する
     */
    ~MqttManager()
    {
       m_workerThread.quit();
       m_workerThread.wait();
       qDebug() << "MQTTマネージャー終了";
    }
 
    /**
     * @brief MQTTブローカーへの接続を要求
     * @param host ブローカーのホスト名またはIPアドレス
     * @param port ブローカーのポート番号
     */
    void connect(const QString &host, quint16 port)
    {
        emit connectRequested(host, port);
        qDebug() << "接続要求:" << host << port;
    }
 
    /**
     * @brief メッセージの発行を要求
     * @param topic 発行先のトピック
     * @param payload 送信するメッセージ内容
     */
    void publish(const QString &topic, const QByteArray &payload)
    {
       emit publishRequested(topic, payload);
       qDebug() << "発行要求:" << topic;
    }
 
    /**
     * @brief トピックの購読を要求
     * @param topic 購読するトピック名
     */
    void subscribe(const QString &topic)
    {
        emit subscribeRequested(topic);
        qDebug() << "購読要求:" << topic;
    }
 
 signals:
    // ワーカーへの要求シグナル
    void connectRequested(const QString &host, quint16 port);               // 接続要求シグナル
    void publishRequested(const QString &topic, const QByteArray &payload); // 発行要求シグナル
    void subscribeRequested(const QString &topic);                          // 購読要求シグナル
 
    // アプリケーションへの通知シグナル
    void messageReceived(const QString &topic, const QByteArray &message);  // メッセージ受信通知
    void connectionStateChanged(bool connected);                            // 接続状態変更通知
    void errorOccurred(const QString &error);                               // エラー発生通知
 };


 // 使用例 : main.cppファイル
 
 #include <QCoreApplication>
 #include "Mqttmanager.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    // MQTTマネージャーのインスタンスを生成
    MqttManager manager;
 
    // 接続状態変更時の処理
    QObject::connect(&manager, &MqttManager::connectionStateChanged, [](bool connected) {
       if (connected) {
          qDebug() << "MQTT接続完了";
       }
       else {
          qDebug() << "MQTT切断";
       }
    });
 
    // メッセージ受信時の処理
    QObject::connect(&manager, &MqttManager::messageReceived, [](const QString &topic, const QByteArray &message) {
       qDebug() << "メッセージ受信 - トピック: " << topic;
       qDebug() << "内容: " << message;
    });
 
    // エラー発生時の処理
    QObject::connect(&manager, &MqttManager::errorOccurred, [](const QString &error) {
       qCritical() << "MQTTエラー: " << error;
    });
 
    // MQTTブローカーへの接続
    manager.connect("<IPアドレスまたはホスト名  例: localhost>", <ポート番号  : 1883>);
 
    // トピックの購読
    manager.subscribe("qt/topic");
 
    // メッセージの発行
    QTimer::singleShot(1000, [&manager]() {
       manager.publish("qt/topic", "Hello、MQTT!");
    });
 
    return a.exec();
 }



qmqtt (EMQX)

qmqttは、EMQXプロジェクトが開発したQt向けのサードパーティMQTTクライアントライブラリである。
Qt公式のQt MQTTモジュール (qt/qtmqtt) とは独立した別物のライブラリであり、名前空間 QMQTT:: を使用する。

3条項BSDライセンスの下で公開されており、商用・オープンソースを問わず制限の少ない条件で使用できる。

現在の状態はメンテナンス段階であり、Qt公式のQt MQTTモジュールが登場して以降は新機能の追加を停止している。
バグ修正と小規模な改善のみを継続して実施しており、最新バージョンは v1.0.5 (2026年3月) である。

qmqttの主な特徴を以下に示す。

  • 対応MQTTバージョン
    MQTT 3.1.0 (MQTTVersion::V3_1_0) および MQTT 3.1.1 (MQTTVersion::V3_1_1) に対応する。
    MQTT 5.0は未対応である。

  • SSL/TLS 対応
    デフォルトで有効となっている。
    OpenSSL 1.0.2以上が必要である。
    CMake または qmake のビルドオプションで無効化することもできる。

  • WebSocket 対応
    Qt 5.7以上で利用可能である。
    ビルドオプションで有効化する必要がある。

  • 自動再接続
    setAutoReconnect() メソッドで自動再接続を有効化できる。
    setAutoReconnectInterval() で再接続間隔を設定できる。

  • 遺言メッセージ (Will Message) 対応
    接続が予期なく断たれた場合にブローカーから自動配信されるメッセージをサポートする。


対応するQtバージョンを以下に示す。

qmqtt対応 Qtバージョン
機能 最小Qtバージョン
基本機能 (TCP接続) Qt 5.3以上
SSL/TLS 機能 Qt 5.3以上 (OpenSSL 1.0.2以上)
WebSocket 機能 Qt 5.7以上
Qt 6 対応 非公式対応のみ (サポート対象外)


Qt公式QtMqttとqmqtt (EMQX) の比較

下表に、Qt公式のQt MQTT と qmqtt (EMQX) の主な相違点を示す。

Qt公式 Qt MQTT と qmqtt (EMQX) の比較
比較項目 Qt公式 QtMqtt qmqtt (EMQX)
開発元 The Qt Company EMQX (Ery Lee他)
ライセンス Commercial + GPL 3.0 3条項BSD
対応MQTTバージョン 3.1, 3.1.1, 5.0 3.1.0, 3.1.1 (5.0は未対応)
WebSocket対応 あり あり
SSL/TLS対応 あり あり (OpenSSL 1.0.2以上)
名前空間 QMqtt:: QMQTT::
主要クラス QMqttClient QMQTT::Client
インストール方法 Qt for Automationの一部 ソースビルドまたはシステムパッケージ
Qtモジュール統合度 Qt公式モジュール 独立したサードパーティライブラリ
メンテナンス状況 継続的 (Qt Company保守) メンテナンス段階 (バグ修正のみ)
Qt6対応 サポート 非公式対応のみ
最小Qtバージョン Qt 5.7以上 Qt 5.3以上


各ライブラリの選択ガイドラインを以下に示す。

Qt MQTTライブラリ選択ガイド
Qt公式 Qt MQTTを選ぶべき場合 qmqtt (EMQX) を選ぶべき場合
  • MQTT 5.0の最新機能が必要な場合
  • 商用サポートが必要な場合
  • 定期的なセキュリティアップデートが重要な場合
  • Qt 6を使用している または 今後使用予定の場合
  • BSDライセンス (制限が少ない) が必要な場合
  • Qt 5.3等の古いQtバージョンを使用中の場合
  • 既存プロジェクトに簡単なMQTT機能のみが必要な場合
  • EMQXブローカーと組み合わせて使用する場合
  • ビルド・統合の簡便性が重要な場合



qmqttのインストール

ソースコードのダウンロード

qmqttのGitHubにアクセスして、ソースコードをダウンロードする。
ダウンロードしたファイルを解凍する。

tar xf qmqtt-<バージョン>.tar.gz
cd qmqtt-<バージョン>


または、git clone コマンドを実行して、qmqttのソースコードをクローンする。

git clone https://github.com/emqx/qmqtt.git
cd qmqtt


ソースコードからインストール (CMake)

CMakeを使用してビルドおよびインストールする。

mkdir build && cd build

# 基本ビルド
cmake ..
cmake --build .

# WebSocket対応ビルド (Qt 5.7以上が必要)
cmake .. -Dqmqtt_WEBSOCKETS=ON
cmake --build .

# SSL無効ビルド (OpenSSLが不要な環境向け)
cmake .. -Dqmqtt_SSL=OFF
cmake --build .

# インストール先を指定する場合
cmake .. -DCMAKE_INSTALL_PREFIX=<任意のインストールディレクトリ>
cmake --build .
cmake --install .


下表に、CMakeビルドオプションの一覧を示す。

CMakeビルドオプション一覧
オプション デフォルト値 説明
qmqtt_SHARED ON ON: 共有ライブラリとしてビルド
OFF: 静的ライブラリとしてビルド
qmqtt_WEBSOCKETS OFF ON: WebSocket対応を有効化 (Qt 5.7以上が必要)
qmqtt_SSL ON ON: SSL/TLS対応を有効化
OFF: SSL/TLS対応を無効化


ソースコードからインストール (qmake)

qmakeを使用してビルドする場合は、以下のコマンドを実行する。

# 基本ビルド
qmake qmqtt.pro
make -j $(nproc)

# WebSocket対応ビルド
qmake "CONFIG+=QMQTT_WEBSOCKETS" qmqtt.pro
make -j $(nproc)

# SSL無効ビルド
qmake "CONFIG+=QT_NO_SSL" qmqtt.pro
make -j $(nproc)


ビルド時の注意事項

Windows環境でビルドする場合の注意事項を以下に示す。

  • Windows 環境での注意
    Windows環境ではgtestがサポートされていないため、NO_UNIT_TESTS の指定が必須である。
    qmakeを使用する場合: qmake "CONFIG+=NO_UNIT_TESTS" qmqtt.pro

  • OpenSSLの要件
    SSL/TLS機能を使用する場合は、OpenSSL 1.0.2以上のインストールが必要である。
    OpenSSLがインストールされていない環境では、qmqtt_SSL=OFF または CONFIG+=QT_NO_SSL で SSLを無効化すること。



qmqtt関連のクラスとメソッド

QMQTT::Clientクラスのコンストラクタ

QMQTT::Client クラスは、接続方式に応じて複数のコンストラクタを提供している。

 // TCP接続 (基本)
 QMQTT::Client(const QHostAddress &host, const quint16 port, QObject *parent = nullptr);
 
 // TCP接続 (SSLフラグ付き)
 QMQTT::Client(const QHostAddress &host, const quint16 port, bool ssl, QObject *parent = nullptr);
 
 // SSL 設定を含む接続
 QMQTT::Client(const QString &hostName, const quint16 port,
               const QSslConfiguration &sslConfig, QObject *parent = nullptr);
 
 // ホスト名指定のTCP / SSL接続 (互換性維持用)
 QMQTT::Client(const QString &hostName, const quint16 port,
               const bool ssl, QObject *parent = nullptr);
 
 // WebSocket接続 (Qt 5.7以上が必要)
 QMQTT::Client(const QUrl &url, const QString &origin,
               const MQTTVersion version, QObject *parent = nullptr);


QMQTT::Clientクラスの主要メソッド

接続関連のメソッド、プロパティ設定メソッド、パブリッシュ / サブスクライブメソッドを以下に示す。

 QMQTT::Client client(QHostAddress::LocalHost, 1883);
 
 // -- 接続関連 --
 // ブローカーへの接続を開始する
 client.connectToHost();
 
 // ブローカーから切断する
 client.disconnectFromHost();
 
 // 現在の接続状態をbool値で確認する
 bool connected = client.isConnectedToHost();
 
 // 現在の接続状態を数値で取得する
 quint32 st = client.state();
 
 // -- プロパティ設定 --
 // ホスト名 / アドレスの設定
 client.setHost(QHostAddress("192.168.1.10"));
 client.setHostName("broker.example.com");
 
 // ポート番号の設定
 client.setPort(1883);
 
 // 認証情報の設定
 client.setUsername("user");
 client.setPassword("password");
 
 // クライアント設定
 client.setClientId("my-client-id");
 
 // MQTTバージョンの設定 (V3_1_0 または V3_1_1)
 client.setVersion(QMQTT::V3_1_1);
 
 // キープアライブ間隔の設定 (秒)
 client.setKeepAlive(60);
 
 // クリーンセッションの設定
 client.setCleanSession(true);
 
 // -- 自動再接続の設定 --
 client.setAutoReconnect(true);
 client.setAutoReconnectInterval(5000);  // 5000 ミリ秒ごとに再接続を試みる
 
 // -- 遺言メッセージ (Will Message) の設定 --
 client.setWillTopic("device/status");
 client.setWillMessage("offline");
 client.setWillQos(1);
 client.setWillRetain(true);
 
 // -- SSL設定 --
 QSslConfiguration sslConfig = QSslConfiguration::defaultConfiguration();
 client.setSslConfiguration(sslConfig);
 
 // SSLエラーを全て無視する
 client.ignoreSslErrors();
 
 // -- パブリッシュ / サブスクライブ --
 // メッセージを発行する (QMQTT::Messageオブジェクトを使用)
 QMQTT::Message message(0, "qmqtt/topic", "Hello, MQTT!");
 quint16 msgId = client.publish(message);
 
 // トピックを購読する (QoS 0はデフォルト値)
 client.subscribe("qmqtt/topic", 0);
 
 // トピックの購読を解除する
 client.unsubscribe("qmqtt/topic");


QMQTT::Clientクラスのシグナル

下表に、QMQTT::Client クラスが提供するシグナルの一覧を示す。

QMQTT::Client シグナル一覧
シグナル パラメータ 説明
connected() なし ブローカーへの接続が成功した時に発行される。
disconnected() なし ブローカーから切断された時に発行される。
subscribed(QString, quint8) topic, qos トピックの購読が成功した時に発行される。
unsubscribed(QString) topic トピックの購読解除が成功した時に発行される。
published(quint16) id メッセージの配信が完了した時に発行される。
received(QMQTT::Message) message メッセージを受信した時に発行される。
error(QMQTT::ClientError) error エラーが発生した時に発行される。
sslErrors(QList<QSslError>) errors SSLエラーが発生した時に発行される。
stateChanged(QMQTT::ClientState) state 接続状態が変化した時に発行される。


シグナルとスロットの接続例を以下に示す。

 QMQTT::Client *client = new QMQTT::Client(QHostAddress::LocalHost, 1883, this);
 
 // 接続成功時の処理
 connect(client, &QMQTT::Client::connected, this, []() {
    qInfo() << "qmqtt: MQTTブローカーに接続しました";
 });
 
 // 切断時の処理
 connect(client, &QMQTT::Client::disconnected, this, []() {
    qInfo() << "qmqtt: MQTTブローカーから切断されました";
 });
 
 // メッセージ受信時の処理
 connect(client, &QMQTT::Client::received, this, [](const QMQTT::Message &message) {
    qDebug() << "受信トピック :" << message.topic();
    qDebug() << "ペイロード   :" << message.payload();
 });
 
 // エラー発生時の処理
 connect(client, &QMQTT::Client::error, this, [](QMQTT::ClientError error) {
    qCritical() << "qmqtt エラー :" << static_cast<int>(error);
 });
 
 // 接続状態変化時の処理
 connect(client, &QMQTT::Client::stateChanged, this, [](QMQTT::ClientState state) {
    qDebug() << "接続状態変化 :" << static_cast<int>(state);
 });


QMQTT::Messageクラス

QMQTT::Message クラスは、MQTT メッセージを表すクラスである。
パブリッシュ時のメッセージ作成 および サブスクライブ時の受信メッセージの読み取り に使用する。

コンストラクタの使用例を以下に示す。

 // デフォルトコンストラクタ
 QMQTT::Message msg1;
 
 // パラメータ付きコンストラクタ
 // (id, topic, payload, qos=0, retain=false, dup=false)
 QMQTT::Message msg2(0, "qmqtt/topic", QByteArray("Hello"), 1, false, false);
 
 // コピーコンストラクタ
 QMQTT::Message msg3(msg2);


下表に、ゲッター / セッターメソッドの一覧を示す。

QMQTT::Message クラスのメソッド一覧
メソッド 戻り値 説明
id() const quint16 メッセージ IDを取得する。
topic() const QString トピック文字列を取得する。
payload() const QByteArray メッセージペイロードを取得する。
qos() const quint8 QoS レベル (0〜2) を取得する。
retain() const bool リテインフラグを取得する。
dup() const bool 重複フラグを取得する。
setId(quint16) void メッセージ IDを設定する。
setTopic(QString) void トピックを設定する。
setPayload(QByteArray) void ペイロードを設定する。
setQos(quint8) void QoSレベルを設定する。
setRetain(bool) void リテインフラグを設定する。
setDup(bool) void 重複フラグを設定する。


メッセージオブジェクトの使用例を以下に示す。

 // QMQTT::Message を使用したパブリッシュの例
 
 QMQTT::Message message;
 message.setTopic("qmqtt/sensor/temperature");
 message.setPayload(QString("25.5").toUtf8());
 message.setQos(1);
 message.setRetain(false);
 
 quint16 msgId = client->publish(message);
 if (msgId == 0) {
    qWarning() << "メッセージの発行に失敗しました";
 }



qmqttによるMQTT通信

CMake / Qtプロジェクトファイル

  • CMakeLists.txt ファイルを使用する場合
     cmake_minimum_required(VERSION 3.9)
     
     project(MyMqttApp)
     
     set(CMAKE_CXX_STANDARD 11)
     set(CMAKE_AUTOMOC ON)
     
     find_package(Qt5 REQUIRED COMPONENTS Core Network)
     find_package(qmqtt REQUIRED)
     
     add_executable(my_mqtt_app
        main.cpp
     )
     
     target_link_libraries(my_mqtt_app PRIVATE
        qmqtt
        Qt5::Core
        Qt5::Network
     )
    

  • Qt プロジェクトファイル (.pro) を使用する場合
     QT = core network
     
     INCLUDEPATH += /usr/local/include/qmqtt
     LIBS += -L/usr/local/lib -lqmqtt
     
     TEMPLATE = app
     TARGET   = my_mqtt_app
     SOURCES += main.cpp
    


  • インクルードヘッダ
     // qmqttの全クラスをまとめてインクルードする
     #include <qmqtt.h>
     
     // 個別にインクルードすることも可能
     #include <qmqtt/qmqtt_client.h>
     #include <qmqtt/qmqtt_message.h>
    


使用例

以下の例では、qmqttライブラリを使用して、MQTT通信でトピックの送受信を行っている。

QMQTT::Clientクラスを継承して、シグナル / スロットを使用した非同期処理を実装している。
接続完了シグナル (connected) を受け取った後にパブリッシュ / サブスクライブを開始することが重要である。

送信 (qmqttパブリッシャ側)
 // QmqttPublisher.h
 
 #include <QObject>
 #include <QTimer>
 #include <QDebug>
 #include <qmqtt.h>
 
 class QmqttPublisher : public QMQTT::Client
 {
    Q_OBJECT
 
 private:
    QTimer m_timer;  // 一定間隔でメッセージを発行するタイマ
    int    m_count;  // 発行回数のカウンタ
 
 public:
    explicit QmqttPublisher(const QHostAddress &host = QHostAddress::LocalHost,
                            const quint16 port = 1883,
                            QObject *parent = nullptr)
       : QMQTT::Client(host, port, parent), m_count(0)
    {
       // 接続成功時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::connected,
               this, &QmqttPublisher::onConnected);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::error,
               this, &QmqttPublisher::onError);
 
       // タイマのタイムアウトシグナルをハンドラに接続 (1秒間隔)
       m_timer.setInterval(1000);
       connect(&m_timer, &QTimer::timeout,
               this, &QmqttPublisher::onTimeout);
    }
 
 private slots:
    // MQTT ブローカーへの接続が成功した時のハンドラ
    void onConnected()
    {
       qInfo() << "qmqtt: MQTTブローカーに接続しました";
 
       // タイマを開始してメッセージの発行を開始する
       m_timer.start();
    }
 
    // タイマのタイムアウト時のハンドラ (メッセージを発行する)
    void onTimeout()
    {
       // 10回発行したら接続を切断する
       if (m_count >= 10) {
          qInfo() << "qmqtt: 発行完了 - 接続を切断します";
          m_timer.stop();
          disconnectFromHost();
          return;
       }
 
       // QMQTT::Message オブジェクトを作成してメッセージを設定する
       QMQTT::Message message;
       message.setTopic("qmqtt/exampletopic");
       message.setPayload(QString("カウント: %1").arg(m_count).toUtf8());
       message.setQos(0);
       message.setRetain(false);
 
       // メッセージを発行する
       publish(message);
       qDebug() << "qmqtt: 発行 -" << message.payload();
 
       m_count++;
    }
 
    // エラー発生時のハンドラ
    void onError(QMQTT::ClientError error)
    {
       qCritical() << "qmqtt エラー :" << static_cast<int>(error);
    }
 };


 // Publisher側の使用例 (main.cpp)
 
 #include <QCoreApplication>
 #include "QmqttPublisher.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    // ブローカーのホストとポートを指定してパブリッシャを生成する
    QmqttPublisher publisher(QHostAddress("localhost"), 1883);
 
    // ブローカーへの接続を開始する (非同期)
    // 接続完了後に connected シグナルが発行され、タイマが起動する
    publisher.connectToHost();
 
    return a.exec();
 }


受信 (qmqttサブスクライバ側)

以下の例では、qmqtt ライブラリを使用してトピックの購読 (サブスクライブ) を行っている。

 // QmqttSubscriber.h
 
 #include <QObject>
 #include <QDebug>
 #include <qmqtt.h>
 
 class QmqttSubscriber : public QMQTT::Client
 {
    Q_OBJECT
 
 public:
    explicit QmqttSubscriber(const QHostAddress &host = QHostAddress::LocalHost,
                             const quint16 port = 1883,
                             QObject *parent = nullptr)
       : QMQTT::Client(host, port, parent)
    {
       // 接続成功時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::connected,
               this, &QmqttSubscriber::onConnected);
 
       // メッセージ受信時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::received,
               this, &QmqttSubscriber::onReceived);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::error,
               this, &QmqttSubscriber::onError);
 
       // 切断時のシグナルをハンドラに接続
       connect(this, &QMQTT::Client::disconnected,
               this, &QmqttSubscriber::onDisconnected);
    }
 
 signals:
    // 受信したメッセージをアプリケーションに通知するシグナル
    void messageReceived(const QString &topic, const QByteArray &payload);
 
 private slots:
    // MQTT ブローカーへの接続が成功した時のハンドラ
    void onConnected()
    {
       qInfo() << "qmqtt: MQTTブローカーに接続しました";
 
       // 接続成功後にトピックを購読する (QoS 0)
       subscribe("qmqtt/exampletopic", 0);
       qInfo() << "qmqtt: トピックを購読開始しました - qmqtt/exampletopic";
    }
 
    // メッセージ受信時のハンドラ
    void onReceived(const QMQTT::Message &message)
    {
       qDebug() << "qmqtt: 受信トピック :" << message.topic();
       qDebug() << "qmqtt: ペイロード   :" << message.payload();
 
       // シグナルを発行してアプリケーションに通知する
       emit messageReceived(message.topic(), message.payload());
    }
 
    // 切断時のハンドラ
    void onDisconnected()
    {
       qInfo() << "qmqtt: MQTTブローカーから切断されました";
    }
 
    // エラー発生時のハンドラ
    void onError(QMQTT::ClientError error)
    {
       qCritical() << "qmqtt エラー :" << static_cast<int>(error);
    }
 };


 // Subscriber側の使用例 (main.cpp)
 
 #include <QCoreApplication>
 #include "QmqttSubscriber.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    // ブローカーのホストとポートを指定してサブスクライバを生成する
    QmqttSubscriber subscriber(QHostAddress("localhost"), 1883);
 
    // メッセージ受信時の処理を設定する
    QObject::connect(&subscriber, &QmqttSubscriber::messageReceived,
                     [](const QString &topic, const QByteArray &payload) {
       qDebug() << "アプリケーション受信 - トピック :" << topic;
       qDebug() << "アプリケーション受信 - 内容     :" << payload;
    });
 
    // ブローカーへの接続を開始する (非同期)
    // 接続完了後にconnectedシグナルが発行され、購読が開始される
    subscriber.connectToHost();
 
    return a.exec();
 }