mqttasgi

MQTT ASGI Protocol Server

mqttasgi - MQTT ASGI Protocol Server for Django

mqttasgi is an ASGI protocol server that implements a complete MQTT interface for the Django web framework. It is modeled on the daphne protocol server.

Features

  • Publish / Subscribe to any topic
  • Multiple workers to handle different topics / subscriptions.
  • Full Django ORM support within consumers.
  • Full Channel Layers support.
  • Full testing consumer to enable TDD.
  • Lightweight.
  • Django 3.x, 4.x / Channels 3.x support

Installation

To install mqttasgi for Django 3.x or 4.x:

pip install mqttasgi

IMPORTANT NOTE: If you need legacy support for Django 2.x, install the latest 0.x release of mqttasgi.

Usage

Unit

mqttasgi provides a command-line interface (CLI) to run the protocol server:

mqttasgi -H localhost -p 1883 my_application.asgi:application

Parameters:

| Parameter | Explanation | Environment variable | Default |
|---|---|---|---|
| -H / --host | MQTT broker host | MQTT_HOSTNAME | localhost |
| -p / --port | MQTT broker port | MQTT_PORT | 1883 |
| -c / --cleansession | MQTT Clean Session | MQTT_CLEAN | True |
| -v / --verbosity | Logging verbosity | VERBOSITY | 0 |
| -U / --username | MQTT Username | MQTT_USERNAME | |
| -P / --password | MQTT Password | MQTT_PASSWORD | |
| -i / --id | MQTT Client ID | MQTT_CLIENT_ID | |
| -C / --cert | TLS Certificate | TLS_CERT | |
| -K / --key | TLS Key | TLS_KEY | |
| -S / --cacert | TLS CA Certificate | TLS_CA | |
| -SSL / --use-ssl | Use SSL (without certificate authentication) | MQTT_USE_SSL | False |
| -T / --transport | Transport type (tcp or websockets) | MQTT_TRANSPORT | tcp |
| -r / --retries | Number of retries on disconnect | MQTT_RETRIES | 3 |
| Last argument | ASGI Application | | |

Environment variables are supported and can be set in a .env file at the root of the project. A parameter passed on the command line overrides the corresponding environment variable.
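The precedence described above (command-line flag first, then environment variable, then default) can be sketched with argparse. This is an illustration only, not mqttasgi's actual argument parsing: the function name `resolve_settings` is hypothetical, only two of the parameters are shown, and loading the .env file is left out.

```python
import argparse
import os


def resolve_settings(argv, environ=None):
    """Resolve host and port: CLI flag first, then environment, then default."""
    env = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    # Environment values become the argparse defaults, so an explicit flag wins.
    parser.add_argument("-H", "--host", default=env.get("MQTT_HOSTNAME", "localhost"))
    parser.add_argument("-p", "--port", type=int, default=int(env.get("MQTT_PORT", 1883)))
    return parser.parse_args(argv)


# The port flag overrides MQTT_PORT; the host falls back to MQTT_HOSTNAME.
settings = resolve_settings(
    ["-p", "8883"],
    {"MQTT_HOSTNAME": "broker.local", "MQTT_PORT": "1883"},
)
print(settings.host, settings.port)
```

Passing the environment as a dictionary keeps the sketch easy to try without touching the real process environment.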

Consumer

To add your consumer, edit the asgi.py file of your Django application:

import os
import django
from channels.routing import ProtocolTypeRouter
from my_application.consumers import MyMqttConsumer
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')

django.setup()

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'mqtt': MyMqttConsumer.as_asgi(),
})
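To make the routing above more concrete, the following sketch shows the general idea of dispatching on the protocol type of the ASGI scope. It assumes, based on the 'mqtt' key used above, that mqttasgi connections carry the scope type "mqtt". `TinyProtocolRouter` and the two fake applications are hypothetical stand-ins written for this illustration, not the channels classes.

```python
import asyncio


class TinyProtocolRouter:
    """Illustrative stand-in for a protocol router; not the channels class."""

    def __init__(self, application_mapping):
        self.application_mapping = application_mapping

    async def __call__(self, scope, receive, send):
        # Pick the application registered for this connection's protocol.
        try:
            app = self.application_mapping[scope["type"]]
        except KeyError:
            raise ValueError(
                f"No application configured for scope type {scope['type']!r}"
            ) from None
        return await app(scope, receive, send)


async def fake_http_app(scope, receive, send):
    return "handled by http app"


async def fake_mqtt_app(scope, receive, send):
    return "handled by mqtt app"


router = TinyProtocolRouter({"http": fake_http_app, "mqtt": fake_mqtt_app})
result = asyncio.run(router({"type": "mqtt"}, None, None))
print(result)
```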

Your consumer should inherit from MqttConsumer in mqttasgi.consumers. It implements helper functions such as publish and subscribe. A simple example:

from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/testing/topic', 2)

    async def receive(self, mqtt_message):
        # mqtt_message is a dictionary with the topic, payload and QoS
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')
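In practice a receive handler usually needs to parse the payload before acting on it. The helper below is a minimal sketch, assuming the payload arrives as bytes holding UTF-8 encoded JSON. The payload type is not stated above, so treat it as an assumption, and `decode_json_payload` is a hypothetical name, not part of mqttasgi.

```python
import json


def decode_json_payload(payload):
    """Return the parsed JSON body, or None if the payload is not valid UTF-8 JSON."""
    if isinstance(payload, (bytes, bytearray)):
        try:
            payload = payload.decode("utf-8")
        except UnicodeDecodeError:
            return None
    try:
        return json.loads(payload)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return None


# Same dictionary shape as the mqtt_message printed in the example consumer.
mqtt_message = {"topic": "my/testing/topic", "payload": b'{"temperature": 21.5}', "qos": 2}
reading = decode_json_payload(mqtt_message["payload"])
print(reading)
```

Returning None for malformed input lets the handler skip bad messages instead of raising inside the consumer.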
    

Consumer API

MQTT

The consumer provides a full API to interact with MQTT and with the channel layer:

Publish

Publishes a message with the given payload to the MQTT topic passed as an argument. The QoS of the message and the retain flag can also be passed as additional arguments.


await self.publish(topic, payload, qos=1, retain=False)
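The sketch below shows one way to call publish from a helper coroutine. So that it runs without a broker, it uses `RecordingPublisher`, a hypothetical stand-in that only mirrors the publish signature shown above. In a real consumer the same call would be made on self. The topic layout and the `report_status` helper are likewise invented for illustration.

```python
import asyncio
import json


class RecordingPublisher:
    """Hypothetical stand-in that records publish calls instead of contacting a broker."""

    def __init__(self):
        self.sent = []

    async def publish(self, topic, payload, qos=1, retain=False):
        self.sent.append((topic, payload, qos, retain))


async def report_status(consumer, device_id, online):
    # Retain the status so the broker can hand the last value to late subscribers.
    payload = json.dumps({"device": device_id, "online": online})
    await consumer.publish(f"devices/{device_id}/status", payload, qos=1, retain=True)


publisher = RecordingPublisher()
asyncio.run(report_status(publisher, "pump-7", True))
print(publisher.sent)
```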

Subscribe

Subscribes to the MQTT topic passed as argument with the given QOS.

await self.subscribe(topic, qos)
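MQTT topic filters may contain the single-level wildcard + and the multi-level wildcard #. Assuming mqttasgi hands the filter to the broker unchanged (this is not stated above), a consumer subscribed with a wildcard has to tell the concrete topics apart in receive. The function below is a simplified sketch of the matching rule, and `topic_matches` is a hypothetical helper. It ignores the special handling of topics that start with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter containing + or # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level and everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without a trailing #, both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("sensors/+/temperature", "sensors/kitchen/temperature"))
print(topic_matches("sensors/#", "sensors/kitchen/humidity"))
print(topic_matches("sensors/+", "sensors/kitchen/humidity"))
```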

Unsubscribe

Unsubscribes from the given MQTT topic.

await self.unsubscribe(topic)

Worker API - Experimental

This is an advanced feature of the mqttasgi protocol server that allows the user to run multiple consumers on the same mqttasgi instance.

Spawn Worker

The app_id is a unique identifier for the worker, the consumer_path is the dot-separated path to the consumer, and consumer_params is the parameter dictionary to pass down to the new consumer.

await self.spawn_worker(app_id, consumer_path, consumer_params)
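A dot-separated path such as 'my_application.consumers.MyMqttConsumer' names a module followed by an attribute inside it. How mqttasgi resolves consumer_path internally is not described here. The sketch below only shows the usual way such a path is turned into a Python object, using a hypothetical helper `load_from_dotted_path`.

```python
from importlib import import_module


def load_from_dotted_path(dotted_path):
    """Import 'package.module.Name' and return the object called Name."""
    module_path, _, attribute = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not a dotted path")
    module = import_module(module_path)
    return getattr(module, attribute)


# A standard-library path is used here so the sketch runs anywhere.
ordered_dict_class = load_from_dotted_path("collections.OrderedDict")
print(ordered_dict_class.__name__)
```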

Kill Worker

The consumer can also kill the spawned workers with a specific app_id:

await self.kill_worker(app_id)

Channel Layers

mqttasgi supports channel layer communication and group messages, following the Django Channels implementation of channel layers:

Outside of the consumer:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("my.group", {"type": "my.custom.message", "text":"Hi from outside of the consumer"})

In the consumer:

from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/testing/topic', 2)
        await self.channel_layer.group_add("my.group", self.channel_name)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])
    
    async def my_custom_message(self, event):
        print('Received a channel layer message')
        print(event)

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')
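In the example, the message sent with type "my.custom.message" is handled by the method my_custom_message. This follows the Django Channels convention of replacing dots in the type with underscores to find the handler. The function below is a small sketch of that naming rule. `handler_name_for` is a hypothetical name for illustration, and it is assumed that MqttConsumer dispatches channel layer messages the same way as other Channels consumers.

```python
def handler_name_for(message):
    """Map a channel layer message type to the consumer method that handles it."""
    if "type" not in message:
        raise ValueError("Message has no 'type' key")
    handler_name = message["type"].replace(".", "_")
    # A leading underscore would point at a private method, so it is rejected.
    if handler_name.startswith("_"):
        raise ValueError("Message type must not start with an underscore")
    return handler_name


event = {"type": "my.custom.message", "text": "Hi from outside of the consumer"}
print(handler_name_for(event))
```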

Supporters

MAPER - IIoT Asset Monitoring

Predict failures before they happen.

Real-time health monitoring to avoid unexpected downtime and organize maintenance in industrial plants.

Combining IoT Technology and Artificial Intelligence, we deliver a complete view of your assets like never before.

With real-time health diagnostics you will increase the reliability of the whole production process, benefiting both the company and its staff.