MQTT
An MQTT broker is an intermediary entity that enables MQTT clients to communicate. Specifically, an MQTT broker receives messages published by clients, filters the messages by topic, and distributes them to subscribers.
Package
mqtt_client: ^9.6.5
A chat app in Flutter that talks to an MQTT broker
// MQTTManager.dart
import 'package:mqtt_client/mqtt_client.dart';
import 'package:flutter_mqtt_app/mqtt/state/MQTTAppState.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// Wraps a single [MqttServerClient] bound to one host and one topic.
///
/// Connection changes and received payloads are pushed into the
/// [MQTTAppState] supplied at construction. Call [initializeMQTTClient]
/// before [connect].
class MQTTManager {
  // Private instance of client
  final MQTTAppState _currentState;
  MqttServerClient? _client;
  final String _identifier;
  final String _host;
  final String _topic;

  // Constructor
  // ignore: sort_constructors_first
  MQTTManager(
      {required String host,
      required String topic,
      required String identifier,
      required MQTTAppState state})
      : _identifier = identifier,
        _host = host,
        _topic = topic,
        _currentState = state;

  /// Creates the underlying client, wires its callbacks and prepares the
  /// connection message. Does not open the connection; see [connect].
  void initializeMQTTClient() {
    final MqttServerClient client = MqttServerClient(_host, _identifier)
      ..port = 1883
      ..keepAlivePeriod = 20
      ..onDisconnected = onDisconnected
      ..secure = false
      ..logging(on: true)

      /// Add the successful connection callback
      ..onConnected = onConnected
      ..onSubscribed = onSubscribed;

    final MqttConnectMessage connMess = MqttConnectMessage()
        .withClientIdentifier(_identifier)
        .withWillTopic(
            'willtopic') // If you set this you must set a will message
        .withWillMessage('My Will message')
        .startClean() // Non persistent session for testing
        .withWillQos(MqttQos.atLeastOnce);
    print('EXAMPLE::Mosquitto client connecting....');
    client.connectionMessage = connMess;
    _client = client;
  }

  /// Connects to the host.
  ///
  /// Sets the app state to `connecting` first. If the client throws an
  /// [Exception] while connecting, it is logged and [disconnect] is called.
  // ignore: avoid_void_async
  void connect() async {
    assert(_client != null);
    try {
      print('EXAMPLE::Mosquitto start client connecting....');
      _currentState.setAppConnectionState(MQTTAppConnectionState.connecting);
      await _client!.connect();
    } on Exception catch (e) {
      print('EXAMPLE::client exception - $e');
      disconnect();
    }
  }

  /// Disconnects the client, if one has been created.
  void disconnect() {
    print('Disconnected');
    // Null-aware so a call before initializeMQTTClient() is a no-op
    // instead of a null-assertion failure.
    _client?.disconnect();
  }

  /// Publishes [message] as a string payload to the configured topic with
  /// QoS "exactly once".
  void publish(String message) {
    final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
    builder.addString(message);
    _client!.publishMessage(_topic, MqttQos.exactlyOnce, builder.payload!);
  }

  /// The subscribed callback
  void onSubscribed(String topic) {
    print('EXAMPLE::Subscription confirmed for topic $topic');
  }

  /// The unsolicited disconnect callback
  void onDisconnected() {
    print('EXAMPLE::OnDisconnected client callback - Client disconnection');
    if (_client!.connectionStatus!.returnCode ==
        MqttConnectReturnCode.noneSpecified) {
      print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
    }
    _currentState.setAppConnectionState(MQTTAppConnectionState.disconnected);
  }

  /// The successful connect callback
  ///
  /// Marks the app state as connected, subscribes to the configured topic
  /// and forwards the text of every received publish message to the state.
  void onConnected() {
    final MqttServerClient client = _client!;
    _currentState.setAppConnectionState(MQTTAppConnectionState.connected);
    print('EXAMPLE::Mosquitto client connected....');
    client.subscribe(_topic, MqttQos.atLeastOnce);
    client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>>? c) {
      // Nothing to report for a missing or empty batch.
      if (c == null || c.isEmpty) {
        return;
      }
      // Only publish messages carry a text payload; skip anything else
      // rather than failing on a cast.
      final MqttMessage? received = c[0].payload;
      if (received is! MqttPublishMessage) {
        return;
      }
      final String pt =
          MqttPublishPayload.bytesToStringAsString(received.payload.message!);
      _currentState.setReceivedText(pt);
      print(
          'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
      print('');
    });
    print(
        'EXAMPLE::OnConnected client callback - Client connection was sucessful');
  }
}
// MQTTAppState.dart
import 'package:flutter/cupertino.dart';

