-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbus-pvoutput.py
151 lines (121 loc) · 4.56 KB
/
dbus-pvoutput.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/python -u
import sys, os
import logging
from functools import partial
from collections import Mapping
from datetime import datetime
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gobject
import requests
# Period between uploads; handed to gobject.timeout_add(), which counts in
# milliseconds, so this is five minutes.
INTERVAL = 5 * 60 * 1000

# Endpoint that every status upload is POSTed to.
PVOUTPUT = "https://pvoutput.org/service/r2/addstatus.jsp"

# Sent as the X-Pvoutput-Apikey and X-Pvoutput-SystemId request headers.
# Both are placeholders and must be replaced with real account values.
APIKEY = "YOUR_API_KEY_HERE"
SYSTEMID = "12345"

# Module-level logger; INFO is needed to see the per-upload summary line.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def find_services(bus, tp):
    """Return the bus names that belong to the Victron service type ``tp``.

    Every name reported by ``bus.list_names()`` that starts with
    ``com.victronenergy.<tp>`` is kept, converted to a plain ``str`` and
    returned in the order the bus listed it.
    """
    prefix = 'com.victronenergy.{}'.format(tp)
    matches = []
    for name in bus.list_names():
        if name.startswith(prefix):
            matches.append(str(name))
    return matches
class smart_dict(dict):
    """A ``dict`` whose keys can also be read and written as attributes.

    Reading ``d.key`` returns ``d['key']``; a value that is itself a mapping
    is returned as a new instance of this class built from it (a copy), so
    nested keys can be chained as attributes. Assigning ``d.key = value``
    stores ``d['key'] = value``.
    """

    def __getattr__(self, k):
        # Only reached when normal attribute lookup fails, so real dict
        # methods are never shadowed by keys.
        try:
            item = self[k]
            return self.__class__(item) if isinstance(item, Mapping) else item
        except KeyError:
            # Attribute protocol callers (getattr/hasattr) expect this type.
            raise AttributeError(k)

    def __setattr__(self, k, v):
        self.__setitem__(k, v)
# D-Bus integer wrapper classes that unwrap_dbus_value() turns into int.
# dbus.UInt32 appears twice; the repeat has no effect on isinstance().
dbus_int_types = (
    dbus.Int32,
    dbus.UInt32,
    dbus.Byte,
    dbus.Int16,
    dbus.UInt16,
    dbus.UInt32,
    dbus.Int64,
    dbus.UInt64,
)


def unwrap_dbus_value(val):
    """Convert a numeric D-Bus value to the matching built-in number.

    Instances of the classes in ``dbus_int_types`` become ``int`` and
    ``dbus.Double`` becomes ``float``. Any other value (strings, arrays,
    ...) is not converted and yields ``None``.
    """
    conversions = (
        (dbus_int_types, int),
        (dbus.Double, float),
    )
    for wrapper_types, to_native in conversions:
        if isinstance(val, wrapper_types):
            return to_native(val)
    return None
def set_state(state, key, v):
    """Store the unwrapped ``v["Value"]`` under ``state[key]``.

    Used (via ``functools.partial``) as the receiver for PropertiesChanged
    signals, where ``v`` is the changes dictionary delivered with the signal.
    """
    changed = v["Value"]
    state[key] = unwrap_dbus_value(changed)
def query(conn, service, path):
    """Synchronously call ``GetValue`` on ``path`` of ``service``.

    Returns whatever ``conn.call_blocking`` returns, i.e. the still-wrapped
    D-Bus value; the caller decides how to unwrap it.
    """
    interface = None      # third positional argument is left unset
    method = "GetValue"
    signature = ''        # GetValue takes no input arguments
    arguments = []
    return conn.call_blocking(service, path, interface, method,
                              signature, arguments)
def track(conn, state, service, path, target):
    """Mirror the value at ``path`` of ``service`` into ``state[target]``.

    The current value is fetched once with ``query`` so the entry exists
    immediately, then a PropertiesChanged receiver is registered that
    overwrites ``state[target]`` on every change via ``set_state``.
    """
    # Seed the entry with the value as it is right now.
    current = query(conn, service, path)
    state[target] = unwrap_dbus_value(current)

    # Keep it up to date from change notifications.
    on_change = partial(set_state, state, target)
    conn.add_signal_receiver(
        on_change,
        dbus_interface='com.victronenergy.BusItem',
        signal_name='PropertiesChanged',
        path=path,
        bus_name=service,
    )
