Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: Add dup filtering to mqtt_relay #3018

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 105 additions & 6 deletions examples/rtl_433_mqtt_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,38 @@
from __future__ import print_function
from __future__ import with_statement

import socket
import json
import logging
import socket
import time

import paho.mqtt.client as mqtt


# The config class represents a config object. The constructor takes
# an optional pathname, and will switch on the suffix (.yaml for now)
# and read a dictionary.
class rtlconfig(object):

# Initialize with default values.
c = {
# Syslog socket configuration
# Log level info (False) or debug (True)
'DEBUG': False,

# Address to listen on for syslog/json messages from rtl_433
'UDP_IP': "127.0.0.1",
'UDP_PORT': 1433,

# MQTT broker configuration
# MQTT broker address and credentials
'MQTT_HOST': "127.0.0.1",
'MQTT_PORT': 1883,
'MQTT_USERNAME': None,
'MQTT_PASSWORD': None,
'MQTT_TLS': False,

# MQTT content
'MQTT_PREFIX': "sensor/rtl_433",
'MQTT_DEDUP': True,
'MQTT_INDIVIDUAL_TOPICS': True,
'MQTT_JSON_TOPIC': True,
}
Expand All @@ -68,17 +78,90 @@ def __init__(self, f=None):
def __getitem__(self, k):
return self.c[k]

class dedup(object):
""" A dedup class object supports deduping a stream of reports by
answering if a report is interesting relative to the history. While
more complicated deduping is allowed by the interface, for now it is
very simple, keeping track of only the previous interesting object.
For now, we more or less require that all reports have the same keys. """

# \todo Consider a cache with several entries.

def __init__(self):
# Make this long enough to skip repeats, but allow messages
# every 10s to come through.
self.duration = 5
# Exclude reception metadata (time and RF).
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
# Initialize storage for what was last sent.
(self.last_report, self.last_now) = (None, None)

def send_store(self, report, n):
""" Record report, n as the last report declared interesting, and
return True (to denote interesting). """
(self.last_report, self.last_now) = (report, n)
return True

def equiv(self, j1, j2):
""" Return True if j1 and j2 are the same, except for boring_keys. """
for (k, v) in j1.items():
# If in boring, we don't care.
if k not in self.boring_keys:
# If in j1 and not j2, they are different.
if k not in j2:
logging.debug("equiv: %s in j1 and not j2" % (k))
return False
if j1[k] != j2[k]:
logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
return False
# If the lengths are different, they must be different.
if len(j1) != len(j2):
logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
return False

# If we get here, then the lengths are the same, and all
# non-boring keys in j1 exist in j2, and have the same value.
# It could be that j2 is missing a boring key and also has a
# new non-boring key, but boring keys in particular should not
# be variable.
return True

# report is a python dictionary
def is_interesting(self, report):
""" If report is intersting, return True and update records of the
most recent interesting report. Otherwise return False. """
n = time.time()

# If previous interesting is missing or empty, accept this one.
if self.last_report is None or self.last_now is None:
logging.debug("interesting: no previous")
return self.send_store(report, n)

# If previous one was too long ago, accept this one.
if n - self.last_now > self.duration:
logging.debug("interesting: time")
return self.send_store(report, n)

if not self.equiv(self.last_report, report):
logging.debug("interesting: different")
return self.send_store(report, n)

return False

# Create a config object, defaults modified by the config file if present.
c = rtlconfig("rtl_433_mqtt_relay.yaml")

# Create a dedup object for later use, even if it's configured off.
d = dedup()

def mqtt_connect(client, userdata, flags, rc):
"""Handle MQTT connection callback."""
print("MQTT connected: " + mqtt.connack_string(rc))
logging.info("MQTT connected: " + mqtt.connack_string(rc))


def mqtt_disconnect(client, userdata, rc):
"""Handle MQTT disconnection callback."""
print("MQTT disconnected: " + mqtt.connack_string(rc))
logging.info("MQTT disconnected: " + mqtt.connack_string(rc))


# Create listener for incoming json string packets.
Expand All @@ -100,6 +183,14 @@ def sanitize(text):
def publish_sensor_to_mqtt(mqttc, data, line):
"""Publish rtl_433 sensor data to MQTT."""

if c['MQTT_DEDUP']:
# If this data is not novel relative to recent data, just skip it.
# Otherwise, send it via MQTT.
if not d.is_interesting(data):
logging.debug(" not interesting")
return
logging.debug( "INTERESTING")

# Construct a topic from the information that identifies which
# device this frame is from.
# NB: id is only used if channel is not present.
Expand Down Expand Up @@ -166,6 +257,7 @@ def rtl_433_probe():
try:
line = parse_syslog(line)
data = json.loads(line)
logging.debug("received %s" % line)
publish_sensor_to_mqtt(mqttc, data, line)

except ValueError:
Expand All @@ -179,8 +271,15 @@ def run():
# uid
# gid
# working_directory
rtl_433_probe()

# Set up logging at INFO, and change to DEBUG if config asks for that.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z')
logging.getLogger().setLevel(logging.INFO)
if c['DEBUG']:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("DEBUG LOGGING ENABLED")

rtl_433_probe()

if __name__ == "__main__":
run()
Loading