mqtt
python
timescaledb
iot
telemetry
MQTT to TimescaleDB Bridge
A Python script that subscribes to HiveMQ Cloud and writes telemetry data to TimescaleDB.
Why a Bridge?
HiveMQ Cloud (MQTT) ──► Bridge Script ──► TimescaleDB
(Python)
The bridge:
- Subscribes to
car/telemetrytopic on HiveMQ Cloud - Parses incoming JSON messages
- Inserts data into TimescaleDB with timestamps
Setup
Directory Structure
~/telemetry-bridge/
├── pyproject.toml # UV project config
├── uv.lock # Dependency lock file
└── mqtt_to_timescale.py
Initialize with UV
mkdir -p ~/telemetry-bridge
cd ~/telemetry-bridge
uv init
uv add paho-mqtt psycopg2-binary
The Script
mqtt_to_timescale.py:
#!/usr/bin/env python3
import json
import ssl
import paho.mqtt.client as mqtt
import psycopg2
# HiveMQ Cloud settings
HIVEMQ_HOST = "xxxxx.s1.eu.hivemq.cloud"
HIVEMQ_PORT = 8883
HIVEMQ_USER = "esp32"
HIVEMQ_PASS = "your_password"
# TimescaleDB connection
conn = psycopg2.connect(
host="localhost",
port=5433,
database="telemetry",
user="postgres",
password="telemetry123"
)
conn.autocommit = True
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to HiveMQ Cloud!")
client.subscribe("car/telemetry")
else:
print(f"Connection failed: {rc}")
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
cur = conn.cursor()
cur.execute("""
INSERT INTO car_metrics (
time, rpm, speed, coolant, intake_temp,
throttle, engine_load, map_kpa, fuel_level,
timing_adv, battery_voltage
)
VALUES (NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
data.get('rpm'),
data.get('speed'),
data.get('coolant'),
data.get('intake_temp'),
data.get('throttle'),
data.get('engine_load'),
data.get('map_kpa'),
data.get('fuel_level'),
data.get('timing_adv'),
data.get('battery_voltage')
))
print(f"Inserted: {data}")
except Exception as e:
print(f"Error: {e}")
def main():
client = mqtt.Client()
client.username_pw_set(HIVEMQ_USER, HIVEMQ_PASS)
client.tls_set(tls_version=ssl.PROTOCOL_TLS)
client.on_connect = on_connect
client.on_message = on_message
print(f"Connecting to {HIVEMQ_HOST}...")
client.connect(HIVEMQ_HOST, HIVEMQ_PORT, 60)
client.loop_forever()
if __name__ == "__main__":
main()
Running
cd ~/telemetry-bridge
uv run python mqtt_to_timescale.py
Output:
Connecting to xxxxx.s1.eu.hivemq.cloud...
Connected to HiveMQ Cloud!
Inserted: {'rpm': 2500, 'speed': 80, ...}
Running as Background Service
Option 1: tmux/screen
tmux new -s bridge
uv run python mqtt_to_timescale.py
# Ctrl+B, D to detach
# tmux attach -t bridge to reattach
Option 2: launchd (macOS service)
Create ~/Library/LaunchAgents/com.telemetry.bridge.plist:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.telemetry.bridge</string>
<key>ProgramArguments</key>
<array>
<string>/Users/YOUR_USER/.local/bin/uv</string>
<string>run</string>
<string>python</string>
<string>mqtt_to_timescale.py</string>
</array>
<key>WorkingDirectory</key>
<string>/Users/YOUR_USER/telemetry-bridge</string>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/tmp/bridge.log</string>
<key>StandardErrorPath</key>
<string>/tmp/bridge.err</string>
</dict>
</plist>
Load:
launchctl load ~/Library/LaunchAgents/com.telemetry.bridge.plist
Testing
Send a test message:
mosquitto_pub \
-h xxxxx.s1.eu.hivemq.cloud \
-p 8883 \
-u esp32 \
-P 'password' \
--capath /etc/ssl/certs/ \
-t "car/telemetry" \
-m '{"rpm":3000,"speed":100,"coolant":92,"intake_temp":38,"throttle":50,"engine_load":65,"map_kpa":105,"fuel_level":70,"timing_adv":15,"battery_voltage":14.2}'
Verify in database:
psql -h localhost -p 5433 -U postgres -d telemetry \
-c "SELECT * FROM car_metrics ORDER BY time DESC LIMIT 1;"
Error Handling Improvements
For production, add reconnection logic:
def on_disconnect(client, userdata, rc):
print(f"Disconnected (rc={rc}), reconnecting...")
while True:
try:
client.reconnect()
break
except:
time.sleep(5)
client.on_disconnect = on_disconnect
Environment Variables (Better Practice)
Instead of hardcoding credentials:
import os
HIVEMQ_HOST = os.environ.get("HIVEMQ_HOST")
HIVEMQ_USER = os.environ.get("HIVEMQ_USER")
HIVEMQ_PASS = os.environ.get("HIVEMQ_PASS")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "telemetry123")
Run with:
HIVEMQ_HOST=xxx.hivemq.cloud HIVEMQ_USER=esp32 HIVEMQ_PASS=xxx uv run python mqtt_to_timescale.py
mosquitto_pub
-h YOUR_HIVEMQ_HOST
-p 8883
-u YOUR_USERNAME
-P 'YOUR_PASSWORD'
--cafile /etc/ssl/cert.pem
-t "car/telemetry"
-m '{"rpm":9999,"speed":123,"coolant":99}'