def _build_payload(now, energy_generated, energy_consumed, stats):
    """Assemble the form fields for one PVOutput status upload.

    ``now`` provides the ``d`` (date) and ``t`` (time) fields. The two energy
    totals are multiplied by 1000 before being sent as ``v1``/``v3``
    (presumably kWh -> Wh; confirm against the units of the tracked paths).
    ``v2``, ``v4`` and ``v6`` come from ``stats['pg']``, ``stats['pc']`` and
    ``stats['v6']`` and are omitted when the tracked value is missing or
    ``None``, so one absent reading cannot raise and abort the upload.
    """
    payload = {
        "d": now.strftime("%Y%m%d"),
        "t": now.strftime("%H:%M"),
        "v1": int(energy_generated * 1000),
        "v3": int(energy_consumed * 1000),
        "c1": 1,
    }
    for field, key in (("v2", "pg"), ("v4", "pc"), ("v6", "v6")):
        reading = stats.get(key)
        if reading is not None:
            payload[field] = int(reading)
    return payload


def main():
    """Track Victron D-Bus values and upload them to PVOutput periodically.

    Connects to the system bus, discovers solarcharger and grid services and
    the vebus service, tracks their energy/power/voltage paths, uploads once
    immediately and then every ``INTERVAL`` milliseconds from the GLib main
    loop. Does not return under normal operation.
    """
    import time  # local import: only needed for tzset() below

    # Seconds to wait for PVOutput. The upload runs inside the main loop, so
    # an unbounded request would stall signal handling and later uploads.
    upload_timeout = 30

    logging.basicConfig(level=logging.INFO)
    DBusGMainLoop(set_as_default=True)
    conn = dbus.SystemBus()

    generators = smart_dict()
    consumers = smart_dict()
    stats = smart_dict()

    # Set the user timezone
    if 'TZ' not in os.environ:
        tz = query(conn, "com.victronenergy.settings", "/Settings/System/TimeZone")
        # Only a non-empty string can be a zone name; ignore anything else.
        if isinstance(tz, dbus.String) and tz:
            os.environ['TZ'] = str(tz)
            # The C library caches the zone; without tzset() the new TZ may
            # never be picked up by datetime.now().
            if hasattr(time, "tzset"):
                time.tzset()

    # Find solarcharger services
    solarchargers = find_services(conn, 'solarcharger')
    logger.info("Found solarchargers at %s", ', '.join(solarchargers))

    # Find grid meters
    meters = find_services(conn, 'grid')
    logger.info("Found grid meters at %s", ', '.join(meters))

    # Find vebus service
    vebus = str(query(conn, "com.victronenergy.system", "/VebusService"))
    logger.info("Found vebus at %s", vebus)

    # Track solarcharger yield
    for charger in solarchargers:
        track(conn, generators, charger, "/Yield/User", charger)

    # Track grid consumption
    for meter in meters:
        track(conn, consumers, meter, "/Ac/L1/Energy/Forward", meter)

    # Track vebus consumption, from battery to input and output
    track(conn, consumers, vebus, "/Energy/InverterToAcOut", "c1")
    track(conn, consumers, vebus, "/Energy/InverterToAcIn1", "c2")

    # Track power values
    track(conn, stats, "com.victronenergy.system", "/Ac/Consumption/L1/Power", "pc")
    track(conn, stats, "com.victronenergy.system", "/Dc/Pv/Power", "pg")

    # Track battery voltage
    track(conn, stats, "com.victronenergy.system", "/Dc/Battery/Voltage", "v6")

    # Periodic work
    def _upload():
        """Post the current totals; always return True to stay scheduled."""
        try:
            # Entries that could not be unwrapped are None; leave them out.
            energy_generated = sum(
                v for v in generators.values() if v is not None)
            energy_consumed = sum(
                v for v in consumers.values() if v is not None)

            # PG/PC use %s because either may legitimately be None.
            logger.info("EG: %.2f, EC: %.2f, PG: %s, PC: %s",
                        energy_generated, energy_consumed,
                        stats.get("pg"), stats.get("pc"))

            # Post the values to pvoutput
            payload = _build_payload(datetime.now(), energy_generated,
                                     energy_consumed, stats)
            response = requests.post(
                PVOUTPUT,
                headers={
                    "X-Pvoutput-Apikey": APIKEY,
                    "X-Pvoutput-SystemId": SYSTEMID,
                },
                data=payload,
                timeout=upload_timeout)
            if response.status_code != 200:
                logger.warning("PVOutput rejected the upload: HTTP %s %s",
                               response.status_code, response.text.strip())
        except requests.RequestException:
            # Network trouble is expected now and then; try again next time.
            logger.exception("Upload to PVOutput failed")
        except Exception:
            # Callback boundary: an escaping exception would stop the timer
            # from being rescheduled, silently ending all future uploads.
            logger.exception("Unexpected error while uploading")
        return True

    _upload()
    gobject.timeout_add(INTERVAL, _upload)

    gobject.MainLoop().run()


if __name__ == "__main__":
    main()