Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unregister termination callback when AcceptServer exits #343

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions util/accept_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class AcceptServer {
fb2::BlockingCounter ref_bc_; // to synchronize listener threads during the shutdown.

bool was_run_ = false;
bool break_on_int_;

uint16_t backlog_ = 128;
};
Expand Down
7 changes: 5 additions & 2 deletions util/fibers/accept_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ using namespace boost;
using namespace std;

AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool break_on_int)
: pool_(pool), mr_(mr), ref_bc_(0) {
: pool_(pool), mr_(mr), ref_bc_(0), break_on_int_(break_on_int) {
if (break_on_int) {
ProactorBase* proactor = pool_->GetNextProactor();
proactor->RegisterSignal({SIGINT, SIGTERM}, [this](int signal) {
ProactorBase::RegisterSignal({SIGINT, SIGTERM}, proactor, [this](int signal) {
LOG(INFO) << "Exiting on signal " << strsignal(signal);
if (on_break_hook_) {
on_break_hook_();
Expand All @@ -32,6 +32,9 @@ AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool
}

AcceptServer::~AcceptServer() {
if (break_on_int_) {
ProactorBase::ClearSignal({SIGINT, SIGTERM});
}
list_interface_.clear();
}

Expand Down
5 changes: 3 additions & 2 deletions util/fibers/proactor_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void ProactorBase::Migrate(ProactorBase* dest) {
});
}

void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb) {
void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, ProactorBase* proactor,
std::function<void(int)> cb) {
auto* state = get_signal_state();

struct sigaction sa;
Expand All @@ -243,7 +244,7 @@ void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, std::functi
for (uint16_t val : l) {
CHECK(!state->signal_map[val].cb) << "Signal " << val << " was already registered";
state->signal_map[val].cb = cb;
state->signal_map[val].proactor = this;
state->signal_map[val].proactor = proactor;

CHECK_EQ(0, sigaction(val, &sa, NULL));
}
Expand Down
19 changes: 9 additions & 10 deletions util/fibers/proactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ class ProactorBase {
return tl_info_.owner;
}

void RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb);
static void RegisterSignal(std::initializer_list<uint16_t> l, ProactorBase* proactor,
std::function<void(int)> cb);

void ClearSignal(std::initializer_list<uint16_t> l) {
RegisterSignal(l, nullptr);
static void ClearSignal(std::initializer_list<uint16_t> l) {
RegisterSignal(l, nullptr, nullptr);
}

// Returns an approximate (cached) time with nano-sec granularity.
Expand Down Expand Up @@ -253,7 +254,6 @@ class ProactorBase {
return absl::GetCurrentTimeNanos();
}


// Returns true if we should poll scheduler tasks that run periodically but not too often.
bool ShouldPollL2Tasks() const;

Expand All @@ -265,12 +265,12 @@ class ProactorBase {
static uint64_t GetCPUCycleCount() {
#if defined(__x86_64__)
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return static_cast<int64_t>((high << 32) | low);
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return static_cast<int64_t>((high << 32) | low);
#elif defined(__aarch64__)
int64_t tv;
asm volatile("mrs %0, cntvct_el0" : "=r"(tv));
return tv;
int64_t tv;
asm volatile("mrs %0, cntvct_el0" : "=r"(tv));
return tv;
#else
return absl::base_internal::CycleClock::Now();
#endif
Expand Down Expand Up @@ -367,7 +367,6 @@ inline void ProactorBase::WakeupIfNeeded() {
// memory_order_acq_rel until further notice.
auto current = tq_seq_.fetch_add(2, std::memory_order_acq_rel);
if (current == WAIT_SECTION_STATE) {

// We protect WakeRing using tq_seq_. That means only one thread at a time
// can enter here. Moreover tq_seq_ == WAIT_SECTION_STATE only when
// proactor enters WAIT section, therefore we do not race over SQE ring with proactor thread.
Expand Down