/// Connection phases tracked by [MQTTAppState].
enum MQTTAppConnectionState { connected, disconnected, connecting }

/// Observable app state: the current connection phase, the most recently
/// received text and the history of all received texts.
///
/// Listeners are notified after every change.
class MQTTAppState with ChangeNotifier {
  MQTTAppConnectionState _appConnectionState =
      MQTTAppConnectionState.disconnected;
  String _receivedText = '';

  // Received texts in arrival order. Kept as a list so the joined history
  // has separators only *between* entries (no leading blank line).
  final List<String> _history = <String>[];

  /// Records [text] as the latest received text, appends it to the history
  /// and notifies listeners.
  void setReceivedText(String text) {
    _receivedText = text;
    _history.add(text);
    notifyListeners();
  }

  /// Stores the new connection [state] and notifies listeners.
  void setAppConnectionState(MQTTAppConnectionState state) {
    _appConnectionState = state;
    notifyListeners();
  }

  /// The most recently received text; empty until something is received.
  String get getReceivedText => _receivedText;

  /// All received texts, one per line, oldest first.
  String get getHistoryText => _history.join('\n');

  /// The current connection phase.
  MQTTAppConnectionState get getAppConnectionState => _appConnectionState;
}
// mqttView.dart
import 'dart:io' show Platform;

import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import 'package:flutter_mqtt_app/mqtt/state/MQTTAppState.dart';
import 'package:flutter_mqtt_app/mqtt/MQTTManager.dart';

/// Screen with broker/topic/message inputs, connect/disconnect/send buttons
/// and a scrollable history of received messages.
///
/// Expects an [MQTTAppState] to be available through `Provider`.
class MQTTView extends StatefulWidget {
  const MQTTView({Key? key}) : super(key: key);

  @override
  State<MQTTView> createState() {
    return _MQTTViewState();
  }
}

class _MQTTViewState extends State<MQTTView> {
  final TextEditingController _hostTextController = TextEditingController();
  final TextEditingController _messageTextController = TextEditingController();
  final TextEditingController _topicTextController = TextEditingController();

  // Refreshed on every build from the Provider.
  late MQTTAppState currentAppState;

  // Assigned in _configureAndConnect(); the buttons that use it are only
  // enabled in the connected state, i.e. after that assignment.
  late MQTTManager manager;

  @override
  void initState() {
    super.initState();

    /*
    _hostTextController.addListener(_printLatestValue);
    _messageTextController.addListener(_printLatestValue);
    _topicTextController.addListener(_printLatestValue);
    */
  }

  @override
  void dispose() {
    _hostTextController.dispose();
    _messageTextController.dispose();
    _topicTextController.dispose();
    super.dispose();
  }

  /*
  _printLatestValue() {
    print("Second text field: ${_hostTextController.text}");
    print("Second text field: ${_messageTextController.text}");
    print("Second text field: ${_topicTextController.text}");
  }
  */

  @override
  Widget build(BuildContext context) {
    final MQTTAppState appState = Provider.of<MQTTAppState>(context);
    // Keep a reference to the app state.
    currentAppState = appState;
    final Scaffold scaffold = Scaffold(body: _buildColumn());
    return scaffold;
  }

  // Not currently attached to the Scaffold built in build().
  Widget _buildAppBar(BuildContext context) {
    return AppBar(
      title: const Text('MQTT'),
      backgroundColor: Colors.greenAccent,
    );
  }

  /// Top-level layout: status banner, input area, received-message history.
  Widget _buildColumn() {
    return Column(
      children: <Widget>[
        _buildConnectionStateText(
            _prepareStateMessageFrom(currentAppState.getAppConnectionState)),
        _buildEditableColumn(),
        _buildScrollableTextWith(currentAppState.getHistoryText)
      ],
    );
  }

  /// Broker and topic fields, the publish row and the connect/disconnect
  /// buttons.
  Widget _buildEditableColumn() {
    final MQTTAppConnectionState state = currentAppState.getAppConnectionState;
    return Padding(
      padding: const EdgeInsets.all(20.0),
      child: Column(
        children: <Widget>[
          _buildTextFieldWith(
              _hostTextController, 'Enter broker address', state),
          const SizedBox(height: 10),
          _buildTextFieldWith(_topicTextController,
              'Enter a topic to subscribe or listen', state),
          const SizedBox(height: 10),
          _buildPublishMessageRow(),
          const SizedBox(height: 10),
          _buildConnectButtonFrom(state)
        ],
      ),
    );
  }

