diff --git a/util/accept_server.h b/util/accept_server.h index 924c96db..dc7dab5c 100644 --- a/util/accept_server.h +++ b/util/accept_server.h @@ -71,6 +71,7 @@ class AcceptServer { fb2::BlockingCounter ref_bc_; // to synchronize listener threads during the shutdown. bool was_run_ = false; + bool break_on_int_; uint16_t backlog_ = 128; }; diff --git a/util/fibers/accept_server.cc b/util/fibers/accept_server.cc index 8b90948b..c1ca64bc 100644 --- a/util/fibers/accept_server.cc +++ b/util/fibers/accept_server.cc @@ -18,10 +18,10 @@ using namespace boost; using namespace std; AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool break_on_int) - : pool_(pool), mr_(mr), ref_bc_(0) { + : pool_(pool), mr_(mr), ref_bc_(0), break_on_int_(break_on_int) { if (break_on_int) { ProactorBase* proactor = pool_->GetNextProactor(); - proactor->RegisterSignal({SIGINT, SIGTERM}, [this](int signal) { + ProactorBase::RegisterSignal({SIGINT, SIGTERM}, proactor, [this](int signal) { LOG(INFO) << "Exiting on signal " << strsignal(signal); if (on_break_hook_) { on_break_hook_(); @@ -32,6 +32,9 @@ AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool } AcceptServer::~AcceptServer() { + if (break_on_int_) { + ProactorBase::ClearSignal({SIGINT, SIGTERM}); + } list_interface_.clear(); } diff --git a/util/fibers/proactor_base.cc b/util/fibers/proactor_base.cc index 4e26915b..0c0bc5b5 100644 --- a/util/fibers/proactor_base.cc +++ b/util/fibers/proactor_base.cc @@ -230,7 +230,8 @@ void ProactorBase::Migrate(ProactorBase* dest) { }); } -void ProactorBase::RegisterSignal(std::initializer_list l, std::function cb) { +void ProactorBase::RegisterSignal(std::initializer_list l, ProactorBase* proactor, + std::function cb) { auto* state = get_signal_state(); struct sigaction sa; @@ -243,7 +244,7 @@ void ProactorBase::RegisterSignal(std::initializer_list l, std::functi for (uint16_t val : l) { CHECK(!state->signal_map[val].cb) << "Signal " << val << " was already registered"; state->signal_map[val].cb = cb; - state->signal_map[val].proactor = this; + state->signal_map[val].proactor = proactor; CHECK_EQ(0, sigaction(val, &sa, NULL)); } diff --git a/util/fibers/proactor_base.h b/util/fibers/proactor_base.h index 8ab0f890..55d0c873 100644 --- a/util/fibers/proactor_base.h +++ b/util/fibers/proactor_base.h @@ -91,10 +91,11 @@ class ProactorBase { return tl_info_.owner; } - void RegisterSignal(std::initializer_list l, std::function cb); + static void RegisterSignal(std::initializer_list l, ProactorBase* proactor, + std::function cb); - void ClearSignal(std::initializer_list l) { - RegisterSignal(l, nullptr); + static void ClearSignal(std::initializer_list l) { + RegisterSignal(l, nullptr, nullptr); } // Returns an approximate (cached) time with nano-sec granularity. @@ -253,7 +254,6 @@ class ProactorBase { return absl::GetCurrentTimeNanos(); } - // Returns true if we should poll scheduler tasks that run periodically but not too often. bool ShouldPollL2Tasks() const; @@ -265,12 +265,12 @@ class ProactorBase { static uint64_t GetCPUCycleCount() { #if defined(__x86_64__) uint64_t low, high; - __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); - return static_cast((high << 32) | low); + __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); + return static_cast((high << 32) | low); #elif defined(__aarch64__) - int64_t tv; - asm volatile("mrs %0, cntvct_el0" : "=r"(tv)); - return tv; + int64_t tv; + asm volatile("mrs %0, cntvct_el0" : "=r"(tv)); + return tv; #else return absl::base_internal::CycleClock::Now(); #endif @@ -367,7 +367,6 @@ inline void ProactorBase::WakeupIfNeeded() { // memory_order_acq_rel until further notice. auto current = tq_seq_.fetch_add(2, std::memory_order_acq_rel); if (current == WAIT_SECTION_STATE) { - // We protect WakeRing using tq_seq_. That means only one thread at a time // can enter here. Moreover tq_seq_ == WAIT_SECTION_STATE only when // proactor enters WAIT section, therefore we do not race over SQE ring with proactor thread.