Skip to content

Reference

docker2mqtt package.

ContainerEventStateType = Literal['on', 'off'] module-attribute

Container event state

ContainerEventStatusType = Literal['paused', 'running', 'stopped', 'destroyed', 'created'] module-attribute

Container event docker status

Docker2Mqtt

docker2mqtt class.

Attributes:

Name Type Description
version str

The version of docker2mqtt

cfg Docker2MqttConfig

The config for docker2mqtt

b_stats bool

Activate the stats

b_events bool

Activate the events

docker_events Queue[str]

Queue with docker events

docker_stats Queue[str]

Queue with docker stats

known_event_containers dict[str, ContainerEvent]

The dict with the known container events

known_stat_containers dict[str, ContainerStatsRef]

The dict with the known container stats references

last_stat_containers dict[str, ContainerStats | dict[str, Any]]

The dict with the last container stats

mqtt Client

The mqtt client

docker_events_t Thread

The thread to collect events from docker

docker_stats_t Thread

The thread to collect stats from docker

docker_version str

The docker version

discovery_binary_sensor_topic str

Topic template for a binary sensor

discovery_sensor_topic str

Topic template for a sensor

status_topic str

Topic template for a status value

version_topic str

Topic template for a version value

stats_topic str

Topic template for stats

events_topic str

Topic template for events

do_not_exit bool

Prevent exit from within docker2mqtt, when handled outside

Initialize the docker2mqtt.

Parameters:

Name Type Description Default
cfg Docker2MqttConfig

The configuration object for docker2mqtt

required
do_not_exit bool

Prevent exit from within docker2mqtt, when handled outside