  /// Message text field next to the Send button.
  Widget _buildPublishMessageRow() {
    final MQTTAppConnectionState state = currentAppState.getAppConnectionState;
    return Row(
      mainAxisAlignment: MainAxisAlignment.spaceEvenly,
      children: <Widget>[
        Expanded(
          child: _buildTextFieldWith(
              _messageTextController, 'Enter a message', state),
        ),
        _buildSendButtonFrom(state)
      ],
    );
  }

  /// Full-width banner showing [status].
  Widget _buildConnectionStateText(String status) {
    return Row(
      children: <Widget>[
        Expanded(
          child: Container(
              color: Colors.deepOrangeAccent,
              child: Text(status, textAlign: TextAlign.center)),
        ),
      ],
    );
  }

  /// Builds a text field for [controller] labelled with [hintText].
  ///
  /// The message field is editable only while connected; the host and topic
  /// fields only while disconnected.
  Widget _buildTextFieldWith(TextEditingController controller, String hintText,
      MQTTAppConnectionState state) {
    final bool isMessageField = controller == _messageTextController;
    final bool isBrokerField =
        controller == _hostTextController || controller == _topicTextController;
    final bool shouldEnable =
        (isMessageField && state == MQTTAppConnectionState.connected) ||
            (isBrokerField && state == MQTTAppConnectionState.disconnected);
    return TextField(
        enabled: shouldEnable,
        controller: controller,
        decoration: InputDecoration(
          contentPadding: EdgeInsets.zero,
          labelText: hintText,
        ));
  }

  /// Fixed-size (400 x 200) scrollable area showing [text].
  Widget _buildScrollableTextWith(String text) {
    return Padding(
      padding: const EdgeInsets.all(20.0),
      child: SizedBox(
        width: 400,
        height: 200,
        child: SingleChildScrollView(
          child: Text(text),
        ),
      ),
    );
  }

  /// Connect and Disconnect buttons; each is enabled only in the state where
  /// it makes sense (disconnected / connected respectively).
  // NOTE(review): RaisedButton is deprecated (hence the ignores below) and,
  // as far as I know, absent from newer Flutter SDKs; plan a migration to
  // ElevatedButton when the SDK is upgraded.
  Widget _buildConnectButtonFrom(MQTTAppConnectionState state) {
    return Row(
      children: <Widget>[
        Expanded(
          // ignore: deprecated_member_use
          child: RaisedButton(
            color: Colors.lightBlueAccent,
            child: const Text('Connect'),
            onPressed: state == MQTTAppConnectionState.disconnected
                ? _configureAndConnect
                : null,
          ),
        ),
        const SizedBox(width: 10),
        Expanded(
          // ignore: deprecated_member_use
          child: RaisedButton(
            color: Colors.redAccent,
            child: const Text('Disconnect'),
            onPressed: state == MQTTAppConnectionState.connected
                ? _disconnect
                : null,
          ),
        ),
      ],
    );
  }

  /// Send button; enabled only while connected.
  Widget _buildSendButtonFrom(MQTTAppConnectionState state) {
    // ignore: deprecated_member_use
    return RaisedButton(
      color: Colors.green,
      child: const Text('Send'),
      onPressed: state == MQTTAppConnectionState.connected
          ? () {
              _publishMessage(_messageTextController.text);
            }
          : null,
    );
  }

  // Utility functions

  /// Human-readable label for [state].
  String _prepareStateMessageFrom(MQTTAppConnectionState state) {
    switch (state) {
      case MQTTAppConnectionState.connected:
        return 'Connected';
      case MQTTAppConnectionState.connecting:
        return 'Connecting';
      case MQTTAppConnectionState.disconnected:
        return 'Disconnected';
    }
  }

  /// Platform tag used both as the MQTT client identifier and as the sender
  /// prefix of published messages.
  String get _osPrefix =>
      Platform.isAndroid ? 'Flutter_Android' : 'Flutter_iOS';

  /// Creates a manager from the current host/topic inputs and connects.
  void _configureAndConnect() {
    // ignore: flutter_style_todos
    // TODO: Use UUID
    manager = MQTTManager(
        host: _hostTextController.text,
        topic: _topicTextController.text,
        identifier: _osPrefix,
        state: currentAppState);
    manager.initializeMQTTClient();
    manager.connect();
  }

  void _disconnect() {
    manager.disconnect();
  }

  /// Publishes [text] prefixed with the platform tag and clears the input.
  void _publishMessage(String text) {
    final String message = '$_osPrefix says: $text';
    manager.publish(message);
    _messageTextController.clear();
  }
}