mqtt python timescaledb iot telemetry

MQTT to TimescaleDB Bridge

A Python script that subscribes to HiveMQ Cloud and writes telemetry data to TimescaleDB.

Why a Bridge?

HiveMQ Cloud (MQTT) ──► Bridge Script ──► TimescaleDB
                        (Python)

The bridge:

  • Subscribes to the car/telemetry topic on HiveMQ Cloud
  • Parses each incoming message as JSON
  • Inserts one row per message into TimescaleDB, timestamped with NOW() at insert time

Setup

Directory Structure

~/telemetry-bridge/
├── pyproject.toml      # UV project config
├── uv.lock             # Dependency lock file
└── mqtt_to_timescale.py

Initialize with UV

mkdir -p ~/telemetry-bridge
cd ~/telemetry-bridge
uv init
uv add paho-mqtt psycopg2-binary
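
The script inserts into a car_metrics table, so that table has to exist before the bridge starts. A minimal sketch of a one-time setup step follows. Only the column names come from the INSERT statement in the script; the DOUBLE PRECISION column types, the helper names build_create_table_sql and create_schema, and the assumption that the timescaledb extension is already enabled in the database are all mine.

```python
# Hypothetical one-time schema setup. Column names mirror the bridge's INSERT;
# the DOUBLE PRECISION types are an assumption, adjust them to your data.
METRIC_COLUMNS = (
    "rpm", "speed", "coolant", "intake_temp", "throttle",
    "engine_load", "map_kpa", "fuel_level", "timing_adv", "battery_voltage",
)


def build_create_table_sql(table: str = "car_metrics") -> str:
    """Return a CREATE TABLE statement with a time column plus one column per metric."""
    cols = ",\n".join(f"    {name} DOUBLE PRECISION" for name in METRIC_COLUMNS)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n"
        f"    time TIMESTAMPTZ NOT NULL,\n"
        f"{cols}\n"
        f");"
    )


# Turns the plain table into a TimescaleDB hypertable partitioned on "time".
HYPERTABLE_SQL = (
    "SELECT create_hypertable('car_metrics', 'time', if_not_exists => TRUE);"
)


def create_schema(conn) -> None:
    """Run both statements on an already-open psycopg2 connection."""
    with conn.cursor() as cur:
        cur.execute(build_create_table_sql())
        cur.execute(HYPERTABLE_SQL)
    conn.commit()
```

Calling create_schema(conn) once with the same connection settings as the script should be enough; both statements are written to be safe to re-run.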

The Script

mqtt_to_timescale.py:

#!/usr/bin/env python3
import json
import ssl
import paho.mqtt.client as mqtt
import psycopg2

# HiveMQ Cloud settings
HIVEMQ_HOST = "xxxxx.s1.eu.hivemq.cloud"
HIVEMQ_PORT = 8883
HIVEMQ_USER = "esp32"
HIVEMQ_PASS = "your_password"

# TimescaleDB connection
conn = psycopg2.connect(
    host="localhost",
    port=5433,
    database="telemetry",
    user="postgres",
    password="telemetry123"
)
conn.autocommit = True

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to HiveMQ Cloud!")
        client.subscribe("car/telemetry")
    else:
        print(f"Connection failed: {rc}")

def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload.decode())
        # The context manager closes the cursor even if the INSERT fails.
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO car_metrics (
                    time, rpm, speed, coolant, intake_temp,
                    throttle, engine_load, map_kpa, fuel_level,
                    timing_adv, battery_voltage
                )
                VALUES (NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                # Keys missing from the payload become None, stored as NULL.
                data.get('rpm'),
                data.get('speed'),
                data.get('coolant'),
                data.get('intake_temp'),
                data.get('throttle'),
                data.get('engine_load'),
                data.get('map_kpa'),
                data.get('fuel_level'),
                data.get('timing_adv'),
                data.get('battery_voltage')
            ))
        print(f"Inserted: {data}")
    except Exception as e:
        print(f"Error: {e}")

def main():
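    # paho-mqtt 2.x requires an explicit callback API version; VERSION1 keeps
    # the (client, userdata, flags, rc) callback signatures used above.
    # On paho-mqtt 1.x, call mqtt.Client() with no arguments instead.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    client.username_pw_set(HIVEMQ_USER, HIVEMQ_PASS)
    # ssl.PROTOCOL_TLS is deprecated; PROTOCOL_TLS_CLIENT is its client-side replacement.
    client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)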
    client.on_connect = on_connect
    client.on_message = on_message

    print(f"Connecting to {HIVEMQ_HOST}...")
    client.connect(HIVEMQ_HOST, HIVEMQ_PORT, 60)
    client.loop_forever()

if __name__ == "__main__":
    main()
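
The parsing step inside on_message is the part most likely to break on a malformed payload, and it can be checked without a broker or a database if it is pulled out into a pure function. A minimal sketch, assuming a hypothetical helper named payload_to_row; the field order matches the INSERT statement above.

```python
import json

# Same order as the column list in the INSERT statement (after "time").
FIELDS = (
    "rpm", "speed", "coolant", "intake_temp", "throttle",
    "engine_load", "map_kpa", "fuel_level", "timing_adv", "battery_voltage",
)


def payload_to_row(payload: bytes) -> tuple:
    """Decode an MQTT payload and return the values in INSERT order.

    Missing keys become None (stored as NULL). Invalid JSON raises
    json.JSONDecodeError, which is a subclass of ValueError.
    """
    data = json.loads(payload.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    return tuple(data.get(name) for name in FIELDS)
```

Inside on_message the tuple could be passed straight to cur.execute() in place of the ten data.get() calls.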

Running

cd ~/telemetry-bridge
uv run python mqtt_to_timescale.py

Output:

Connecting to xxxxx.s1.eu.hivemq.cloud...
Connected to HiveMQ Cloud!
Inserted: {'rpm': 2500, 'speed': 80, ...}

Running as Background Service

Option 1: tmux/screen

tmux new -s bridge
uv run python mqtt_to_timescale.py
# Ctrl+B, D to detach
# tmux attach -t bridge to reattach

Option 2: launchd (macOS service)

Create ~/Library/LaunchAgents/com.telemetry.bridge.plist:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.telemetry.bridge</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/YOUR_USER/.local/bin/uv</string>
        <string>run</string>
        <string>python</string>
        <string>mqtt_to_timescale.py</string>
    </array>
    <key>WorkingDirectory</key>
    <string>/Users/YOUR_USER/telemetry-bridge</string>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardOutPath</key>
    <string>/tmp/bridge.log</string>
    <key>StandardErrorPath</key>
    <string>/tmp/bridge.err</string>
</dict>
</plist>

Load:

launchctl load ~/Library/LaunchAgents/com.telemetry.bridge.plist
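
The plist above needs YOUR_USER replaced by hand in two places. As an alternative, the same job definition could be generated with Python's standard plistlib module. This is only a sketch: the function name build_bridge_plist is hypothetical, and the keys and values are copied from the XML above.

```python
import plistlib


def build_bridge_plist(home: str, uv_path: str) -> bytes:
    """Return the launchd job definition as XML plist bytes."""
    job = {
        "Label": "com.telemetry.bridge",
        "ProgramArguments": [uv_path, "run", "python", "mqtt_to_timescale.py"],
        "WorkingDirectory": f"{home}/telemetry-bridge",
        "RunAtLoad": True,
        "KeepAlive": True,
        "StandardOutPath": "/tmp/bridge.log",
        "StandardErrorPath": "/tmp/bridge.err",
    }
    return plistlib.dumps(job)  # XML is plistlib's default output format

# Example use (not executed here):
#   from pathlib import Path
#   home = str(Path.home())
#   target = Path(home, "Library", "LaunchAgents", "com.telemetry.bridge.plist")
#   target.write_bytes(build_bridge_plist(home, f"{home}/.local/bin/uv"))
```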

Testing

Send a test message:

mosquitto_pub \
  -h xxxxx.s1.eu.hivemq.cloud \
  -p 8883 \
  -u esp32 \
  -P 'password' \
  --capath /etc/ssl/certs/ \
  -t "car/telemetry" \
  -m '{"rpm":3000,"speed":100,"coolant":92,"intake_temp":38,"throttle":50,"engine_load":65,"map_kpa":105,"fuel_level":70,"timing_adv":15,"battery_voltage":14.2}'

Verify in database:

psql -h localhost -p 5433 -U postgres -d telemetry \
  -c "SELECT * FROM car_metrics ORDER BY time DESC LIMIT 1;"

Error Handling Improvements

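For production, add reconnection logic. loop_forever() already retries a dropped connection on its own, so an explicit handler like this one is mainly useful for logging or for custom retry timing:

import time

def on_disconnect(client, userdata, rc):
    print(f"Disconnected (rc={rc}), reconnecting...")
    while True:
        try:
            client.reconnect()
            break
        except OSError:
            # Broker unreachable (DNS, TCP, or TLS failure); wait and retry.
            time.sleep(5)

# Register inside main(), next to the other callbacks.
client.on_disconnect = on_disconnect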
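
A fixed 5-second sleep retries at the same rate no matter how long the broker has been down. One common alternative is exponential backoff with a cap. The sketch below uses only the standard library; backoff_delays and reconnect_with_backoff are hypothetical names, and the reconnect argument stands in for something like client.reconnect.

```python
import time
from typing import Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... and never more than cap."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)


def reconnect_with_backoff(
    reconnect: Callable[[], object],
    max_attempts: int = 10,
    sleep: Callable[[float], object] = time.sleep,
) -> bool:
    """Call reconnect() until it succeeds or max_attempts is reached.

    Sleeps after every failed attempt, doubling the wait each time.
    Returns True on success, False if every attempt raised OSError.
    """
    delays = backoff_delays()
    for _ in range(max_attempts):
        try:
            reconnect()
            return True
        except OSError:
            sleep(next(delays))
    return False
```

Inside on_disconnect this could be called as reconnect_with_backoff(client.reconnect). The sleep parameter exists only so the function can be exercised without real waiting.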

Environment Variables (Better Practice)

Instead of hardcoding credentials:

import os

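# os.environ[...] raises KeyError at startup if a variable is missing,
# instead of passing None to the MQTT client later.
HIVEMQ_HOST = os.environ["HIVEMQ_HOST"]
HIVEMQ_USER = os.environ["HIVEMQ_USER"]
HIVEMQ_PASS = os.environ["HIVEMQ_PASS"]
# No default here, so the password never lives in the source.
# Pass it as password=DB_PASSWORD to psycopg2.connect().
DB_PASSWORD = os.environ["DB_PASSWORD"]

Run with:

HIVEMQ_HOST=xxx.hivemq.cloud HIVEMQ_USER=esp32 HIVEMQ_PASS=xxx DB_PASSWORD=xxx uv run python mqtt_to_timescale.py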
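
If the list of settings grows, the lookups can be collected in one place that reports every missing variable at once. A minimal sketch, assuming all four variables are required and adding an optional HIVEMQ_PORT variable that is not part of the original script; BridgeConfig and load_config are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class BridgeConfig:
    hivemq_host: str
    hivemq_user: str
    hivemq_pass: str
    db_password: str
    hivemq_port: int = 8883  # TLS port used throughout this note


REQUIRED = ("HIVEMQ_HOST", "HIVEMQ_USER", "HIVEMQ_PASS", "DB_PASSWORD")


def load_config(env: Mapping[str, str] = os.environ) -> BridgeConfig:
    """Build the config from env, failing fast with the full list of gaps."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return BridgeConfig(
        hivemq_host=env["HIVEMQ_HOST"],
        hivemq_user=env["HIVEMQ_USER"],
        hivemq_pass=env["HIVEMQ_PASS"],
        db_password=env["DB_PASSWORD"],
        # HIVEMQ_PORT is an assumed, optional override.
        hivemq_port=int(env.get("HIVEMQ_PORT", "8883")),
    )
```

Taking the environment as a parameter keeps the function testable with a plain dict.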

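Quick test with a partial payload, using a CA bundle file instead of a certificate directory. Fields missing from the JSON are stored as NULL:

mosquitto_pub \
  -h YOUR_HIVEMQ_HOST \
  -p 8883 \
  -u YOUR_USERNAME \
  -P 'YOUR_PASSWORD' \
  --cafile /etc/ssl/cert.pem \
  -t "car/telemetry" \
  -m '{"rpm":9999,"speed":123,"coolant":99}'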