False
Source code in docker2mqtt/docker2mqtt.py
def __init__(self, cfg: Docker2MqttConfig, do_not_exit: bool = False):
    """Initialize the docker2mqtt.

    Builds the MQTT topic templates from the configuration, applies the
    configured log level, reads the docker version, connects to the MQTT
    broker and publishes the status and version, registers the containers
    reported by ``DOCKER_PS_CMD`` (only when events are enabled) and finally
    starts the enabled collector threads.

    Parameters
    ----------
    cfg
        The configuration object for docker2mqtt
    do_not_exit
        Prevent exit from within docker2mqtt, when handled outside.
        When false, handlers for SIGINT and SIGTERM are registered.

    Raises
    ------
    Docker2MqttConfigException
        If the docker version could not be read or a collector thread
        could not be started
    Docker2MqttConnectionException
        If connecting to the MQTT broker raises a websocket connection error
    SystemExit
        If neither events nor stats are enabled (``sys.exit(1)``)

    """

    self.cfg = cfg
    self.do_not_exit = do_not_exit

    # The doubled braces leave a literal ``{}`` placeholder in the template,
    # to be filled in later with str.format().
    self.discovery_binary_sensor_topic = f"{cfg['homeassistant_prefix']}/binary_sensor/{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}_{{}}/config"
    self.discovery_sensor_topic = f"{cfg['homeassistant_prefix']}/sensor/{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}_{{}}/config"
    self.status_topic = (
        f"{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}/status"
    )
    self.version_topic = (
        f"{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}/version"
    )
    self.stats_topic = (
        f"{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}/{{}}/stats"
    )
    self.events_topic = (
        f"{cfg['mqtt_topic_prefix']}/{cfg['docker2mqtt_hostname']}/{{}}/events"
    )

    # Always assign both flags so that the reads further down never depend
    # on a default defined elsewhere.
    self.b_events = bool(self.cfg["enable_events"])
    self.b_stats = bool(self.cfg["enable_stats"])

    log_level = self.cfg["log_level"].upper()
    main_logger.setLevel(log_level)
    events_logger.setLevel(log_level)
    stats_logger.setLevel(log_level)

    try:
        self.docker_version = self._get_docker_version()
    except FileNotFoundError as e:
        raise Docker2MqttConfigException("Could not get docker version") from e

    if not self.do_not_exit:
        main_logger.info("Register signal handlers for SIGINT and SIGTERM")
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    main_logger.info("Events enabled: %d", self.b_events)
    main_logger.info("Stats enabled: %d", self.b_stats)

    try:
        # Setup MQTT
        self.mqtt = paho.mqtt.client.Client(
            callback_api_version=paho.mqtt.client.CallbackAPIVersion.VERSION2,  # type: ignore[attr-defined, call-arg]
            client_id=self.cfg["mqtt_client_id"],
        )
        self.mqtt.username_pw_set(
            username=self.cfg["mqtt_user"], password=self.cfg["mqtt_password"]
        )
        # Last will: the broker publishes "offline" on the status topic if
        # this client disconnects unexpectedly.
        self.mqtt.will_set(
            self.status_topic,
            "offline",
            qos=self.cfg["mqtt_qos"],
            retain=True,
        )
        self.mqtt.connect(
            self.cfg["mqtt_host"], self.cfg["mqtt_port"], self.cfg["mqtt_timeout"]
        )
        self.mqtt.loop_start()
        self._mqtt_send(self.status_topic, "online", retain=True)
        self._mqtt_send(self.version_topic, self.version, retain=True)

    # NOTE(review): only WebsocketConnectionError is translated here; other
    # connection failures (e.g. OSError from connect()) presumably propagate
    # unchanged - confirm whether that is intended.
    except paho.mqtt.client.WebsocketConnectionError as ex:
        main_logger.exception("Error while trying to connect to MQTT broker.")
        main_logger.debug(ex)
        raise Docker2MqttConnectionException from ex

    # Register containers with HA
    docker_ps = subprocess.run(
        DOCKER_PS_CMD, capture_output=True, text=True, check=False
    )
    for line in docker_ps.stdout.splitlines():
        # Each output line is parsed as one JSON document describing a container.
        container_status = json.loads(line)

        if self._filter_container(container_status["Names"]):
            status_str: ContainerEventStatusType
            state_str: ContainerEventStateType

            # "Paused" must be checked before "Up" so that a status text
            # containing both is reported as paused.
            if "Paused" in container_status["Status"]:
                status_str = "paused"
                state_str = "off"
            elif "Up" in container_status["Status"]:
                status_str = "running"
                state_str = "on"
            else:
                status_str = "stopped"
                state_str = "off"

            if self.b_events:
                self._register_container(
                    {
                        "name": container_status["Names"],
                        "image": container_status["Image"],
                        "status": status_str,
                        "state": state_str,
                    }
                )

    started = False
    try:
        if self.b_events:
            main_logger.info("Starting Events thread")
            self._start_readline_events_thread()
            started = True
    except Exception as ex:
        main_logger.exception("Error while trying to start events thread.")
        main_logger.debug(ex)
        raise Docker2MqttConfigException from ex

    try:
        if self.b_stats:
            started = True
            main_logger.info("Starting Stats thread")
            self._start_readline_stats_thread()
    except Exception as ex:
        main_logger.exception("Error while trying to start stats thread.")
        main_logger.debug(ex)
        raise Docker2MqttConfigException from ex

    if started is False:
        # Neither collector is enabled, so there is nothing to do.
        main_logger.critical("Nothing started, check your config!")
        sys.exit(1)

__del__

Destroy the class.

Source code in docker2mqtt/docker2mqtt.py
def __del__(self) -> None:
    """Run the instance cleanup when the object is finalized."""
    cleanup = self._cleanup
    cleanup()

loop

Start the loop.

Raises:

Type Description
Docker2MqttEventsException

If anything goes wrong in the processing of the events

Docker2MqttStatsException

If anything goes wrong in the processing of the stats

Docker2MqttException

If anything goes wrong outside of the known exceptions

