Implement lock free ring buffer with throttling [NOETIC] (#319)
* Implement lock free ring_buffer with throttling
* Implement THROTTLE_PERCENT constant
* Update CHANGELOG and package version information
Samahu authored May 28, 2024
1 parent 2c30d0c commit 346251b
Showing 8 changed files with 399 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -12,6 +12,7 @@ Changelog
* [BUGFIX]: LaserScan is not properly aligned with generated point cloud
* address an issue where LaserScan appeared different on FW prior to 2.4
* [BUGFIX]: LaserScan does not work when using dual mode
* [BUGFIX]: Implement lock free ring buffer with throttling


ouster_ros v0.10.0
4 changes: 3 additions & 1 deletion CMakeLists.txt
@@ -91,8 +91,10 @@ add_dependencies(${PROJECT_NAME}_nodelets ${PROJECT_NAME}_gencpp)
# ==== Test ====
if(CATKIN_ENABLE_TESTING)
catkin_add_gtest(${PROJECT_NAME}_test
tests/ring_buffer_test.cpp
src/os_ros.cpp
tests/test_main.cpp
tests/ring_buffer_test.cpp
tests/lock_free_ring_buffer_test.cpp
tests/point_accessor_test.cpp
tests/point_transform_test.cpp
tests/point_cloud_compose_test.cpp
2 changes: 1 addition & 1 deletion package.xml
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="3">
<name>ouster_ros</name>
<version>0.12.1</version>
<version>0.12.2</version>
<description>Ouster ROS driver</description>
<maintainer email="[email protected]">ouster developers</maintainer>
<license file="LICENSE">BSD</license>
120 changes: 103 additions & 17 deletions src/lidar_packet_handler.h
@@ -16,6 +16,9 @@

#include <pcl_conversions/pcl_conversions.h>

#include "lock_free_ring_buffer.h"
#include <thread>

namespace {

template <typename T, typename UnaryPredicate>
@@ -73,12 +76,26 @@ class LidarPacketHandler {
const std::vector<LidarScanProcessor>& handlers,
const std::string& timestamp_mode,
int64_t ptp_utc_tai_offset)
: lidar_scan_handlers{handlers} {
: ring_buffer(LIDAR_SCAN_COUNT), lidar_scan_handlers{handlers} {
// initialize lidar_scan processor and buffer
scan_batcher = std::make_unique<ouster::ScanBatcher>(info);
lidar_scan = std::make_unique<ouster::LidarScan>(
info.format.columns_per_frame, info.format.pixels_per_column,
info.format.udp_profile_lidar);

lidar_scans.resize(LIDAR_SCAN_COUNT);
mutexes.resize(LIDAR_SCAN_COUNT);

for (size_t i = 0; i < lidar_scans.size(); ++i) {
lidar_scans[i] = std::make_unique<ouster::LidarScan>(
info.format.columns_per_frame, info.format.pixels_per_column,
info.format.udp_profile_lidar);
mutexes[i] = std::make_unique<std::mutex>();
}

lidar_scans_processing_thread = std::make_unique<std::thread>([this]() {
while (lidar_scans_processing_active) {
process_scans();
}
NODELET_DEBUG("lidar_scans_processing_thread done.");
});

// initialize time handlers
scan_col_ts_spacing_ns = compute_scan_col_ts_spacing_ns(info.mode);
@@ -125,14 +142,39 @@ class LidarPacketHandler {
info, handlers, timestamp_mode, ptp_utc_tai_offset);
return [handler](const uint8_t* lidar_buf) {
if (handler->lidar_packet_accumlator(lidar_buf)) {
for (auto h : handler->lidar_scan_handlers) {
h(*handler->lidar_scan, handler->lidar_scan_estimated_ts,
handler->lidar_scan_estimated_msg_ts);
}
handler->ring_buffer_has_elements.notify_one();
}
};
}

const std::string getName() const { return "lidar_packet_handler"; }

void process_scans() {

{
std::unique_lock<std::mutex> index_lock(ring_buffer_mutex);
ring_buffer_has_elements.wait(index_lock, [this] {
return !ring_buffer.empty();
});
}

std::unique_lock<std::mutex> lock(*mutexes[ring_buffer.read_head()]);

for (auto h : lidar_scan_handlers) {
h(*lidar_scans[ring_buffer.read_head()], lidar_scan_estimated_ts,
lidar_scan_estimated_msg_ts);
}

// throttle when occupancy exceeds THROTTLE_PERCENT of the ring_buffer
// capacity by reading two scans per iteration
size_t read_step = 1;
if (ring_buffer.size() > THROTTLE_PERCENT * ring_buffer.capacity()) {
NODELET_WARN("lidar_scans %d%% full, THROTTLING",
static_cast<int>(100 * THROTTLE_PERCENT));
read_step = 2;
}
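// e.g. with LIDAR_SCAN_COUNT = 10 and THROTTLE_PERCENT = 0.7 this triggers
// once more than 7 scans are pending; reading two slots per pass skips every
// other queued scan until the backlog drops back below the threshold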
ring_buffer.read(read_step);
}

// time interpolation methods
uint64_t impute_value(int last_scan_last_nonzero_idx,
uint64_t last_scan_last_nonzero_value,
@@ -221,23 +263,45 @@ class LidarPacketHandler {

bool lidar_handler_sensor_time(const sensor::packet_format&,
const uint8_t* lidar_buf) {
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp());
lidar_scan_estimated_msg_ts =
impl::ts_to_ros_time(lidar_scan_estimated_ts);

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp());
lidar_scan_estimated_msg_ts = impl::ts_to_ros_time(lidar_scan_estimated_ts);

ring_buffer.write();

return true;
}

bool lidar_handler_sensor_time_ptp(const sensor::packet_format&,
const uint8_t* lidar_buf,
int64_t ptp_utc_tai_offset) {
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
auto ts_v = lidar_scan->timestamp();

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(
*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
auto ts_v = lidar_scans[ring_buffer.write_head()]->timestamp();
for (int i = 0; i < ts_v.rows(); ++i)
ts_v[i] = impl::ts_safe_offset_add(ts_v[i], ptp_utc_tai_offset);
lidar_scan_estimated_ts = compute_scan_ts(ts_v);
lidar_scan_estimated_msg_ts =
impl::ts_to_ros_time(lidar_scan_estimated_ts);

ring_buffer.write();

return true;
}

@@ -249,12 +313,24 @@ class LidarPacketHandler {
pf, lidar_buf, packet_receive_time); // first point cloud time
lidar_handler_ros_time_frame_ts_initialized = true;
}
if (!(*scan_batcher)(lidar_buf, *lidar_scan)) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scan->timestamp());

if (ring_buffer.full()) {
NODELET_WARN("lidar_scans full, DROPPING PACKET");
return false;
}

std::unique_lock<std::mutex> lock(
*(mutexes[ring_buffer.write_head()]));

if (!(*scan_batcher)(lidar_buf, *lidar_scans[ring_buffer.write_head()])) return false;
lidar_scan_estimated_ts = compute_scan_ts(lidar_scans[ring_buffer.write_head()]->timestamp());
lidar_scan_estimated_msg_ts = lidar_handler_ros_time_frame_ts;
// set time for next point cloud msg
lidar_handler_ros_time_frame_ts =
extrapolate_frame_ts(pf, lidar_buf, packet_receive_time);

ring_buffer.write();

return true;
}

@@ -267,7 +343,13 @@ class LidarPacketHandler {

private:
std::unique_ptr<ouster::ScanBatcher> scan_batcher;
std::unique_ptr<ouster::LidarScan> lidar_scan;
const int LIDAR_SCAN_COUNT = 10;
const double THROTTLE_PERCENT = 0.7;
LockFreeRingBuffer ring_buffer;
std::mutex ring_buffer_mutex;
std::vector<std::unique_ptr<ouster::LidarScan>> lidar_scans;
std::vector<std::unique_ptr<std::mutex>> mutexes;

uint64_t lidar_scan_estimated_ts;
ros::Time lidar_scan_estimated_msg_ts;

@@ -286,6 +368,10 @@ class LidarPacketHandler {
std::vector<LidarScanProcessor> lidar_scan_handlers;

LidarPacketAccumlator lidar_packet_accumlator;

bool lidar_scans_processing_active = true;
std::unique_ptr<std::thread> lidar_scans_processing_thread;
std::condition_variable ring_buffer_has_elements;
};

} // namespace ouster_ros
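
Taken together, the handler changes above form a single-producer/single-consumer pipeline: the packet callback batches packets into the scan slot at write_head(), publishes the finished scan with write() and signals ring_buffer_has_elements, while lidar_scans_processing_thread waits on that condition variable and drains scans from read_head(). The sketch below condenses that flow using the LockFreeRingBuffer shown in the next file; the ScanPipeline, push, and pop_and_process names are illustrative stand-ins, not the driver's own code.

#include <array>
#include <condition_variable>
#include <mutex>

#include "lock_free_ring_buffer.h"

struct ScanPipeline {
    static constexpr size_t kSlots = 10;             // LIDAR_SCAN_COUNT
    LockFreeRingBuffer ring{kSlots};                 // coordinates indices only
    std::array<int, kSlots> slots{};                 // stands in for the LidarScan pool
    std::array<std::mutex, kSlots> slot_mutexes;     // one mutex per slot
    std::mutex cv_mutex;
    std::condition_variable has_elements;

    // producer side: what the packet callback does once a scan is complete
    bool push(int scan) {
        if (ring.full()) return false;               // drop, as the driver does
        std::unique_lock<std::mutex> slot_lock(slot_mutexes[ring.write_head()]);
        slots[ring.write_head()] = scan;             // fill the slot first
        ring.write();                                // then publish the index
        has_elements.notify_one();
        return true;
    }

    // consumer side: one iteration of the processing thread's loop
    template <typename Handler>
    void pop_and_process(Handler&& handle) {
        {
            std::unique_lock<std::mutex> wait_lock(cv_mutex);
            has_elements.wait(wait_lock, [this] { return !ring.empty(); });
        }
        std::unique_lock<std::mutex> slot_lock(slot_mutexes[ring.read_head()]);
        handle(slots[ring.read_head()]);             // run the scan handlers
        ring.read(1);                                // read(2) when throttling
    }
};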
89 changes: 89 additions & 0 deletions src/lock_free_ring_buffer.h
@@ -0,0 +1,89 @@
/**
* Copyright (c) 2018-2024, Ouster, Inc.
* All rights reserved.
*
* @file lock_free_ring_buffer.h
* @brief File contains a lock free implementation of a ring buffer
*/

#pragma once

#include <condition_variable>
#include <mutex>
#include <vector>
#include <atomic>

/**
* @class LockFreeRingBuffer a lock free ring buffer of indices; it tracks
* read/write positions but does not own the element storage.
*
* @remarks the current implementation has an effective capacity of
* (capacity - 1) elements when writing
*/
class LockFreeRingBuffer {
public:
LockFreeRingBuffer(size_t capacity)
: capacity_(capacity),
write_idx_(0),
read_idx_(0) {}

/**
* Gets the maximum number of items that this ring buffer can hold.
*/
size_t capacity() const { return capacity_; }

/**
* Gets the number of items that currently occupy the ring buffer. This
* number varies between 0 and capacity().
*
* @remarks
* the returned value is only a snapshot; a concurrent read() or write() on
* another thread may change the occupancy immediately afterwards.
*/
size_t size() const {
return write_idx_ >= read_idx_ ?
write_idx_ - read_idx_ :
write_idx_ + capacity_ - read_idx_;
}

/**
* Gets the number of slots that can still be written before further writes
* are refused, i.e. capacity() - 1 - size().
*/
size_t available() const {
return capacity_ - 1 - size();
}

/**
* Checks if the ring buffer is empty.
*/
bool empty() const {
return read_idx_ == write_idx_;
}

/**
* Checks if the ring buffer is full.
*/
bool full() const {
return read_idx_ == (write_idx_ + 1) % capacity_;
}

/**
* Advances the write index by count slots, publishing them to the reader.
* Returns false (without advancing) if fewer than count slots are available.
*/
bool write(size_t count = 1) {
if (count > available()) return false;
write_idx_ = (write_idx_ + count) % capacity_;
return true;
}

/**
* Advances the read index by count slots, releasing them back to the
* writer. Returns false (without advancing) if fewer than count slots are
* occupied.
*/
bool read(size_t count = 1) {
if (count > size()) return false;
read_idx_ = (read_idx_ + count) % capacity_;
return true;
}

// index of the next slot to write into / read from; callers fill or consume
// the slot at these indices before calling write() or read()
size_t write_head() const { return write_idx_; }
size_t read_head() const { return read_idx_; }

private:
const size_t capacity_;
std::atomic<size_t> write_idx_;
std::atomic<size_t> read_idx_;
};
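
A minimal, single-threaded usage sketch (not part of the commit) illustrating the fill-then-publish pattern and the effective capacity of (capacity - 1); it assumes this header is on the include path:

#include <cassert>
#include <vector>

#include "lock_free_ring_buffer.h"

int main() {
    LockFreeRingBuffer ring(4);          // effective capacity is 3
    std::vector<int> storage(4);         // caller-owned slots indexed by the buffer

    assert(ring.empty() && ring.available() == 3);

    for (int v = 0; v < 3; ++v) {
        storage[ring.write_head()] = v;  // fill the slot first...
        ring.write();                    // ...then publish it
    }
    assert(ring.full());                 // one slot always stays free
    assert(!ring.write());               // further writes are refused

    int oldest = storage[ring.read_head()];
    ring.read();                         // release the slot back to the writer
    assert(oldest == 0 && ring.size() == 2);
    return 0;
}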