Skip to content

Commit

Permalink
feat: Move/copy support in async framework (#1609)
Browse files Browse the repository at this point in the history
Fixes #1608
  • Loading branch information
godexsoft authored Aug 20, 2024
1 parent fb473f6 commit 9a9de50
Show file tree
Hide file tree
Showing 18 changed files with 289 additions and 53 deletions.
2 changes: 1 addition & 1 deletion benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ target_sources(
include(deps/gbench)

target_include_directories(clio_benchmark PRIVATE .)
target_link_libraries(clio_benchmark PUBLIC clio benchmark::benchmark_main)
target_link_libraries(clio_benchmark PUBLIC clio_etl benchmark::benchmark_main)
set_target_properties(clio_benchmark PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
1 change: 0 additions & 1 deletion benchmarks/util/async/ExecutionContextBenchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <cstdint>
#include <latch>
#include <optional>
#include <stdexcept>
#include <thread>
#include <vector>

Expand Down
3 changes: 2 additions & 1 deletion src/util/Assert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,5 @@ assertImpl(

} // namespace util

#define ASSERT(condition, ...) util::assertImpl(CURRENT_SRC_LOCATION, #condition, (condition), __VA_ARGS__)
#define ASSERT(condition, ...) \
util::assertImpl(CURRENT_SRC_LOCATION, #condition, static_cast<bool>(condition), __VA_ARGS__)
60 changes: 41 additions & 19 deletions src/util/async/AnyExecutionContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,37 @@ class AnyExecutionContext {
/**
* @brief Construct a new type-erased Execution Context object
*
* @note Stores the Execution Context by reference.
*
* @tparam CtxType The type of the execution context to wrap
* @param ctx The execution context to wrap
*/
template <typename CtxType>
requires(not std::is_same_v<std::decay_t<CtxType>, AnyExecutionContext>)
template <NotSameAs<AnyExecutionContext> CtxType>
/* implicit */
AnyExecutionContext(CtxType& ctx) : pimpl_{std::make_shared<Model<CtxType&>>(ctx)}
{
}

/**
* @brief Construct a new type-erased Execution Context object
*
* @note Stores the Execution Context by moving it into the AnyExecutionContext.
*
* @tparam CtxType The type of the execution context to wrap
* @param ctx The execution context to wrap
*/
template <RValueNotSameAs<AnyExecutionContext> CtxType>
/* implicit */
AnyExecutionContext(CtxType&& ctx) : pimpl_{std::make_unique<Model<CtxType>>(std::forward<CtxType>(ctx))}
AnyExecutionContext(CtxType&& ctx) : pimpl_{std::make_shared<Model<CtxType>>(std::forward<CtxType>(ctx))}
{
}

AnyExecutionContext(AnyExecutionContext const&) = default;
AnyExecutionContext(AnyExecutionContext&&) = default;
AnyExecutionContext&
operator=(AnyExecutionContext const&) = default;
AnyExecutionContext&
operator=(AnyExecutionContext&&) = default;
~AnyExecutionContext() = default;

/**
Expand Down Expand Up @@ -206,7 +227,7 @@ class AnyExecutionContext {
* @brief Stop the execution context
*/
void
stop()
stop() const
{
pimpl_->stop();
}
Expand All @@ -215,7 +236,7 @@ class AnyExecutionContext {
* @brief Join the execution context
*/
void
join()
join() const
{
pimpl_->join();
}
Expand All @@ -237,64 +258,65 @@ class AnyExecutionContext {
virtual AnyStrand
makeStrand() = 0;
virtual void
stop() = 0;
stop() const = 0;
virtual void
join() = 0;
join() const = 0;
};

template <typename CtxType>
struct Model : Concept {
std::reference_wrapper<std::decay_t<CtxType>> ctx;
CtxType ctx;

Model(CtxType& ctx) : ctx{std::ref(ctx)}
template <typename Type>
Model(Type&& ctx) : ctx(std::forward<Type>(ctx))
{
}

impl::ErasedOperation
execute(std::function<std::any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout) override
{
return ctx.get().execute(std::move(fn), timeout);
return ctx.execute(std::move(fn), timeout);
}

impl::ErasedOperation
execute(std::function<std::any()> fn) override
{
return ctx.get().execute(std::move(fn));
return ctx.execute(std::move(fn));
}

impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<std::any(AnyStopToken)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
return ctx.scheduleAfter(delay, std::move(fn));
}

impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<std::any(AnyStopToken, bool)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
return ctx.scheduleAfter(delay, std::move(fn));
}

AnyStrand
makeStrand() override
{
return ctx.get().makeStrand();
return ctx.makeStrand();
}

void
stop() override
stop() const override
{
ctx.get().stop();
ctx.stop();
}

void
join() override
join() const override
{
ctx.get().join();
ctx.join();
}
};

private:
std::unique_ptr<Concept> pimpl_;
std::shared_ptr<Concept> pimpl_;
};

} // namespace util::async
5 changes: 1 addition & 4 deletions src/util/async/AnyOperation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,9 @@ class AnyOperation {
/**
* @brief Construct a new type-erased Operation object
*
* @tparam OpType The type of the operation to wrap
* @param operation The operation to wrap
*/
template <SomeOperation OpType>
requires std::is_same_v<std::decay_t<OpType>, impl::ErasedOperation>
/* implicit */ AnyOperation(OpType&& operation) : operation_{std::forward<OpType>(operation)}
/* implicit */ AnyOperation(impl::ErasedOperation&& operation) : operation_{std::move(operation)}
{
}

Expand Down
5 changes: 3 additions & 2 deletions src/util/async/AnyStopToken.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <memory>
#include <type_traits>
#include <utility>

namespace util::async {

Expand All @@ -41,7 +42,7 @@ class AnyStopToken {
* @param token The stop token to wrap
*/
template <SomeStopToken TokenType>
requires(not std::is_same_v<std::decay_t<TokenType>, AnyStopToken>)
requires NotSameAs<TokenType, AnyStopToken>
/* implicit */ AnyStopToken(TokenType&& token)
: pimpl_{std::make_unique<Model<TokenType>>(std::forward<TokenType>(token))}
{
Expand Down Expand Up @@ -142,7 +143,7 @@ class AnyStopToken {
}

ASSERT(false, "Token type does not support conversion to boost::asio::yield_context");
__builtin_unreachable(); // TODO: replace with std::unreachable when C++23 is available
std::unreachable();
}
};

Expand Down
10 changes: 6 additions & 4 deletions src/util/async/AnyStrand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include "util/async/AnyStopToken.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/impl/ErasedOperation.hpp"

#include <any>
Expand All @@ -43,13 +44,14 @@ class AnyStrand {
* @tparam StrandType The type of the strand to wrap
* @param strand The strand to wrap
*/
template <typename StrandType>
requires(not std::is_same_v<std::decay_t<StrandType>, AnyStrand>)
template <NotSameAs<AnyStrand> StrandType>
/* implicit */ AnyStrand(StrandType&& strand)
: pimpl_{std::make_unique<Model<StrandType>>(std::forward<StrandType>(strand))}
: pimpl_{std::make_shared<Model<StrandType>>(std::forward<StrandType>(strand))}
{
}

AnyStrand(AnyStrand const&) = default;
AnyStrand(AnyStrand&&) = default;
~AnyStrand() = default;

/**
Expand Down Expand Up @@ -163,7 +165,7 @@ class AnyStrand {
};

private:
std::unique_ptr<Concept> pimpl_;
std::shared_ptr<Concept> pimpl_;
};

} // namespace util::async
31 changes: 29 additions & 2 deletions src/util/async/Concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <boost/asio/spawn.hpp>

#include <chrono>
#include <concepts>
#include <functional>
#include <optional>
#include <type_traits>

namespace util::async {
Expand Down Expand Up @@ -141,13 +143,38 @@ concept SomeStdDuration = requires {
// See https://stackoverflow.com/questions/74383254/concept-that-models-only-the-stdchrono-duration-types
[]<typename Rep, typename Period>( //
std::type_identity<std::chrono::duration<Rep, Period>>
) {}(std::type_identity<T>());
) {}(std::type_identity<std::decay_t<T>>());
};

/**
* @brief Specifies that the type must be some std::optional
*/
template <typename T>
concept SomeStdOptional = requires {
[]<typename Type>( //
std::type_identity<std::optional<Type>>
) {}(std::type_identity<std::decay_t<T>>());
};

/**
* @brief Specifies that the type must be some std::duration wrapped in an optional
*/
template <typename T>
concept SomeOptStdDuration = requires(T v) { SomeStdDuration<decltype(v.value())>; };
concept SomeOptStdDuration = SomeStdOptional<T> and SomeStdDuration<decltype(T{}.value())>;

/**
* @brief Checks that decayed T s not of the same type as Erased
*/
template <typename T, typename Erased>
concept NotSameAs = not std::is_same_v<std::decay_t<T>, Erased>;

/**
* @brief Check that T is an r-value and is not the same type as Erased
*/
template <typename T, typename Erased>
concept RValueNotSameAs = requires(T&& t) {
requires std::is_rvalue_reference_v<decltype(t)>;
requires NotSameAs<T, Erased>;
};

} // namespace util::async
1 change: 1 addition & 0 deletions src/util/async/Outcome.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "util/async/context/impl/Cancellation.hpp"

#include <concepts>
#include <future>

namespace util::async {
Expand Down
51 changes: 44 additions & 7 deletions src/util/async/context/BasicExecutionContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "util/Assert.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Operation.hpp"
Expand All @@ -38,6 +39,7 @@
#include <chrono>
#include <cstddef>
#include <expected>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
Expand All @@ -59,6 +61,12 @@ struct AsioPoolStrandContext {
using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
using Timer = SteadyTimer<Executor>;

Executor const&
getExecutor() const
{
return executor;
}

Executor executor;
};

Expand All @@ -67,13 +75,42 @@ struct AsioPoolContext {
using Timer = SteadyTimer<Executor>;
using Strand = AsioPoolStrandContext;

AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
{
}

AsioPoolContext(AsioPoolContext const&) = delete;
AsioPoolContext(AsioPoolContext&&) = default;

Strand
makeStrand()
makeStrand() const
{
return {boost::asio::make_strand(executor)};
ASSERT(executor, "Called after executor was moved from.");
return {boost::asio::make_strand(*executor)};
}

Executor executor;
void
stop() const
{
if (executor) // don't call if executor was moved from
executor->stop();
}

void
join() const
{
if (executor) // don't call if executor was moved from
executor->join();
}

Executor&
getExecutor() const
{
ASSERT(executor, "Called after executor was moved from.");
return *executor;
}

std::unique_ptr<Executor> executor;
};

} // namespace impl
Expand Down Expand Up @@ -151,7 +188,7 @@ class BasicExecutionContext {
stop();
}

BasicExecutionContext(BasicExecutionContext&&) = delete;
BasicExecutionContext(BasicExecutionContext&&) = default;
BasicExecutionContext(BasicExecutionContext const&) = delete;

/**
Expand Down Expand Up @@ -323,9 +360,9 @@ class BasicExecutionContext {
* @brief Stop the execution context as soon as possible
*/
void
stop() noexcept
stop() const noexcept
{
context_.executor.stop();
context_.stop();
}

/**
Expand All @@ -334,7 +371,7 @@ class BasicExecutionContext {
void
join() noexcept
{
context_.executor.join();
context_.join();
}
};

Expand Down
Loading

0 comments on commit 9a9de50

Please sign in to comment.