-
Notifications
You must be signed in to change notification settings - Fork 1
/
bitalino.py
123 lines (104 loc) · 3.99 KB
/
bitalino.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from pylsl import StreamInlet, resolve_stream, lost_error
import datetime
import os
import time
import timesync_nict
# Host name of the NTP server that bitalino_handler hands to
# timesync_nict.MyNTPClient to obtain the timestamp of every sample.
# NOTE(review): 'ntp.nict.jp' looks like a Japan-based server; if you run
# this from another region, point it at an NTP server closer to you.
ntp_server_host = 'ntp.nict.jp'
def _enabled_channels(eeg_flag, ecg_flag, eda_flag, emg_flag):
    """Return the names of the enabled channels, in the order eeg, ecg, eda, emg."""
    flagged = (
        ('eeg', eeg_flag),
        ('ecg', ecg_flag),
        ('eda', eda_flag),
        ('emg', emg_flag),
    )
    # `== True` (not plain truthiness) is kept on purpose so that callers
    # passing values such as non-empty strings are treated as before.
    return [name for name, flag in flagged if flag == True]  # noqa: E712


def _csv_line(fields):
    """Join already-stringified fields into one newline-terminated CSV line."""
    return ','.join(fields) + '\n'


def bitalino_handler(sio, person, mac_address, eeg_flag, ecg_flag, eda_flag, emg_flag, freq):
    """Log smoothed BITalino samples to a CSV file and forward them over `sio`.

    The function resolves a stream with ``resolve_stream("type", mac_address)``,
    opens a ``StreamInlet`` on the first result and then loops forever. For
    every pulled sample it:

    1. asks ``timesync_nict.MyNTPClient(ntp_server_host)`` for the current time,
    2. smooths each enabled channel with a weighted moving average,
    3. appends one line to ``data/bitalino-<yymmdd>-<HHMM>.csv``,
    4. emits ``{'person': ..., 'timestamp': ..., <channel>: <value>, ...}`` as
       a ``'my message'`` event on ``sio``.

    The function never returns normally: the process is terminated with
    ``os._exit(0)`` when the stream is lost or when emitting fails.

    Args:
        sio: Object exposing ``emit(event, data)``
            (presumably a socket.io client - confirm against the caller).
        person: Identifier sent along with every sample under the 'person' key.
        mac_address: Value matched against the stream's "type" property.
        eeg_flag: Enable the 'eeg' channel when equal to ``True``.
        ecg_flag: Enable the 'ecg' channel when equal to ``True``.
        eda_flag: Enable the 'eda' channel when equal to ``True``.
        emg_flag: Enable the 'emg' channel when equal to ``True``.
        freq: Sampling frequency. Currently unused; kept so existing callers
            do not break. (It previously fed a timestamp estimate whose result
            was never used and which divided by zero for ``freq == 0``.)
    """
    channels = _enabled_channels(eeg_flag, ecg_flag, eda_flag, emg_flag)

    print("# Looking for an available OpenSignals stream from the specified device...")
    os_stream = resolve_stream("type", mac_address)
    # Create an inlet to receive signal samples from the stream.
    inlet = StreamInlet(os_stream[0], recover=False)

    # Log file: data/bitalino-<yymmdd>-<HHMM>.csv, opened in append mode so a
    # restart within the same minute keeps the earlier lines.
    log_dir = 'data'
    started_at = datetime.datetime.now()
    os.makedirs(log_dir, exist_ok=True)
    bitalino_fname = "%s/%s.csv" % (log_dir, started_at.strftime('bitalino-%y%m%d-%H%M'))

    # Column titles. Built with join so the header is newline-terminated even
    # when no channel is enabled.
    with open(bitalino_fname, "a") as f:
        f.write(_csv_line(['timestamp'] + channels))

    # Weight given to the previous smoothed value in the moving average.
    weighted_avg_param = 0.8
    # Last smoothed value per enabled channel; the filter starts from 0.
    corrected = [0] * len(channels)

    while True:
        try:
            # Blocks until a sample arrives; the stream's own timestamp is
            # discarded in favour of the NTP time below.
            samples, _ = inlet.pull_sample()
        except lost_error:
            print('Connection from the BITalino device is lost')
            os._exit(0)

        # NOTE(review): a new NTP client is built for every sample, as before.
        # Whether get_nowtime() performs a network round trip is not visible
        # here; if it does, consider reusing one client across iterations.
        ntp_client = timesync_nict.MyNTPClient(ntp_server_host)
        timestamp = ntp_client.get_nowtime()

        channeldata_send = {'person': person, 'timestamp': timestamp}
        row = [str(datetime.datetime.fromtimestamp(timestamp))]

        for idx, name in enumerate(channels):
            # samples[0] is skipped (the earlier code read it as a sample
            # counter); the n-th enabled channel is read from samples[n + 1].
            corrected[idx] = (weighted_avg_param * corrected[idx]
                              + (1 - weighted_avg_param) * samples[idx + 1])
            channeldata_send[name] = corrected[idx]
            row.append(str(corrected[idx]))

        # The file is reopened for each sample so every line reaches the OS
        # before a possible os._exit(), which skips normal interpreter cleanup.
        with open(bitalino_fname, "a") as f:
            f.write(_csv_line(row))

        try:
            sio.emit('my message', channeldata_send)
        except Exception:
            # Top-level boundary: any emit failure ends the acquisition process.
            print('Cannot communicate with the server')
            os._exit(0)