Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lock free ring buffer with throttling [NOETIC] #319

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changelog
* [BUGFIX]: LaserScan is not properly aligned with generated point cloud
* address an issue where LaserScan appeared different on FW prior to 2.4
* [BUGFIX]: LaserScan does not work when using dual mode
* [BUGFIX]: Implement lock free ring buffer with throttling


ouster_ros v0.10.0
Expand Down
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ add_dependencies(${PROJECT_NAME}_nodelets ${PROJECT_NAME}_gencpp)
# ==== Test ====
if(CATKIN_ENABLE_TESTING)
catkin_add_gtest(${PROJECT_NAME}_test
tests/ring_buffer_test.cpp
src/os_ros.cpp
tests/test_main.cpp
tests/ring_buffer_test.cpp
tests/lock_free_ring_buffer_test.cpp
tests/point_accessor_test.cpp
tests/point_transform_test.cpp
tests/point_cloud_compose_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="3">
<name>ouster_ros</name>
<version>0.12.1</version>
<version>0.12.2</version>
<description>Ouster ROS driver</description>
<maintainer email="[email protected]">ouster developers</maintainer>
<license file="LICENSE">BSD</license>
Expand Down
120 changes: 103 additions & 17 deletions src/lidar_packet_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include <pcl_conversions/pcl_conversions.h>

#include "lock_free_ring_buffer.h"
#include <thread>

namespace {

template <typename T, typename UnaryPredicate>
Expand Down Expand Up @@ -73,12 +76,26 @@ class LidarPacketHandler {
const std::vector<LidarScanProcessor>& handlers,
const std::string& timestamp_mode,
int64_t ptp_utc_tai_offset)
: lidar_scan_handlers{handlers} {
: ring_buffer(LIDAR_SCAN_COUNT), lidar_scan_handlers{handlers} {
// initialize lidar_scan processor and buffer
scan_batcher = std::make_unique<ouster::ScanBatcher>(info);
lidar_scan = std::make_unique<ouster::LidarScan>(
info.format.columns_per_frame, info.format.pixels_per_column,
info.format.udp_profile_lidar);

lidar_scans.resize(LIDAR_SCAN_COUNT);
mutexes.resize(LIDAR_SCAN_COUNT);

for (size_t i = 0; i < lidar_scans.size(); ++i) {
lidar_scans[i] = std::make_unique<ouster::LidarScan>(
info.format.columns_per_frame, info.format.pixels_per_column,
info.format.udp_profile_lidar);
mutexes[i] = std::make_unique<std::mutex>();
}

lidar_scans_processing_thread = std::make_unique<std::thread>([this]() {
while (lidar_scans_processing_active) {
process_scans();
}
NODELET_DEBUG("lidar_scans_processing_thread done.");
});

// initalize time handlers
scan_col_ts_spacing_ns = compute_scan_col_ts_spacing_ns(info.mode);
Expand Down Expand Up @@ -125,14 +142,39 @@ class LidarPacketHandler {
info, handlers, timestamp_mode, ptp_utc_tai_offset);
return [handler](const uint8_t* lidar_buf) {
if (handler->lidar_packet_accumlator(lidar_buf)) {
for (auto h : handler->lidar_scan_handlers) {
h(*handler->lidar_scan, handler->lidar_scan_estimated_ts,
handler->lidar_scan_estimated_msg_ts);
}
handler->ring_buffer_has_elements.notify_one();
}
};
}

const std::string getName() const { return "lidar_packet_hander"; }

void process_scans() {

{
std::unique_lock<std::mutex> index_lock(ring_buffer_mutex);
ring_buffer_has_elements.wait(index_lock, [this] {
return !ring_buffer.empty();
});
}

std::unique_lock<std::mutex> lock(*mutexes[ring_buffer.read_head()]);

for (auto h : lidar_scan_handlers) {
h(*lidar_scans[ring_buffer.read_head()], lidar_scan_estimated_ts,
lidar_scan_estimated_msg_ts);
}

// why we hit percent amount of the ring_buffer capacity throttle
size_t read_step = 1;
if (ring_buffer.size() > THROTTLE_PERCENT * ring_buffer.capacity()) {
NODELET_WARN("lidar_scans %d%% full, THROTTLING",
static_cast<int>(100* THROTTLE_PERCENT));
read_step = 2;
}
ring_buffer.read(read_step);
}

// time interpolation methods
uint64_t impute_value(int last_scan_last_nonzero_idx,
uint64_t last_scan_last_nonzero_value,
Expand Down Expand Up @@ -221,23 +263,45 @@ class LidarPacketHandler {

bool lidar_handler_sensor_time(const sensor::packet_format&,
const uint8_t* lidar_buf) {
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp());
lidar_scan_estimated_msg_ts =
impl::ts_to_ros_time(lidar_scan_estimated_ts);

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp());
lidar_scan_estimated_msg_ts = impl::ts_to_ros_time(lidar_scan_estimated_ts);

ring_buffer.write();

return true;
}

bool lidar_handler_sensor_time_ptp(const sensor::packet_format&,
const uint8_t* lidar_buf,
int64_t ptp_utc_tai_offset) {
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
auto ts_v = lidar_scan->timestamp();

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(
*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
auto ts_v = lidar_scans[ring_buffer.write_head()]->timestamp();
for (int i = 0; i < ts_v.rows(); ++i)
ts_v[i] = impl::ts_safe_offset_add(ts_v[i], ptp_utc_tai_offset);
lidar_scan_estimated_ts = compute_scan_ts(ts_v);
lidar_scan_estimated_msg_ts =
impl::ts_to_ros_time(lidar_scan_estimated_ts);

ring_buffer.write();

return true;
}

Expand All @@ -249,12 +313,24 @@ class LidarPacketHandler {
pf, lidar_buf, packet_receive_time); // first point cloud time
lidar_handler_ros_time_frame_ts_initialized = true;
}
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp());

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(
*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp());
lidar_scan_estimated_msg_ts = lidar_handler_ros_time_frame_ts;
// set time for next point cloud msg
lidar_handler_ros_time_frame_ts =
extrapolate_frame_ts(pf, lidar_buf, packet_receive_time);

ring_buffer.write();

return true;
}

Expand All @@ -267,7 +343,13 @@ class LidarPacketHandler {

private:
std::unique_ptr<ouster::ScanBatcher> scan_batcher;
std::unique_ptr<ouster::LidarScan> lidar_scan;
const int LIDAR_SCAN_COUNT = 10;
const double THROTTLE_PERCENT = 0.7;
LockFreeRingBuffer ring_buffer;
std::mutex ring_buffer_mutex;
std::vector<std::unique_ptr<ouster::LidarScan>> lidar_scans;
std::vector<std::unique_ptr<std::mutex>> mutexes;

uint64_t lidar_scan_estimated_ts;
ros::Time lidar_scan_estimated_msg_ts;

Expand All @@ -286,6 +368,10 @@ class LidarPacketHandler {
std::vector<LidarScanProcessor> lidar_scan_handlers;

LidarPacketAccumlator lidar_packet_accumlator;

bool lidar_scans_processing_active = true;
std::unique_ptr<std::thread> lidar_scans_processing_thread;
std::condition_variable ring_buffer_has_elements;
};

} // namespace ouster_ros
89 changes: 89 additions & 0 deletions src/lock_free_ring_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright (c) 2018-2024, Ouster, Inc.
* All rights reserved.
*
* @file thread_safe_ring_buffer.h
* @brief File contains thread safe implementation of a ring buffer
*/

#pragma once

#include <condition_variable>
#include <mutex>
#include <vector>
#include <atomic>

/**
* @class LockFreeRingBuffer thread safe ring buffer.
*
* @remarks current implementation has effective (capacity-1) when writing elements
*/
class LockFreeRingBuffer {
public:
LockFreeRingBuffer(size_t capacity)
: capacity_(capacity),
write_idx_(0),
read_idx_(0) {}

/**
* Gets the maximum number of items that this ring buffer can hold.
*/
size_t capacity() const { return capacity_; }

/**
* Gets the number of item that currently occupy the ring buffer. This
* number would vary between 0 and the capacity().
*
* @remarks
* if returned value was 0 or the value was equal to the buffer capacity(),
* this does not guarantee that a subsequent call to read() or write()
* wouldn't cause the calling thread to be blocked.
*/
size_t size() const {
return write_idx_ >= read_idx_ ?
write_idx_ - read_idx_ :
write_idx_ + capacity_ - read_idx_;
}

size_t available() const {
return capacity_ - 1 - size();
}

/**
* Checks if the ring buffer is empty.
*/
bool empty() const {
return read_idx_ == write_idx_;
}

/**
* Checks if the ring buffer is full.
*/
bool full() const {
return read_idx_ == (write_idx_ + 1) % capacity_;
}

/**
*/
bool write(size_t count = 1) {
if (count > available()) return false;
write_idx_ = (write_idx_ + count) % capacity_;
return true;
}

/**
*/
bool read(size_t count = 1) {
if (count > size()) return false;
read_idx_ = (read_idx_ + count) % capacity_;
return true;
}

size_t write_head() const { return write_idx_; }
size_t read_head() const { return read_idx_; }

private:
const size_t capacity_;
std::atomic<size_t> write_idx_;
std::atomic<size_t> read_idx_;
};
Loading
Loading