Source code in docker2mqtt/docker2mqtt.py
def loop(self) -> None:
    """Run a single processing iteration.

    Removes destroyed containers, processes the events queue and the stats
    queue, and afterwards restarts any enabled collector thread that is no
    longer alive.

    Raises
    ------
    Docker2MqttEventsException
        If anything goes wrong in the processing of the events
    Docker2MqttStatsException
        If anything goes wrong in the processing of the stats
    Docker2MqttException
        If anything goes wrong outside of the known exceptions; a failed
        thread restart is raised as Docker2MqttConfigException

    """

    # Housekeeping first, then drain both queues.
    self._remove_destroyed_containers()
    self._handle_events_queue()
    self._handle_stats_queue()

    # Events collector: restart it if it is enabled but has died.
    try:
        events_thread_dead = self.b_events and not self.docker_events_t.is_alive()
        if events_thread_dead:
            main_logger.warning("Restarting events thread")
            self._start_readline_events_thread()
    except Exception as err:
        main_logger.exception("Error while trying to restart events thread.")
        main_logger.debug(err)
        raise Docker2MqttConfigException from err

    # Stats collector: same check as for the events collector.
    try:
        stats_thread_dead = self.b_stats and not self.docker_stats_t.is_alive()
        if stats_thread_dead:
            main_logger.warning("Restarting stats thread")
            self._start_readline_stats_thread()
    except Exception as err:
        main_logger.exception("Error while trying to restart stats thread.")
        main_logger.debug(err)
        raise Docker2MqttConfigException from err

loop_busy

Start the loop (blocking).

Parameters:

Name Type Description Default
raise_known_exceptions bool

Should any known processing exception be raised or ignored

False

Raises:

Type Description
Docker2MqttEventsException

If anything goes wrong in the processing of the events

Docker2MqttStatsException

If anything goes wrong in the processing of the stats

Docker2MqttException

If anything goes wrong outside of the known exceptions

Source code in docker2mqtt/docker2mqtt.py
def loop_busy(self, raise_known_exceptions: bool = False) -> None:
    """Run :meth:`loop` endlessly (blocking), sleeping between iterations.

    The pause after each iteration shrinks as the fuller of the two queues
    fills up, so a larger backlog is processed with shorter pauses.

    Parameters
    ----------
    raise_known_exceptions
        Should any known processing exception be raised or ignored

    Raises
    ------
    Docker2MqttEventsException
        If anything goes wrong in the processing of the events
    Docker2MqttStatsException
        If anything goes wrong in the processing of the stats
    Docker2MqttException
        If anything goes wrong outside of the known exceptions

    """

    while True:
        try:
            self.loop()
        except (Docker2MqttEventsException, Docker2MqttStatsException) as ex:
            # Known processing errors are re-raised on request, otherwise
            # only logged.
            if raise_known_exceptions:
                raise ex  # noqa: TRY201
            main_logger.warning(
                "Do not raise due to raise_known_exceptions=False: %s", str(ex)
            )

        # Calculate next iteration between (~0.2s and 0.001s)
        backlog = max(self.docker_events.qsize(), self.docker_stats.qsize())
        sleep_time = 0.001 + 0.2 / MAX_QUEUE_SIZE * (MAX_QUEUE_SIZE - backlog)
        main_logger.debug("Sleep for %.5fs until next iteration", sleep_time)
        sleep(sleep_time)

Docker2MqttConfigException

Bases: Docker2MqttException

Config exception occurred.

Docker2MqttConnectionException

Bases: Docker2MqttException

Connection exception occurred.

Docker2MqttEventsException

Bases: Docker2MqttException

Events processing exception occurred.

Docker2MqttException

Bases: Exception

General processing exception occurred.

Docker2MqttStatsException

Bases: Docker2MqttException

Stats processing exception occurred.

ContainerDeviceEntry

Bases: TypedDict

A container device entry object for discovery in home assistant.

Attributes:

Name Type Description
identifiers str

A unique str to identify the device in home assistant

name str

The name of the device to display in home assistant

model str

The model of the device as additional info

ContainerEntry

Bases: TypedDict

A container entry object for discovery in home assistant.

Attributes:

Name Type Description
name str

The name of the sensor to display in home assistant

unique_id str

The unique id of the sensor in home assistant

icon str | None

The icon of the sensor to display

availability_topic str

The topic to check the availability of the sensor

payload_available str

The payload of availability_topic of the sensor when available

payload_unavailable

The payload of availability_topic of the sensor when unavailable

state_topic str

The topic containing all information for the state of the sensor

value_template str

