-
Notifications
You must be signed in to change notification settings - Fork 0
/
realtime_plot.py
132 lines (110 loc) · 4.34 KB
/
realtime_plot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import re
import time
import matplotlib.pyplot as plt
import threading
from queue import Queue
import pyairview
# Store the RSSI values in a global list
rssi_values = []
plot_data_queue = Queue()
# Fixed frequency range for the plot
START_FREQ = 2399.0 # MHz
END_FREQ = 2485.0 # MHz
FREQ_STEP = 0.5 # MHz
# Fixed RSSI range for the plot (adjust this if your data has a wider range)
RSSI_MIN = -100 # dBm
RSSI_MAX = -40 # dBm
def parse_rssi_input(input_text):
"""
Extracts RSSI readings from the input text using regex.
"""
try:
# Extract numbers inside square brackets after the "Received X RSSI level readings: "
match = re.search(r"Received \d+ RSSI level readings: \[(.*)\]", input_text)
if match:
rssi_data = match.group(1)
# Convert the extracted string into a list of integers
readings = list(map(int, rssi_data.split(",")))
return readings
else:
print("No valid RSSI data found in the input.")
return []
except Exception as e:
print(f"Error parsing input: {e}")
return []
def plot_rssi_spectrum():
"""
Plots a spectrum analyzer style graph of RSSI values with frequency as the X-axis.
This function continuously updates the plot from the queue.
"""
fig, ax = plt.subplots()
ax.set_title("RSSI Spectrum Analyzer")
ax.set_xlabel("Frequency (MHz)")
ax.set_ylabel("RSSI Level (dBm)")
ax.grid(True)
# Set the fixed limits for the frequency and RSSI axes
ax.set_xlim(START_FREQ, END_FREQ)
ax.set_ylim(RSSI_MIN, RSSI_MAX)
while True:
# Get new RSSI data from the queue
if not plot_data_queue.empty():
rssi_values = plot_data_queue.get()
# Calculate the number of points based on the frequency range
num_points = len(rssi_values)
frequencies = [START_FREQ + i * FREQ_STEP for i in range(num_points)]
# Plot the RSSI values against the frequencies
ax.cla() # Clear the axes
ax.set_xlim(START_FREQ, END_FREQ) # Reset frequency limits
ax.set_ylim(RSSI_MIN, RSSI_MAX) # Reset RSSI limits
ax.plot(frequencies, rssi_values, marker='o', linestyle='-', color='b', alpha=0.7)
ax.set_title("RSSI Spectrum Analyzer")
ax.set_xlabel("Frequency (MHz)")
ax.set_ylabel("RSSI Level (dBm)")
ax.grid(True)
plt.draw()
plt.pause(0.1) # Pause to update the plot
def scan_callback(rssi_list):
"""
Callback function to handle received RSSI data.
This function is called each time RSSI values are received from the Airview device.
"""
print(f"Received {len(rssi_list)} RSSI level readings: {rssi_list}")
# Put the new RSSI readings into the queue for plotting
plot_data_queue.put(rssi_list)
def start_scan():
"""
Starts the RSSI scan and processes the results continuously.
"""
try:
# Connect to the Airview device (assuming the device is connected and the port is correct)
connected = pyairview.connect("/dev/ttyACM0") # Replace with the correct port if necessary
if not connected:
print("Failed to connect to the device.")
return
print("Starting RSSI scan...")
# Start scanning
pyairview.start_scan(callback=scan_callback)
# Start the plot update thread
plot_thread = threading.Thread(target=plot_rssi_spectrum, daemon=True)
plot_thread.start()
# Continuously update the plot with new RSSI data
while pyairview.is_scanning():
time.sleep(2.5) # Delay between scan updates
# Stop scanning after a certain period or when finished
pyairview.stop_scan()
except KeyboardInterrupt:
print("Scan interrupted by user.")
except Exception as e:
print(f"Error during scan: {e}")
finally:
# Ensure proper cleanup of the serial connection
try:
pyairview.disconnect()
print("Disconnected from Airview device.")
except Exception as e:
print(f"Error during cleanup: {e}")
# Keep the plot open after scan completion
plt.ioff() # Turn off interactive mode
plt.show() # Keep the plot window open
if __name__ == "__main__":
start_scan()