-
Notifications
You must be signed in to change notification settings - Fork 5
/
StepperServoCANtester.py
executable file
·387 lines (313 loc) · 13.5 KB
/
StepperServoCANtester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
#!/usr/bin/python3
"""
Simple gui program that can be used to test StepperServoCAN motor by @killinen
"""
import os
import sys
import subprocess
import time
import threading
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
import cantools
import can
# Constants
MIN_TORQUE = -16
MAX_TORQUE = 15.875
MIN_ANGLE = -4096
MAX_ANGLE = 4096
##########################################################################
############################### FUNCTIONS ################################
##########################################################################
# This function checks if the operating system is Linux. If not, it prints an error message and aborts the program.
def check_linux():
"""
Check if the operating system is Linux. If not, print an error message and abort the program.
Raises:
SystemExit: If the operating system is not Linux.
"""
if sys.platform != "linux":
print("Error: Operating system is not Linux. Aborting.")
return False
return True
def msg_calc_checksum_8bit(data: bytes, len: int, msg_id: int) -> int:
"""
This function calculates 8-bit checksum for given data, length and message id.
"""
checksum = msg_id
for i in range(len):
checksum += data[i]
checksum = (checksum & 0xFF) + (checksum >> 8)
checksum &= 0xFF
return checksum
# This function configures the socketCAN interface in the terminal
def configure_socketcan():
# Check for available socketCAN channels
try:
channels = []
output = subprocess.check_output("ip -details -brief link | grep can", shell=True)
for line in output.decode().split("\n"):
if line.strip():
words = line.split()
if words[1] == "UP" or words[1] == "UNKNOWN": # Check if channel is up
channel = words[0]
channels.append(channel)
except subprocess.CalledProcessError:
print("No socketCAN channels found.")
print("Hint: if you want to test the program set up a virtual CAN channel.")
print("sudo modprobe vcan")
print("sudo ip link add dev vcan0 type vcan")
print("sudo ip link set up vcan0")
return None
# If multiple CAN channels are found, list them and ask in the terminal which channel to choose
if len(channels) > 1:
print("Multiple CAN channels found:")
for i, channel in enumerate(channels):
print(f"{i+1}: {channel}")
print(f"{len(channels)+1}: Abort the program")
selection = input("Select a channel number: ")
try:
selection = int(selection)
if selection < 1 or selection > len(channels) + 1:
raise ValueError
elif selection == len(channels) + 1:
print("Aborting program...")
return None
except ValueError:
print("Invalid selection")
return None
channel = channels[selection - 1]
# If only 1 channel is found, choose that
elif len(channels) == 1:
channel = channels[0]
# If no channels are found, abort the program
else:
print("No socketCAN channels found that are UP.")
print("Hint: if you want to test the program set up a virtual CAN channel.")
print("sudo modprobe vcan")
print("sudo ip link add dev vcan0 type vcan")
print("sudo ip link set up vcan0")
return None
return channel
# Connect to CAN interface that has been selected from GUI and start sending CAN msgs
def connect_to_can_interface(interface):
global can_enabled
can_enabled = True
can_bus = None
interface_name = interface.get()
channel = None
print(f"\nConnecting to {interface_name}...")
if interface_name == "socketcan":
if not check_linux():
can_enabled = False
return
print("Finding socketCAN channel...")
channel = configure_socketcan()
if channel is None:
# If send_message thread is already running, stop the while loop by setting can_enable flag to False
print(f'CAN interface "{interface_name}" not available.')
can_enabled = False
# Handle the case where there are no compatible CAN interfaces
# You can display an error message or perform other actions as needed
return
try:
can_bus = can.Bus(interface=interface_name, bitrate=500000, channel=channel)
print("Connected.")
except Exception as e:
print("An error occurred:", e)
can_enabled = False
if can_enabled:
thread = threading.Thread(target=lambda: send_message(can_bus))
thread.start()
def interface_selected(event):
# Run the connect_to_can_interface function when a new option is selected
interface = selected_backend
connect_to_can_interface(interface)
def validate_input(input_str, min_value, max_value):
try:
# Convert the input string to an floating point number
value = float(input_str)
# Check if the value is within the specified range
if min_value <= value <= max_value:
return value
except ValueError:
# If there was an error converting to an float, or the value is outside the range,
# catch the exception and do nothing
pass
# If we get here, the input was invalid, so return None
return None
# Function to update torque and angle from widget values
def update_values():
# Use global variables to update torque and angle values
global torque, angle
# Get the torque input string from the widget
torque_str = torque_widget.get()
# Check if the input string represents a valid integer within the torque range
torque = validate_input(torque_str, MIN_TORQUE, MAX_TORQUE)
# If the input is invalid, set the torque to zero and show an error message
if torque is None:
torque = 0
messagebox.showerror("Error", "Torque value should be between {} and {}".format(MIN_TORQUE, MAX_TORQUE))
# Get the angle input string from the widget
angle_str = angle_widget.get()
# Check if the input string represents a valid integer within the angle range
angle = validate_input(angle_str, MIN_ANGLE, MAX_ANGLE)
# If the input is invalid, set the angle to zero and show an error message
if angle is None:
angle = 0
messagebox.showerror("Error", "Angle value should be between -180 and 180")
# Function to encode and send the CAN message
def update_message():
global msg, data, counter, torque, angle
# Calculate new counter value
counter = counter + 1
if counter == 16:
counter = 0
# Encode data to STEERING_COMMAND data field
data = msg.encode({
'STEER_TORQUE': torque,
'STEER_ANGLE': angle,
'STEER_MODE': steer_mode_widget.get_value(),
'COUNTER': counter & 0xF,
'CHECKSUM': 0
})
# Calculate checksum for the STEERING_COMMAND message
lent = len(data)
checksum = msg_calc_checksum_8bit(data, lent, 558)
# Encode the data field with new checksum
data = msg.encode({
'STEER_TORQUE': torque,
'STEER_ANGLE': angle,
'STEER_MODE': steer_mode_widget.get_value(),
'COUNTER': counter & 0xF,
'CHECKSUM': checksum
})
def send_message(can_bus: can.bus.BusABC):
global can_enabled
last_exec_time = time.monotonic() # current time in seconds since some arbitrary reference point
loop_count = 0
last_print_time = time.monotonic()
while can_enabled:
# Create a message using the "torque" dbc object
message = can.Message(arbitration_id=msg.frame_id, data=data, is_extended_id=False)
# Update the STEERING_COMMAND message values and send to the bus
update_message()
can_bus.send(message)
# Wait for the remaining time until the next 10 ms interval
elapsed_time = time.monotonic() - last_exec_time
remaining_time = max(0.01 - elapsed_time, 0)
time.sleep(remaining_time)
# Update last execution time and loop count
last_exec_time += 0.01
loop_count += 1
# Print send frequency every second
if time.monotonic() - last_print_time >= 1:
loop_frequency = loop_count / (time.monotonic() - last_print_time)
print(f"CAN send frequency: {loop_frequency:.2f} Hz")
loop_count = 0
last_print_time = time.monotonic()
can_bus.shutdown()
# Define a class named SteerModeWidget
class SteerModeWidget:
def __init__(self, master, label_text, options_list, command):
# Create an instance variable of type tk.IntVar to store the selected value
self.var = tk.IntVar()
# Create a Label widget with the specified label text and place it in the parent widget using the grid geometry manager
self.label = tk.Label(master, text=label_text)
self.label.grid(row=3, column=0, sticky="w") # set sticky to "w" for left alignment
# Create a set of radio buttons, one for each option in the options list
self.buttons = []
for idx, option in enumerate(options_list):
# Create a Radiobutton widget with the specified text and value, and associate it with the var instance variable
button = tk.Radiobutton(
master, text=option[1], variable=self.var, value=option[0], command=command
)
# Place the radio button in the parent widget using the grid geometry manager, and set sticky to "w" for left alignment
button.grid(row=3+idx, column=1, sticky="w")
# Add the radio button to the list of buttons
self.buttons.append(button)
def get_value(self):
# Return the selected value as an integer by calling the get method on the var instance variable
return self.var.get()
##########################################################################
############################ DEFINE VARIABLES ############################
##########################################################################
# Flag to control the CAN traffic
can_enabled = True
# Define Steer Command msg counter value
counter = 0
# Define global variables for torque and angle
torque = 0
angle = 0
# Load the .dbc file and define it's variables
current_dir = os.path.dirname(os.path.abspath(__file__))
dbc_file_path = os.path.join(current_dir, 'opendbc/ocelot_controls.dbc')
db = cantools.database.load_file(dbc_file_path)
msg = db.get_message_by_name('STEERING_COMMAND')
data = msg.encode({
'STEER_TORQUE': 0,
'STEER_ANGLE': 0,
'STEER_MODE': 0,
'COUNTER': 0,
'CHECKSUM': 0
})
##########################################################################
############################### MAIN STUFF ###############################
##########################################################################
# Create the GUI window
window = tk.Tk()
# set the title of the window
window.title("StepperServoCAN Tester")
# Set window width and height to custom values
window.geometry("360x250") # Set window width and height
# Create labels for the widgets
selected_backend = tk.StringVar(window)
selected_backend.set('pcan')
can_interface_selector = ttk.Combobox(window, values=[*sorted(can.interfaces.VALID_INTERFACES)], textvariable=selected_backend)
can_interface_selector.bind("<<ComboboxSelected>>", interface_selected)
can_label = tk.Label(window, text="CAN interface: ")
torque_label = tk.Label(window, text="Steer Torque: ")
angle_label = tk.Label(window, text="Steer Angle: ")
# Set the initial values for torque and angle
initial_torque = torque
initial_angle = angle
# Create the torque and angle widget with an initial value
torque_var = tk.DoubleVar(value=initial_torque)
torque_widget = tk.Entry(window, textvariable=torque_var)
angle_widget = tk.Entry(window, textvariable=tk.StringVar(value=str(initial_angle)))
# Add SteerModeWidget for the Steer Mode option
STEER_MODE_OPTIONS = [
(0, "Off - instant 0 torque"),
(1, "TorqueControl"),
(2, "AngleControl"),
(3, "SoftOff - ramp torque to 0 in 1s")
]
steer_mode_widget = SteerModeWidget(window, "Steer Mode: ", STEER_MODE_OPTIONS, command=update_values)
# Add a button to update torque/angle values
send_button = tk.Button(window, text='Update Torque/Angle value', command=update_values)
# Place the labels and widgets using grid
can_label.grid(row=0, column=0, sticky="w") # set sticky to "w" for left alignment
can_interface_selector.grid(row=0, column=1, sticky="w")
torque_label.grid(row=1, column=0, sticky="w") # set sticky to "w" for left alignment
torque_widget.grid(row=1, column=1, sticky="w")
angle_label.grid(row=2, column=0, sticky="w") # set sticky to "w" for left alignment
angle_widget.grid(row=2, column=1, sticky="w")
send_button.grid(row=9, column=0, columnspan=2, pady=(10, 0))
# Function for closing the program elegantly
def on_closing():
global can_enabled
can_enabled = False
window.destroy()
# Add the quit button
quit_button = tk.Button(window, text="Quit", command=on_closing)
quit_button.grid(row=10, column=0, columnspan=2, pady=(10, 0), sticky=tk.N+tk.S+tk.E+tk.W)
# Trigger events when Esc and Return key are pressed
window.bind('<Escape>', lambda event: on_closing())
window.bind('<Return>', lambda event: update_values())
# This is for making successful program termination when GUI closed from top right corner x button
window.protocol("WM_DELETE_WINDOW", on_closing)
# On startup try to connect to default CAN interface
connect_to_can_interface(selected_backend)
# Run the GUI loop
window.mainloop()