The jinja2 template to extract the state value from the state_topic for the sensor

unit_of_measurement str | None

The unit of measurement of the sensor

payload_on str | None

When a binary sensor: The value of extracted state of the sensor to be considered 'on'

payload_off str | None

When a binary sensor: The value of extracted state of the sensor to be considered 'off'

device ContainerDeviceEntry

The device the sensor is attributed to

device_class str | None

The device class of the sensor

state_topic str

The topic containing all information for the attributes of the sensor

qos int

The QOS of the discovery message

ContainerEvent

Bases: TypedDict

A container event object to send to an mqtt topic.

Attributes:

Name Type Description
name str

The name of the container

image str

The image the container is running

status ContainerEventStatusType

The docker status the container is in

state ContainerEventStateType

The state of the container

ContainerStats

Bases: TypedDict

A container stats object to send to an mqtt topic.

Attributes:

Name Type Description
name str

The name of the container

host str

The docker host

memory str

Human-readable memory information from docker

memoryused float

Used memory in MB

memorylimit float

Memory limit in MB

netio str

Human-readable network information from docker

netinput float

Network input in MB

netinputrate float

Network input rate in MB/s

netoutput float

Network output in MB

netoutputrate float

Network output rate in MB/s

blockinput float

Block (to disk) input in MB

blockinputrate float

Block (to disk) input rate in MB/s

blockoutput float

Block (to disk) output in MB

blockoutputrate float

Block (to disk) output rate in MB/s

cpu float

The cpu usage by the container in cpu-% (ex.: a docker with 4 cores has 400% cpu available)

ContainerStatsRef

Bases: TypedDict

A container stats ref object to compare between current and past stats.

Attributes:

Name Type Description
key str

The reference key of a stat rotation

last datetime

When the last stat rotation happened

Docker2MqttConfig

Bases: TypedDict

A config object.

Attributes:

Name Type Description
log_level str

Log verbosity

destroyed_container_ttl int

How long, in seconds, before destroyed containers are removed from Home Assistant. Containers won't be removed if the service is restarted before the TTL expires.

homeassistant_prefix str

MQTT discovery topic prefix

docker2mqtt_hostname str

A descriptive name for the docker being monitored

mqtt_client_id str

Client Id for MQTT broker client

mqtt_user str

Username for MQTT broker authentication

mqtt_password str

Password for MQTT broker authentication

mqtt_host str

Hostname or IP address of the MQTT broker

mqtt_port int

Port of the MQTT broker

mqtt_timeout int

Timeout for MQTT messages

mqtt_topic_prefix str

MQTT topic prefix

mqtt_qos int

QOS for standard MQTT messages

container_whitelist list[str]

Whitelist the containers to monitor, if empty, everything is monitored. The entries are matched either as literal strings or as regex.

container_blacklist list[str]

Blacklist the containers to monitor, takes priority over whitelist. The entries are matched either as literal strings or as regex.

enable_events bool

Flag to enable event monitoring

enable_stats bool

Flag to enable stat monitoring

stats_record_seconds int

Interval, in seconds, at which the stats are published via MQTT

clean_for_discovery

Cleanup a typed dict for home assistant discovery, which is quite picky and does not like empty or None values.

Parameters:

Name Type Description Default
val ContainerEntry

The TypedDict to cleanup

required

Returns:

Type Description
dict

The cleaned dict

Source code in docker2mqtt/helpers.py
def clean_for_discovery(
    val: ContainerEntry,
) -> dict[str, str | int | float | object]:
    """Cleanup a typed dict for home assistant discovery, which is quite picky and does not like empty or None values.

    Only the top level of the dict is inspected; nested dicts are passed
    through unchanged. Values such as ``0`` or ``False`` are kept, only
    ``None`` and the empty string are dropped.

    Parameters
    ----------
    val
        The TypedDict to cleanup

    Returns
    -------
    dict
        The cleaned dict

    """

    # The former isinstance(v, str | int | float | object) guard was always
    # true (every value is an ``object``), so only the emptiness test remains.
    return {k: v for k, v in dict(val).items() if v not in (None, "")}