diff --git a/include/ichor/services/network/tcp/TcpConnectionService.h b/include/ichor/services/network/tcp/TcpConnectionService.h index 28dc93a5..721ec689 100644 --- a/include/ichor/services/network/tcp/TcpConnectionService.h +++ b/include/ichor/services/network/tcp/TcpConnectionService.h @@ -17,7 +17,6 @@ namespace Ichor { * - "Socket" int - An existing socket to manage (required if Address/Port are not present) * - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY) * - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000) - * - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000) */ class TcpConnectionService final : public IConnectionService, public AdvancedService { public: @@ -47,13 +46,10 @@ namespace Ichor { friend DependencyRegister; - static uint64_t tcpConnId; int _socket; - uint64_t _id; uint64_t _attempts; uint64_t _priority; int64_t _sendTimeout{250'000}; - int64_t _recvTimeout{250'000}; bool _quit; ILogger *_logger{}; ITimerFactory *_timerFactory{}; diff --git a/include/ichor/services/network/tcp/TcpHostService.h b/include/ichor/services/network/tcp/TcpHostService.h index b03a80e8..96540305 100644 --- a/include/ichor/services/network/tcp/TcpHostService.h +++ b/include/ichor/services/network/tcp/TcpHostService.h @@ -31,7 +31,6 @@ namespace Ichor { * - "Port" uint16_t - What port to bind to (required) * - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY) * - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000) - * - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000) */ class TcpHostService final : public IHostService, public AdvancedService { public: diff --git a/src/services/network/tcp/TcpConnectionService.cpp b/src/services/network/tcp/TcpConnectionService.cpp index eb3ddb4d..2c2b20ef 100644 --- a/src/services/network/tcp/TcpConnectionService.cpp +++ b/src/services/network/tcp/TcpConnectionService.cpp @@ -12,9 +12,7 @@ #include #include -uint64_t Ichor::TcpConnectionService::tcpConnId{}; - -Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _id(tcpConnId++), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { +Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { reg.registerDependency(this, DependencyFlags::NONE); reg.registerDependency(this, DependencyFlags::REQUIRED); } @@ -26,9 +24,6 @@ Ichor::Task> Ichor::TcpConnectionService:: if(auto propIt = getProperties().find("TimeoutSendUs"); propIt != getProperties().end()) { _sendTimeout = Ichor::any_cast(propIt->second); } - if(auto propIt = getProperties().find("TimeoutRecvUs"); propIt != getProperties().end()) { - _recvTimeout = Ichor::any_cast(propIt->second); - } if(getProperties().contains("Socket")) { if(auto propIt = getProperties().find("Socket"); propIt != getProperties().end()) { @@ -39,26 +34,23 @@ Ichor::Task> Ichor::TcpConnectionService:: ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting)); timeval timeout{}; - timeout.tv_usec = _recvTimeout; - setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); timeout.tv_usec = _sendTimeout; setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); - auto flags = ::fcntl(_socket, F_GETFL, 0); - ::fcntl(_socket, F_SETFL, flags | O_NONBLOCK); - ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for existing socket", _id); + ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for existing socket", getServiceId()); } else { auto addrIt = getProperties().find("Address"); auto portIt = getProperties().find("Port"); if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing address", _id); + ICHOR_LOG_ERROR(_logger, "[{}] Missing address", getServiceId()); co_return tl::unexpected(StartError::FAILED); } if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing port", _id); + ICHOR_LOG_ERROR(_logger, "[{}] Missing port", getServiceId()); co_return tl::unexpected(StartError::FAILED); } + ICHOR_LOG_TRACE(_logger, "[{}] connecting to {}:{}", getServiceId(), Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); // The start function possibly gets called multiple times due to trying to recover from not being able to connect if(_socket == -1) { @@ -72,14 +64,9 @@ Ichor::Task> Ichor::TcpConnectionService:: ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting)); timeval timeout{}; - timeout.tv_usec = _recvTimeout; - setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); timeout.tv_usec = _sendTimeout; setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); - auto flags = ::fcntl(_socket, F_GETFL, 0); - ::fcntl(_socket, F_SETFL, flags | O_NONBLOCK); - sockaddr_in address{}; address.sin_family = AF_INET; address.sin_port = htons(Ichor::any_cast(portIt->second)); @@ -90,61 +77,69 @@ Ichor::Task> Ichor::TcpConnectionService:: throw std::runtime_error("inet_pton invalid address for given address family (has to be ipv4-valid address)"); } - bool connected{}; - while(!connected && connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0) { - ICHOR_LOG_ERROR(_logger, "[{}] connect error {}", _id, errno); + bool connected = connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0; + while(!connected && _attempts < 5) { + connected = connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0; + if(connected) { + break; + } + ICHOR_LOG_TRACE(_logger, "[{}] connect error {}", getServiceId(), errno); if(errno == EINPROGRESS) { - while(_attempts++ >= 5) { - pollfd pfd{}; - pfd.fd = _socket; - pfd.events = POLLOUT; - ret = poll(&pfd, 1, static_cast(_sendTimeout)); + // this is from when the socket was marked as nonblocking, don't think this is necessary anymore. + pollfd pfd{}; + pfd.fd = _socket; + pfd.events = POLLOUT; + ret = poll(&pfd, 1, static_cast(_sendTimeout/1'000)); + + if(ret < 0) { + ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", getServiceId(), errno); + continue; + } - if(ret < 0) { - ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", _id, errno); - continue; - } + // timeout + if(ret == 0) { + continue; + } + + if(pfd.revents & POLLERR) { + ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {}", getServiceId(), pfd.revents); + } else if(pfd.revents & POLLHUP) { + ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {}", getServiceId(), pfd.revents); + } else if(pfd.revents & POLLOUT) { + int connect_result{}; + socklen_t result_len = sizeof(connect_result); + ret = getsockopt(_socket, SOL_SOCKET, SO_ERROR, &connect_result, &result_len); - // timeout - if(ret == 0) { - continue; + if(ret < 0) { + throw std::runtime_error("getsocketopt error: Couldn't connect"); } - if(pfd.revents & POLLERR) { - ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {} {} {}", _id, pfd.revents); - } else if(pfd.revents & POLLHUP) { - ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {} {} {}", _id, pfd.revents); - } else if(pfd.revents & POLLOUT) { - int connect_result{}; - socklen_t result_len = sizeof(connect_result); - ret = getsockopt(_socket, SOL_SOCKET, SO_ERROR, &connect_result, &result_len); - - if(ret < 0) { - throw std::runtime_error("getsocketopt error: Couldn't connect"); - } - - // connect failed, retry - if(connect_result < 0) { - break; - } - connected = true; + // connect failed, retry + if(connect_result < 0) { + ICHOR_LOG_ERROR(_logger, "[{}] POLLOUT {} {}", getServiceId(), pfd.revents, connect_result); break; } + connected = true; + break; } + } else if(errno == EISCONN) { + connected = true; + break; } else if(errno == EALREADY) { std::this_thread::sleep_for(std::chrono::microseconds(_sendTimeout)); } else { _attempts++; } - - // we don't want to increment attempts in the EINPROGRESS case, but we do want to check it here - if(_attempts >= 5) { - throw std::runtime_error("Couldn't connect"); - } } auto *ip = ::inet_ntoa(address.sin_addr); - ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for {}:{}", _id, ip, ::ntohs(address.sin_port)); + + if(!connected) { + ICHOR_LOG_ERROR(_logger, "[{}] Couldn't start TCP connection for {}:{}", getServiceId(), ip, ::ntohs(address.sin_port)); + GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); + co_return tl::unexpected(StartError::FAILED); + } + ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for {}:{}", getServiceId(), ip, ::ntohs(address.sin_port)); } _timer = &_timerFactory->createTimer(); @@ -160,6 +155,7 @@ Ichor::Task> Ichor::TcpConnectionService:: Ichor::Task Ichor::TcpConnectionService::stop() { _quit = true; + ICHOR_LOG_INFO(_logger, "[{}] stopping service", getServiceId()); if(_socket >= 0) { ::shutdown(_socket, SHUT_RDWR); @@ -189,12 +185,13 @@ Ichor::Task> Ichor::TcpConnectionService::sen size_t sent_bytes = 0; if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } while(sent_bytes < msg.size()) { auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, MSG_NOSIGNAL); + ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes, errno = {}", getServiceId(), ret, errno); if(ret < 0) { co_return tl::unexpected(IOError::FAILED); @@ -208,7 +205,7 @@ Ichor::Task> Ichor::TcpConnectionService::sen Ichor::Task> Ichor::TcpConnectionService::sendAsync(std::vector> &&msgs) { if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } @@ -217,6 +214,7 @@ Ichor::Task> Ichor::TcpConnectionService::sen while(sent_bytes < msg.size()) { auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, 0); + ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes", getServiceId(), ret); if(ret < 0) { co_return tl::unexpected(IOError::FAILED); @@ -253,26 +251,33 @@ void Ichor::TcpConnectionService::setReceiveHandler(std::functionstartTimer(); + if(!_timer->startTimer()) { + GetThreadLocalEventQueue().pushEvent(getServiceId(), [this]() { + if(!_timer->startTimer()) { + std::terminate(); + } + }); + } } else { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", getServiceId()); } }}; std::vector msg{}; - int64_t ret{}; + ssize_t ret{}; { - std::array buf; + std::array buf; do { - ret = recv(_socket, buf.data(), buf.size(), 0); + ret = recv(_socket, buf.data(), buf.size(), MSG_DONTWAIT); if (ret > 0) { auto data = std::span{reinterpret_cast(buf.data()), static_cast(ret)}; msg.insert(msg.end(), data.begin(), data.end()); } } while (ret > 0 && !_quit); } + ICHOR_LOG_TRACE(_logger, "[{}] last received {} bytes, msg size = {}, errno = {}", getServiceId(), ret, msg.size(), errno); if (_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting", getServiceId()); return; } @@ -286,6 +291,7 @@ void Ichor::TcpConnectionService::recvHandler() { if(ret == 0) { // closed connection + ICHOR_LOG_INFO(_logger, "[{}] peer closed connection", getServiceId()); GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); return; } @@ -294,7 +300,7 @@ void Ichor::TcpConnectionService::recvHandler() { if(errno == EAGAIN) { return; } - ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", _id, errno); + ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", getServiceId(), errno); GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); return; } diff --git a/src/services/network/tcp/TcpHostService.cpp b/src/services/network/tcp/TcpHostService.cpp index f7e43d4e..ff3444a6 100644 --- a/src/services/network/tcp/TcpHostService.cpp +++ b/src/services/network/tcp/TcpHostService.cpp @@ -125,7 +125,6 @@ Ichor::AsyncGenerator Ichor::TcpHostService::handleEvent( props.emplace("Priority", Ichor::make_any(_priority)); props.emplace("Socket", Ichor::make_any(evt.socket)); props.emplace("TimeoutSendUs", Ichor::make_any(_sendTimeout)); - props.emplace("TimeoutRecvUs", Ichor::make_any(_recvTimeout)); _connections.emplace_back(GetThreadLocalManager().template createServiceManager(std::move(props))->getServiceId()); co_return {}; diff --git a/src/services/timer/Timer.cpp b/src/services/timer/Timer.cpp index 459994bb..36f274ee 100644 --- a/src/services/timer/Timer.cpp +++ b/src/services/timer/Timer.cpp @@ -36,7 +36,7 @@ bool Ichor::Timer::startTimer(bool fireImmediately) { } std::unique_lock l{_m}; INTERNAL_IO_DEBUG("timer {} for {} startTimer({}) {} {}", _timerId, _requestingServiceId, fireImmediately, _state, _quitCbs.size()); - if(_state == TimerState::STOPPED) { + if(_state == TimerState::STOPPED || _state == TimerState::STOPPING) { l.unlock(); if(_eventInsertionThread && _eventInsertionThread->joinable()) { _eventInsertionThread->join(); diff --git a/test/TcpTests.cpp b/test/TcpTests.cpp index 3a3b9146..ff35c640 100644 --- a/test/TcpTests.cpp +++ b/test/TcpTests.cpp @@ -67,7 +67,7 @@ TEST_CASE("TcpTests") { auto queue = std::make_unique(500, true); #endif ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING @@ -145,7 +145,7 @@ TEST_CASE("TcpTests") { _evt = std::make_unique(); auto queue = std::make_unique(true); ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING @@ -153,8 +153,8 @@ TEST_CASE("TcpTests") { #endif auto &dm = queue->createManager(); uint64_t priorityToEnsureHostStartingFirst = 51; - dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); + dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_DEBUG)}}, priorityToEnsureHostStartingFirst); + dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_DEBUG)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}, {"BufferEntries", Ichor::make_any(static_cast(16))}, {"BufferEntrySize", Ichor::make_any(static_cast(16'384))}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, IClientFactory>(); #ifndef TEST_URING @@ -221,7 +221,7 @@ TEST_CASE("TcpTests") { _evt = std::make_unique(); auto queue = std::make_unique(true); ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING @@ -308,7 +308,7 @@ TEST_CASE("TcpTests") { _evt = std::make_unique(); auto queue = std::make_unique(true); ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING @@ -384,7 +384,7 @@ TEST_CASE("TcpTests") { _evt = std::make_unique(); auto queue = std::make_unique(true); ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING @@ -467,7 +467,7 @@ TEST_CASE("TcpTests") { _evt = std::make_unique(); auto queue = std::make_unique(true); ServiceIdType tcpClientId; - evtGate = false; + evtGate = 0; std::thread t([&]() { #ifdef TEST_URING diff --git a/test/TestServices/TcpService.h b/test/TestServices/TcpService.h index cac1aea4..efcb7e28 100644 --- a/test/TestServices/TcpService.h +++ b/test/TestServices/TcpService.h @@ -31,13 +31,14 @@ class TcpService final : public ITcpService, public AdvancedService ~TcpService() final = default; void addDependencyInstance(IConnectionService &connectionService, IService &svc) { - fmt::println("svc injected {} {}", svc.getServiceId(), connectionService.isClient()); + fmt::println("{} svc injected {} {} {} {} {}", getServiceId(), svc.getServiceId(), connectionService.isClient(), evtGate.load(std::memory_order_acquire), _clientService == nullptr, _hostService == nullptr); if(connectionService.isClient()) { _clientService = &connectionService; _clientId = svc.getServiceId(); } else { _hostService = &connectionService; _hostService->setReceiveHandler([this](std::span data) { + fmt::println("svc recv {}", data.size()); std::string_view fullMsg{reinterpret_cast(data.data()), data.size()}; if(msgs.empty()) { msgs.emplace_back();