Skip to content

Commit

Permalink
write each
Browse files Browse the repository at this point in the history
  • Loading branch information
cindyyan317 committed Aug 16, 2024
1 parent 313ea83 commit afa7ec9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/data/CassandraBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ class BasicCassandraBackend : public BackendInterface {
}
}

executor_.write(std::move(statements));
executor_.writeEach(std::move(statements));
}

void
Expand Down
35 changes: 34 additions & 1 deletion src/data/cassandra/impl/ExecutionStrategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -192,10 +193,24 @@ class DefaultExecutionStrategy {
template <typename... Args>
void
write(PreparedStatementType const& preparedStatement, Args&&... args)
{
auto statement = preparedStatement.bind(std::forward<Args>(args)...);
write(std::move(statement));
}

/**
* @brief Non-blocking query execution used for writing data.
*
* Retries forever with retry policy specified by @ref AsyncExecutor
*
* @param statement Statement to execute
* @throw DatabaseTimeout on timeout
*/
void
write(StatementType&& statement)
{
auto const startTime = std::chrono::steady_clock::now();

auto statement = preparedStatement.bind(std::forward<Args>(args)...);
incrementOutstandingRequestCount();

counters_->registerWriteStarted();
Expand All @@ -213,6 +228,24 @@ class DefaultExecutionStrategy {
);
}

/**
* @brief Non-blocking query execution used for writing data. Constrast with write, this method does not execute
* the statements in a batch.
*
* Retries forever with retry policy specified by @ref AsyncExecutor.
*
* @param statements Vector of statements to execute
* @throw DatabaseTimeout on timeout
*/
void
writeEach(std::vector<StatementType>&& statements)
{
if (statements.empty())
return;

std::ranges::for_each(std::move(statements), [this](auto& statement) { this->write(std::move(statement)); });
}

/**
* @brief Non-blocking batched query execution used for writing data.
*
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/data/cassandra/ExecutionStrategyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,47 @@ TEST_F(BackendCassandraExecutionStrategyTest, WriteMultipleAndCallSyncSucceeds)
thread.join();
}

TEST_F(BackendCassandraExecutionStrategyTest, WriteEachAndCallSyncSucceeds)
{
auto strat = makeStrategy();
auto const totalRequests = 1024u;
auto const numStatements = 16u;
auto callCount = std::atomic_uint{0u};

auto work = std::optional<boost::asio::io_context::work>{ctx};
auto thread = std::thread{[this]() { ctx.run(); }};

ON_CALL(handle, asyncExecute(A<FakeStatement const&>(), A<std::function<void(FakeResultOrError)>&&>()))
.WillByDefault([this, &callCount](auto const&, auto&& cb) {
// run on thread to emulate concurrency model of real asyncExecute
boost::asio::post(ctx, [&callCount, cb = std::forward<decltype(cb)>(cb)] {
++callCount;
cb({}); // pretend we got data
});
return FakeFutureWithCallback{};
});
EXPECT_CALL(
handle,
asyncExecute(
A<FakeStatement const&>(),
A<std::function<void(FakeResultOrError)>&&>()
)
)
.Times(totalRequests * numStatements); // numStatements per write call
EXPECT_CALL(*counters, registerWriteStarted()).Times(totalRequests * numStatements);
EXPECT_CALL(*counters, registerWriteFinished(testing::_)).Times(totalRequests * numStatements);

auto makeStatements = [] { return std::vector<FakeStatement>(16); };
for (auto i = 0u; i < totalRequests; ++i)
strat.writeEach(makeStatements());

strat.sync(); // make sure all above writes are finished
EXPECT_EQ(callCount, totalRequests * numStatements); // all requests should finish

work.reset();
thread.join();
}

TEST_F(BackendCassandraExecutionStrategyTest, StatsCallsCountersReport)
{
auto strat = makeStrategy();
Expand Down

0 comments on commit afa7ec9

Please sign in to comment.