diff --git a/CMakeLists.txt b/CMakeLists.txt index 89c8de8a..73392b67 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -115,7 +115,7 @@ set(FMT_SOURCES ${ICHOR_EXTERNAL_DIR}/fmt/src/format.cc ${ICHOR_EXTERNAL_DIR}/fm file(GLOB_RECURSE ICHOR_FRAMEWORK_SOURCES ${ICHOR_TOP_DIR}/src/ichor/*.cpp) file(GLOB_RECURSE ICHOR_OPTIONAL_ETCD_SOURCES ${ICHOR_TOP_DIR}/src/services/etcd/*.cpp) file(GLOB_RECURSE ICHOR_LOGGING_SOURCES ${ICHOR_TOP_DIR}/src/services/logging/*.cpp) -file(GLOB_RECURSE ICHOR_TCP_SOURCES ${ICHOR_TOP_DIR}/src/services/network/tcp/*.cpp) +file(GLOB_RECURSE ICHOR_HTTP_SOURCES ${ICHOR_TOP_DIR}/src/services/network/http/*.cpp) file(GLOB_RECURSE ICHOR_BOOST_BEAST_SOURCES ${ICHOR_TOP_DIR}/src/services/network/boost/*.cpp) file(GLOB_RECURSE ICHOR_METRICS_SOURCES ${ICHOR_TOP_DIR}/src/services/metrics/*.cpp) file(GLOB_RECURSE ICHOR_TIMER_SOURCES ${ICHOR_TOP_DIR}/src/services/timer/Timer.cpp ${ICHOR_TOP_DIR}/src/services/timer/TimerFactoryFactory.cpp) @@ -137,7 +137,7 @@ if(ICHOR_USE_MIMALLOC AND NOT ICHOR_USE_SYSTEM_MIMALLOC) set(ICHOR_FRAMEWORK_SOURCES ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_TOP_DIR}/external/mimalloc/src/static.c) endif() -add_library(ichor ${FMT_SOURCES} ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_LOGGING_SOURCES} ${ICHOR_TCP_SOURCES} ${ICHOR_METRICS_SOURCES} ${ICHOR_TIMER_SOURCES} ${ICHOR_IO_SOURCES} ${ICHOR_BASE64_SOURCES} ${ICHOR_STL_SOURCES}) +add_library(ichor ${FMT_SOURCES} ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_LOGGING_SOURCES} ${ICHOR_TCP_SOURCES} ${ICHOR_HTTP_SOURCES} ${ICHOR_METRICS_SOURCES} ${ICHOR_TIMER_SOURCES} ${ICHOR_IO_SOURCES} ${ICHOR_BASE64_SOURCES} ${ICHOR_STL_SOURCES}) if(ICHOR_ENABLE_INTERNAL_DEBUGGING) target_compile_definitions(ichor PUBLIC ICHOR_ENABLE_INTERNAL_DEBUGGING) diff --git a/docs/02-DependencyInjection.md b/docs/02-DependencyInjection.md index fff4e6cc..5a0cb457 100644 --- a/docs/02-DependencyInjection.md +++ b/docs/02-DependencyInjection.md @@ -38,7 +38,7 @@ e.g. here is a service that adds a REST API endpoint to the running HTTP host se class BasicService final { public: BasicService(IHttpHostService *hostService) { - _routeRegistration = hostService->addRoute(HttpMethod::get, "/basic", [this, serializer](HttpRequest &req) -> AsyncGenerator { + _routeRegistration = hostService->addRoute(HttpMethod::get, "/basic", [this, serializer](HttpRequest &req) -> Task { co_return HttpResponse{HttpStatus::ok, "application/text, "This is my basic webpage", {}}; }); } diff --git a/docs/09-HttpServer.md b/docs/09-HttpServer.md index c55f3232..313be0fc 100644 --- a/docs/09-HttpServer.md +++ b/docs/09-HttpServer.md @@ -30,7 +30,7 @@ int main(int argc, char *argv[]) { class UsingHttpService final { UsingHttpService(IHttpHostService *host) { - _routeRegistrations.emplace_back(host->addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> AsyncGenerator { + _routeRegistrations.emplace_back(host->addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> Task { co_return HttpResponse{HttpStatus::ok, "application/text", "This is my basic webpage", {}}; })); } @@ -47,7 +47,7 @@ To add routes that capture parts of the URL, Ichor provides a regex route matche class UsingHttpService final { UsingHttpService(IHttpHostService *host) { - _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> AsyncGenerator { + _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> Task { std::string user_id = req.regex_params[0]; if(req.regex_params.size() > 1) { std::string query_params = req.regex_params[1]; // e.g. param1=one¶m2=two, parsing string is left to the user for now, though Ichor does provide a string_view split function in stl/StringUtils.h @@ -87,7 +87,7 @@ struct CustomRouteMatcher final : public RouteMatcher { And use that with the route registration: ```c++ -_routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique(), [this](HttpRequest &req) -> AsyncGenerator { +_routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique(), [this](HttpRequest &req) -> Task { co_return HttpResponse{HttpStatus::ok, "text/plain", {}, {}}; })); ``` diff --git a/examples/common/DebugService.h b/examples/common/DebugService.h index 8a8dda1a..684ee76c 100644 --- a/examples/common/DebugService.h +++ b/examples/common/DebugService.h @@ -20,11 +20,12 @@ class DebugService final : public AdvancedService { auto &_timer = _timerFactory->createTimer(); _timer.setCallback([this]() { printServices(); + std::terminate(); }); - _timer.setChronoInterval(500ms); + _timer.setChronoInterval(1000ms); _timer.startTimer(); - printServices(); + // printServices(); co_return {}; } @@ -69,6 +70,8 @@ class DebugService final : public AdvancedService { ICHOR_LOG_INFO(_logger, ""); } + + ICHOR_LOG_INFO(_logger, "\n===================================\n"); } void addDependencyInstance(ILogger &logger, IService &) { diff --git a/examples/factory_example/FactoryService.h b/examples/factory_example/FactoryService.h index d694d829..11de7870 100644 --- a/examples/factory_example/FactoryService.h +++ b/examples/factory_example/FactoryService.h @@ -65,6 +65,7 @@ class FactoryService final { if(runtimeService == end(_scopedRuntimeServices)) { auto newProps = *evt.properties.value(); + newProps.erase("Filter"); // `Filter` is a magic keyword that Ichor uses to determine if this service is global or if Ichor should use its filtering logic. // In this case, we tell Ichor to only insert this service if the requesting service has a matching scope newProps.emplace("Filter", Ichor::make_any(ScopeFilterEntry{scope})); diff --git a/examples/http_example/UsingHttpService.h b/examples/http_example/UsingHttpService.h index e93fd30a..4a0c2d48 100644 --- a/examples/http_example/UsingHttpService.h +++ b/examples/http_example/UsingHttpService.h @@ -74,12 +74,12 @@ class UsingHttpService final : public AdvancedService { void addDependencyInstance(IHttpHostService &svc, IService&) { ICHOR_LOG_INFO(_logger, "Inserted IHttpHostService"); - _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> AsyncGenerator { + _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> Task { auto msg = _serializer->deserialize(req.body); ICHOR_LOG_WARN(_logger, "received request on route {} {} with testmsg {} - {}", (int)req.method, req.route, msg->id, msg->val); co_return HttpResponse{HttpStatus::ok, "application/json", _serializer->serialize(TestMsg{11, "hello"}), {}}; })); - _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> AsyncGenerator { + _routeRegistrations.emplace_back(svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> Task { ICHOR_LOG_WARN(_logger, "received request on route {} {} with params:", (int)req.method, req.route); for(auto const ¶m : req.regex_params) { ICHOR_LOG_WARN(_logger, "{}", param); diff --git a/examples/http_ping_pong/PongService.h b/examples/http_ping_pong/PongService.h index 1cfe3ceb..8e23e36e 100644 --- a/examples/http_ping_pong/PongService.h +++ b/examples/http_ping_pong/PongService.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "PingMsg.h" using namespace Ichor; @@ -12,7 +12,7 @@ using namespace Ichor; class PongService final { public: PongService(ILogger *logger, ISerializer *serializer, IHttpHostService *hostService) : _logger(logger) { - _routeRegistration = hostService->addRoute(HttpMethod::post, "/ping", [this, serializer](HttpRequest &req) -> AsyncGenerator { + _routeRegistration = hostService->addRoute(HttpMethod::post, "/ping", [this, serializer](HttpRequest &req) -> Task { ICHOR_LOG_INFO(_logger, "received request from {} with body {} ", req.address, std::string_view{reinterpret_cast(req.body.data()), req.body.size()}); auto msg = serializer->deserialize(req.body); ICHOR_LOG_INFO(_logger, "received request from {} on route {} {} with PingMsg {}", req.address, (int) req.method, req.route, msg->sequence); diff --git a/examples/tcp_example/main.cpp b/examples/tcp_example/main.cpp index 7f1f7f8f..042cf160 100644 --- a/examples/tcp_example/main.cpp +++ b/examples/tcp_example/main.cpp @@ -68,7 +68,7 @@ int main(int argc, char *argv[]) { dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef URING_EXAMPLE dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif diff --git a/include/ichor/Concepts.h b/include/ichor/Concepts.h index 19ad80c6..83201d92 100644 --- a/include/ichor/Concepts.h +++ b/include/ichor/Concepts.h @@ -5,6 +5,8 @@ #include #include +#include "Concepts.h" + namespace Ichor { struct DependencyRegister; @@ -25,6 +27,12 @@ namespace Ichor { template concept Derived = std::is_base_of::value; + template + concept DerivedEither = std::is_base_of::value || std::is_base_of::value; + + template + concept DerivedAny = std::is_base_of::value || std::is_base_of::value; + template class U> concept DerivedTemplated = std::is_base_of, T>::value; diff --git a/include/ichor/coroutines/Task.h b/include/ichor/coroutines/Task.h index c19366bc..02a32f71 100644 --- a/include/ichor/coroutines/Task.h +++ b/include/ichor/coroutines/Task.h @@ -17,7 +17,7 @@ namespace Ichor { - template class ICHOR_CORO_AWAIT_ELIDABLE ICHOR_CORO_LIFETIME_BOUND ICHOR_CORO_RETURN_TYPE Task; + template class ICHOR_CORO_AWAIT_ELIDABLE ICHOR_CORO_LIFETIME_BOUND Task; namespace Detail { @@ -250,7 +250,7 @@ namespace Ichor /// caller. Execution of the coroutine body does not start until the /// coroutine is first co_await'ed. template - class [[nodiscard]] ICHOR_CORO_AWAIT_ELIDABLE ICHOR_CORO_LIFETIME_BOUND ICHOR_CORO_RETURN_TYPE Task + class [[nodiscard]] ICHOR_CORO_AWAIT_ELIDABLE ICHOR_CORO_LIFETIME_BOUND Task { public: diff --git a/include/ichor/events/InternalEvents.h b/include/ichor/events/InternalEvents.h index 7e990980..8da05f61 100644 --- a/include/ichor/events/InternalEvents.h +++ b/include/ichor/events/InternalEvents.h @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace Ichor { @@ -44,7 +45,7 @@ namespace Ichor { /// When a new service gets created that requests dependencies, each dependency it requests adds this event struct DependencyRequestEvent final : public Event { - explicit DependencyRequestEvent(uint64_t _id, ServiceIdType _originatingService, uint64_t _priority, Dependency _dependency, tl::optional _properties) noexcept : + explicit DependencyRequestEvent(uint64_t _id, ServiceIdType _originatingService, uint64_t _priority, Dependency _dependency, tl::optional> _properties) noexcept : Event(_id, _originatingService, _priority), dependency(_dependency), properties{_properties} {} ~DependencyRequestEvent() final = default; @@ -56,7 +57,7 @@ namespace Ichor { } Dependency dependency; - tl::optional properties; + tl::optional> properties; static constexpr NameHashType TYPE = typeNameHash(); static constexpr std::string_view NAME = typeName(); }; diff --git a/include/ichor/services/logging/LoggerFactory.h b/include/ichor/services/logging/LoggerFactory.h index 2cc746aa..0af51f78 100644 --- a/include/ichor/services/logging/LoggerFactory.h +++ b/include/ichor/services/logging/LoggerFactory.h @@ -18,7 +18,7 @@ namespace Ichor { class LoggerFactory final : public ILoggerFactory, public AdvancedService> { public: LoggerFactory(DependencyRegister ®, Properties props) : AdvancedService>(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); auto logLevelProp = AdvancedService>::getProperties().find("DefaultLogLevel"); if(logLevelProp != end(AdvancedService>::getProperties())) { @@ -65,10 +65,12 @@ namespace Ichor { } Properties props{}; + props.reserve(2); props.template emplace<>("Filter", Ichor::make_any(ServiceIdFilterEntry{evt.originatingService})); props.template emplace<>("LogLevel", Ichor::make_any(requestedLevel)); auto newLogger = GetThreadLocalManager().template createServiceManager(std::move(props), evt.priority); _loggers.emplace(evt.originatingService, newLogger->getServiceId()); + ICHOR_LOG_TRACE(_logger, "created logger for svcid {}", evt.originatingService); } else { ICHOR_LOG_TRACE(_logger, "svcid {} already has logger", evt.originatingService); } diff --git a/include/ichor/services/network/ClientFactory.h b/include/ichor/services/network/ClientFactory.h index 3aa212b4..6e95b274 100644 --- a/include/ichor/services/network/ClientFactory.h +++ b/include/ichor/services/network/ClientFactory.h @@ -13,11 +13,12 @@ namespace Ichor { class ClientFactory final : public IClientFactory, public AdvancedService> { public: ClientFactory(DependencyRegister ®, Properties properties) : AdvancedService>(std::move(properties)) { - reg.registerDependency(this, DependencyFlags::NONE, AdvancedService>::getProperties()); + reg.registerDependency(this, DependencyFlags::REQUIRED, AdvancedService>::getProperties()); } ~ClientFactory() final = default; uint64_t createNewConnection(NeverNull requestingSvc, Properties properties) final { + properties.erase("Filter"); properties.emplace("Filter", Ichor::make_any(ServiceIdFilterEntry{requestingSvc->getServiceId()})); ConnectionCounterType count = _connectionCounter++; @@ -74,19 +75,24 @@ namespace Ichor { AsyncGenerator handleDependencyRequest(AlwaysNull, DependencyRequestEvent const &evt) { if(!evt.properties.has_value()) { - throw std::runtime_error("Missing properties"); + ICHOR_LOG_TRACE(_logger, "Missing properties when creating new connection {}", evt.originatingService); + co_return {}; } if(!evt.properties.value()->contains("Address")) { - throw std::runtime_error("Missing address"); + ICHOR_LOG_TRACE(_logger, "Missing address when creating new connection {}", evt.originatingService); + co_return {}; } if(!evt.properties.value()->contains("Port")) { - throw std::runtime_error("Missing port"); + ICHOR_LOG_TRACE(_logger, "Missing port when creating new connection {}", evt.originatingService); + co_return {}; } if(!_connections.contains(evt.originatingService)) { + fmt::println("{} creating {} for {}", typeName>(), typeName(), evt.originatingService); auto newProps = *evt.properties.value(); + newProps.erase("Filter"); newProps.emplace("Filter", Ichor::make_any(ServiceIdFilterEntry{evt.originatingService})); unordered_map newMap; diff --git a/include/ichor/services/network/IConnectionService.h b/include/ichor/services/network/IConnectionService.h index 61bb13f1..9e7029c1 100644 --- a/include/ichor/services/network/IConnectionService.h +++ b/include/ichor/services/network/IConnectionService.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace Ichor { class IConnectionService { @@ -41,4 +42,18 @@ namespace Ichor { protected: ~IConnectionService() = default; }; + + struct IHostConnectionService : public IConnectionService { + using IConnectionService::IConnectionService; + + protected: + ~IHostConnectionService() = default; + }; + + struct IClientConnectionService : public IConnectionService { + using IConnectionService::IConnectionService; + + protected: + ~IClientConnectionService() = default; + }; } diff --git a/include/ichor/services/network/boost/HttpHostService.h b/include/ichor/services/network/boost/HttpHostService.h index c71f30d9..f1d1d440 100644 --- a/include/ichor/services/network/boost/HttpHostService.h +++ b/include/ichor/services/network/boost/HttpHostService.h @@ -50,8 +50,8 @@ namespace Ichor::Boost { HttpHostService(DependencyRegister ®, Properties props); ~HttpHostService() final = default; - HttpRouteRegistration addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) final; - HttpRouteRegistration addRoute(HttpMethod method, std::unique_ptr matcher, std::function(HttpRequest&)> handler) final; + HttpRouteRegistration addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) final; + HttpRouteRegistration addRoute(HttpMethod method, std::unique_ptr matcher, std::function(HttpRequest&)> handler) final; void removeRoute(HttpMethod method, RouteIdType id) final; void setPriority(uint64_t priority) final; @@ -94,7 +94,7 @@ namespace Ichor::Boost { bool _debug{}; std::atomic _logger{}; IAsioContextService *_asioContextService{}; - unordered_map, std::function(HttpRequest&)>>> _handlers{}; + unordered_map, std::function(HttpRequest&)>>> _handlers{}; AsyncManualResetEvent _startStopEvent{}; IEventQueue *_queue; }; diff --git a/include/ichor/services/network/boost/WsConnectionService.h b/include/ichor/services/network/boost/WsConnectionService.h index e7f5c350..d64ec05e 100644 --- a/include/ichor/services/network/boost/WsConnectionService.h +++ b/include/ichor/services/network/boost/WsConnectionService.h @@ -26,7 +26,7 @@ namespace Ichor::Boost { }; } - class WsConnectionService final : public IConnectionService, public AdvancedService { + class WsConnectionService final : public IConnectionService, public IHostConnectionService, public IClientConnectionService, public AdvancedService { public: WsConnectionService(DependencyRegister ®, Properties props); ~WsConnectionService() final = default; diff --git a/include/ichor/services/network/http/HttpConnectionService.h b/include/ichor/services/network/http/HttpConnectionService.h new file mode 100644 index 00000000..f27a4436 --- /dev/null +++ b/include/ichor/services/network/http/HttpConnectionService.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Ichor { + /** + * Service for creating an HTTP/1.1 server. Does not support chunked encoding and is technically not RFC 7320 compliant. + * + * Properties: + * - "Address" std::string - What address to bind to (required) + * - "Port" uint16_t - What port to bind to (required) + * - "Priority" uint64_t - What priority to insert events with (e.g. when getting a response from the client) + * - "NoDelay" bool - whether to enable TCP nodelay, a.k.a. disabling Nagle's algorithm, for reduced latency at the expense of throughput. (default: false) + * - "ConnectOverSsl" bool - Set to true to connect over HTTPS instead of HTTP (default: false) + * - "RootCA" std::string - If ConnectOverSsl is true and this property is set, trust the given RootCA (default: not set) + * - "TryConnectIntervalMs" uint64_t - with which interval in milliseconds to try (re)connecting (default: 100 ms) + * - "TimeoutMs" uint64_t - with which interval in milliseconds to timeout for (re)connecting, after which the service stops itself (default: 10'000 ms) + * - "Debug" bool - Enable verbose logging of requests and responses (default: false) + */ + class HttpConnectionService final : public IHttpConnectionService, public AdvancedService { + public: + HttpConnectionService(DependencyRegister ®, Properties props); + ~HttpConnectionService() final = default; + + Task sendAsync(HttpMethod method, std::string_view route, unordered_map &&headers, std::vector&& msg) final; + + Task close() final; + + void setPriority(uint64_t priority) final; + uint64_t getPriority() final; + + private: + Task> start() final; + Task stop() final; + + void addDependencyInstance(ILogger &logger, IService &isvc); + void removeDependencyInstance(ILogger &logger, IService &isvc); + + void addDependencyInstance(IEventQueue &q, IService &isvc); + void removeDependencyInstance(IEventQueue &q, IService &isvc); + + void addDependencyInstance(IClientConnectionService &c, IService &isvc); + void removeDependencyInstance(IClientConnectionService &c, IService &isvc); + + tl::expected parseResponse(std::string_view complete, size_t& len) const; + + friend DependencyRegister; + + uint64_t _priority{INTERNAL_EVENT_PRIORITY}; + bool _debug{}; + ILogger* _logger{}; + IEventQueue *_queue{}; + IClientConnectionService *_connection{}; + std::deque>> _events; + std::string _buffer; + }; +} diff --git a/include/ichor/services/network/http/HttpHostService.h b/include/ichor/services/network/http/HttpHostService.h new file mode 100644 index 00000000..8cad37e0 --- /dev/null +++ b/include/ichor/services/network/http/HttpHostService.h @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace Ichor { + /** + * Service for creating an HTTP/1.1 server using boost. Requires an IAsioContextService and a logger. + * + * Properties: + * - "Address" std::string - What address to bind to (required) + * - "Port" uint16_t - What port to bind to (required) + * - "Priority" uint64_t - What priority to insert events with (e.g. when getting a response from the client) + * - "NoDelay" bool - whether to enable TCP nodelay, a.k.a. disabling Nagle's algorithm, for reduced latency at the expense of throughput. (default: false) + * - "ConnectOverSsl" bool - Set to true to connect over HTTPS instead of HTTP (default: false) + * - "RootCA" std::string - If ConnectOverSsl is true and this property is set, trust the given RootCA (default: not set) + * - "TryConnectIntervalMs" uint64_t - with which interval in milliseconds to try (re)connecting (default: 100 ms) + * - "TimeoutMs" uint64_t - with which interval in milliseconds to timeout for (re)connecting, after which the service stops itself (default: 10'000 ms) + * - "Debug" bool - Enable verbose logging of requests and responses (default: false) + */ + class HttpHostService final : public IHttpHostService, public AdvancedService { + public: + HttpHostService(DependencyRegister ®, Properties props); + ~HttpHostService() final = default; + + HttpRouteRegistration addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) final; + HttpRouteRegistration addRoute(HttpMethod method, std::unique_ptr matcher, std::function(HttpRequest&)> handler) final; + void removeRoute(HttpMethod method, RouteIdType id) final; + + void setPriority(uint64_t priority) final; + uint64_t getPriority() final; + + private: + Task> start() final; + Task stop() final; + + void addDependencyInstance(ILogger &logger, IService &isvc); + void removeDependencyInstance(ILogger &logger, IService &isvc); + + void addDependencyInstance(IEventQueue &q, IService &isvc); + void removeDependencyInstance(IEventQueue &q, IService &isvc); + + void addDependencyInstance(IHostService &c, IService &isvc); + void removeDependencyInstance(IHostService &c, IService &isvc); + + void addDependencyInstance(IHostConnectionService &c, IService &isvc); + void removeDependencyInstance(IHostConnectionService &c, IService &isvc); + + tl::expected parseRequest(std::string_view complete, size_t& len) const; + Task receiveRequestHandler(ServiceIdType id); + Task sendResponse(ServiceIdType id, const HttpResponse &response); + + friend DependencyRegister; + + uint64_t _priority{INTERNAL_EVENT_PRIORITY}; + uint64_t _streamIdCounter{}; + uint64_t _matchersIdCounter{}; + bool _sendServerHeader{true}; + bool _debug{}; + unordered_map, std::function(HttpRequest&)>>> _handlers{}; + ILogger* _logger{}; + IEventQueue *_queue; + unordered_set _hostServiceIds; + unordered_map _connections; + unordered_map _connectionBuffers; + }; +} diff --git a/include/ichor/services/network/http/HttpInternal.h b/include/ichor/services/network/http/HttpInternal.h new file mode 100644 index 00000000..c7f54383 --- /dev/null +++ b/include/ichor/services/network/http/HttpInternal.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include + +namespace Ichor { + extern unordered_map ICHOR_METHOD_MATCHING; + extern unordered_map ICHOR_REVERSE_METHOD_MATCHING; + extern unordered_map ICHOR_STATUS_MATCHING; + constexpr std::string_view ICHOR_HTTP_VERSION_MATCH = "HTTP/1.1"; + + enum class HttpParseError : int_fast16_t { + NONE, + BADREQUEST, + WRONGHTTPVERSION, + QUITTING, + BUFFEROVERFLOW + }; +} + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { + return ctx.end(); + } + + template + auto format(const Ichor::HttpParseError& change, FormatContext& ctx) const { + switch(change) { + case Ichor::HttpParseError::NONE: + return fmt::format_to(ctx.out(), "NONE"); + case Ichor::HttpParseError::BADREQUEST: + return fmt::format_to(ctx.out(), "BADREQUEST"); + case Ichor::HttpParseError::WRONGHTTPVERSION: + return fmt::format_to(ctx.out(), "WRONGHTTPVERSION"); + case Ichor::HttpParseError::QUITTING: + return fmt::format_to(ctx.out(), "QUITTING"); + case Ichor::HttpParseError::BUFFEROVERFLOW: + return fmt::format_to(ctx.out(), "BUFFEROVERFLOW"); + } + return fmt::format_to(ctx.out(), "error, please file a bug in Ichor"); + } +}; diff --git a/include/ichor/services/network/http/IHttpHostService.h b/include/ichor/services/network/http/IHttpHostService.h index e8d1b3ec..bc57530e 100644 --- a/include/ichor/services/network/http/IHttpHostService.h +++ b/include/ichor/services/network/http/IHttpHostService.h @@ -98,8 +98,8 @@ namespace Ichor { class IHttpHostService { public: - virtual HttpRouteRegistration addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) = 0; - virtual HttpRouteRegistration addRoute(HttpMethod method, std::unique_ptr matcher, std::function(HttpRequest&)> handler) = 0; + virtual HttpRouteRegistration addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) = 0; + virtual HttpRouteRegistration addRoute(HttpMethod method, std::unique_ptr matcher, std::function(HttpRequest&)> handler) = 0; virtual void setPriority(uint64_t priority) = 0; virtual uint64_t getPriority() = 0; diff --git a/include/ichor/services/network/tcp/IOUringTcpConnectionService.h b/include/ichor/services/network/tcp/IOUringTcpConnectionService.h index 87552920..ef839b14 100644 --- a/include/ichor/services/network/tcp/IOUringTcpConnectionService.h +++ b/include/ichor/services/network/tcp/IOUringTcpConnectionService.h @@ -26,7 +26,8 @@ namespace Ichor { * - "BufferEntries" uint32_t - If kernel supports multishot, how many buffers to create for recv (default 8) * - "BufferEntrySize" uint32_t - If kernel supports multishot, how big one entry is for the allocated buffers (default 16'384) */ - class IOUringTcpConnectionService final : public IConnectionService, public AdvancedService { + template requires DerivedAny + class IOUringTcpConnectionService final : public InterfaceT, public AdvancedService> { public: IOUringTcpConnectionService(DependencyRegister ®, Properties props); ~IOUringTcpConnectionService() final = default; @@ -54,9 +55,7 @@ namespace Ichor { friend DependencyRegister; - static uint64_t tcpConnId; int _socket; - uint64_t _id; int64_t _sendTimeout{250'000}; int64_t _recvTimeout{250'000}; uint32_t _bufferEntries{16}; diff --git a/include/ichor/services/network/tcp/TcpConnectionService.h b/include/ichor/services/network/tcp/TcpConnectionService.h index 721ec689..148d85e8 100644 --- a/include/ichor/services/network/tcp/TcpConnectionService.h +++ b/include/ichor/services/network/tcp/TcpConnectionService.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace Ichor { @@ -18,7 +19,8 @@ namespace Ichor { * - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY) * - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000) */ - class TcpConnectionService final : public IConnectionService, public AdvancedService { + template requires DerivedAny + class TcpConnectionService final : public InterfaceT, public AdvancedService> { public: TcpConnectionService(DependencyRegister ®, Properties props); ~TcpConnectionService() final = default; diff --git a/include/ichor/stl/NeverAlwaysNull.h b/include/ichor/stl/NeverAlwaysNull.h index d54a5e7e..050bd3cc 100644 --- a/include/ichor/stl/NeverAlwaysNull.h +++ b/include/ichor/stl/NeverAlwaysNull.h @@ -30,6 +30,7 @@ namespace Ichor { class NeverNull { public: static_assert(Detail::is_comparable_to_nullptr::value, "T cannot be compared to nullptr."); + static_assert(std::is_pointer_v, "T has to be a pointer."); template ::value>> constexpr NeverNull(U&& u) : ptr_(std::forward(u)) diff --git a/include/ichor/stl/StringUtils.h b/include/ichor/stl/StringUtils.h index 93ac7ef4..4972221e 100644 --- a/include/ichor/stl/StringUtils.h +++ b/include/ichor/stl/StringUtils.h @@ -7,6 +7,7 @@ #include #include #include +#include #ifdef _GLIBCXX_DEBUG #define NO_DEBUG_CONSTEXPR @@ -38,6 +39,16 @@ namespace Ichor { while ((x = uint8_t(*str++ - '0')) <= 9) val = val * 10 + x; return val; } + static constexpr uint64_t FastAtoiu(std::string_view str) noexcept { + uint64_t val = 0; + uint8_t x; + size_t pos{}; + while (pos < str.size() && (x = uint8_t(str[pos] - '0')) <= 9) { + val = val * 10 + x; + pos++; + } + return val; + } struct Version final { uint64_t major; @@ -83,16 +94,22 @@ namespace Ichor { } } + static constexpr bool IsOnlyDigits(std::string_view str) { + return std::all_of(str.cbegin(), str.cend(), [](char const c) { + return c >= '0' && c <= '9'; + }); + } + static constexpr tl::optional parseStringAsVersion(std::string_view str) { if(str.length() < 5) { return {}; } - auto wrongLetterCount = std::count_if(str.cbegin(), str.cend(), [](char const c) { + auto wrongLetterCount = std::any_of(str.cbegin(), str.cend(), [](char const c) { return c != '0' && c != '1' && c != '2' && c != '3' && c != '4' && c != '5' && c != '6' && c != '7' && c != '8' && c != '9' && c != '.'; }); - if(wrongLetterCount != 0) { + if(wrongLetterCount) { return {}; } diff --git a/src/services/etcd/EtcdV2Service.cpp b/src/services/etcd/EtcdV2Service.cpp index 868892a3..0b9b3c6d 100644 --- a/src/services/etcd/EtcdV2Service.cpp +++ b/src/services/etcd/EtcdV2Service.cpp @@ -200,7 +200,7 @@ struct glz::meta { }; EtcdService::EtcdService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE, getProperties()); + reg.registerDependency(this, DependencyFlags::REQUIRED, getProperties()); reg.registerDependency(this, DependencyFlags::REQUIRED | DependencyFlags::ALLOW_MULTIPLE, getProperties()); reg.registerDependency(this, DependencyFlags::REQUIRED); } diff --git a/src/services/etcd/EtcdV3Service.cpp b/src/services/etcd/EtcdV3Service.cpp index 26b5829b..2431e17e 100644 --- a/src/services/etcd/EtcdV3Service.cpp +++ b/src/services/etcd/EtcdV3Service.cpp @@ -1041,7 +1041,7 @@ namespace glz::detail { } EtcdService::EtcdService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE, getProperties()); + reg.registerDependency(this, DependencyFlags::REQUIRED, getProperties()); reg.registerDependency(this, DependencyFlags::REQUIRED | DependencyFlags::ALLOW_MULTIPLE, getProperties()); reg.registerDependency(this, DependencyFlags::REQUIRED); } diff --git a/src/services/io/IOUringAsyncFileIO.cpp b/src/services/io/IOUringAsyncFileIO.cpp index 16a0313f..2a6bc87b 100644 --- a/src/services/io/IOUringAsyncFileIO.cpp +++ b/src/services/io/IOUringAsyncFileIO.cpp @@ -7,7 +7,7 @@ Ichor::IOUringAsyncFileIO::IOUringAsyncFileIO(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { reg.registerDependency(this, DependencyFlags::REQUIRED); - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); } diff --git a/src/services/network/boost/AsioContextService.cpp b/src/services/network/boost/AsioContextService.cpp index f1acfcc6..a67c6bab 100644 --- a/src/services/network/boost/AsioContextService.cpp +++ b/src/services/network/boost/AsioContextService.cpp @@ -15,7 +15,7 @@ Ichor::Boost::AsioContextService::AsioContextService(DependencyRegister ®, Pr } } - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); } Ichor::Boost::AsioContextService::~AsioContextService() { diff --git a/src/services/network/boost/HttpConnectionService.cpp b/src/services/network/boost/HttpConnectionService.cpp index c03f2140..acd5d8cb 100644 --- a/src/services/network/boost/HttpConnectionService.cpp +++ b/src/services/network/boost/HttpConnectionService.cpp @@ -6,7 +6,7 @@ #include Ichor::Boost::HttpConnectionService::HttpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } diff --git a/src/services/network/boost/HttpHostService.cpp b/src/services/network/boost/HttpHostService.cpp index 777fb4bf..740aacda 100644 --- a/src/services/network/boost/HttpHostService.cpp +++ b/src/services/network/boost/HttpHostService.cpp @@ -4,7 +4,7 @@ #include Ichor::Boost::HttpHostService::HttpHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } @@ -130,17 +130,17 @@ uint64_t Ichor::Boost::HttpHostService::getPriority() { return _priority.load(std::memory_order_acquire); } -Ichor::HttpRouteRegistration Ichor::Boost::HttpHostService::addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) { +Ichor::HttpRouteRegistration Ichor::Boost::HttpHostService::addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) { return addRoute(method, std::make_unique(route), std::move(handler)); } -Ichor::HttpRouteRegistration Ichor::Boost::HttpHostService::addRoute(HttpMethod method, std::unique_ptr newMatcher, std::function(HttpRequest&)> handler) { +Ichor::HttpRouteRegistration Ichor::Boost::HttpHostService::addRoute(HttpMethod method, std::unique_ptr newMatcher, std::function(HttpRequest&)> handler) { auto routes = _handlers.find(method); newMatcher->set_id(_matchersIdCounter); if(routes == _handlers.end()) { - unordered_map, std::function(HttpRequest&)>> newSubMap{}; + unordered_map, std::function(HttpRequest&)>> newSubMap{}; newSubMap.emplace(std::move(newMatcher), std::move(handler)); _handlers.emplace(method, std::move(newSubMap)); } else { @@ -335,14 +335,14 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context #else _queue->pushEvent(getServiceId(), [this, connection, httpReq = std::move(httpReq), version = req.version(), keep_alive = req.keep_alive()]() mutable -> AsyncGenerator { #endif - auto routes = _handlers.find(static_cast(httpReq.method)); + auto routes = _handlers.find(httpReq.method); if (_quit.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { co_return{}; } if (routes != std::end(_handlers)) { - std::function(HttpRequest&)> const *f{}; + std::function(HttpRequest&)> const *f{}; for(auto const &[matcher, handler] : routes->second) { if(matcher->matches(httpReq.route)) { httpReq.regex_params = matcher->route_params(); @@ -352,9 +352,7 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context } if (f != nullptr) { - auto gen = (*f)(httpReq); - auto it = co_await gen.begin(); - HttpResponse httpRes = std::move(*it); + auto httpRes = co_await (*f)(httpReq); http::response, http::basic_fields>> res{ static_cast(httpRes.status), version }; if (_sendServerHeader) { res.set(http::field::server, BOOST_BEAST_VERSION_STRING); diff --git a/src/services/network/boost/WsConnectionService.cpp b/src/services/network/boost/WsConnectionService.cpp index 2cae4406..0da69a29 100644 --- a/src/services/network/boost/WsConnectionService.cpp +++ b/src/services/network/boost/WsConnectionService.cpp @@ -25,7 +25,7 @@ void setup_stream(std::shared_ptr>& ws) } Ichor::Boost::WsConnectionService::WsConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); if(auto propIt = getProperties().find("WsHostServiceId"); propIt != getProperties().end()) { reg.registerDependency(this, DependencyFlags::REQUIRED, diff --git a/src/services/network/boost/WsHostService.cpp b/src/services/network/boost/WsHostService.cpp index 6717010a..772600c1 100644 --- a/src/services/network/boost/WsHostService.cpp +++ b/src/services/network/boost/WsHostService.cpp @@ -7,7 +7,7 @@ #include Ichor::Boost::WsHostService::WsHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } @@ -106,7 +106,7 @@ Ichor::AsyncGenerator Ichor::Boost::WsHostService::handle co_return {}; } - auto connection = GetThreadLocalManager().createServiceManager(Properties{ + auto connection = GetThreadLocalManager().createServiceManager(Properties{ {"WsHostServiceId", Ichor::make_any(getServiceId())}, {"Socket", Ichor::make_unformattable_any(evt._socket)} }); diff --git a/src/services/network/http/HttpConnectionService.cpp b/src/services/network/http/HttpConnectionService.cpp new file mode 100644 index 00000000..2b82b9b1 --- /dev/null +++ b/src/services/network/http/HttpConnectionService.cpp @@ -0,0 +1,308 @@ +#include +#include + +Ichor::HttpConnectionService::HttpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { + reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED, getProperties()); +} + +Ichor::Task Ichor::HttpConnectionService::sendAsync(HttpMethod method, std::string_view route, unordered_map &&headers, std::vector &&msg) { + if(_connection == nullptr) { + ICHOR_LOG_TRACE(_logger, "_connection nullptr"); + co_return {}; + } + + std::vector resp; + resp.reserve(8192); + auto methodText = ICHOR_REVERSE_METHOD_MATCHING.find(method); + if (methodText == ICHOR_REVERSE_METHOD_MATCHING.end()) { + co_return HttpResponse{}; + } + fmt::format_to(std::back_inserter(resp), "{} {} HTTP/1.1\r\n", methodText->second, route); + for (auto const &[k, v] : headers) { + if(k.empty() || k.front() == ' ' || k.back() == ' ') { + co_return HttpResponse{}; + } + if(v.empty() || v.front() == ' ' || v.back() == ' ') { + co_return HttpResponse{}; + } + fmt::format_to(std::back_inserter(resp), "{}: {}\r\n", k, v); + } + if(!msg.empty()) { + fmt::format_to(std::back_inserter(resp), "Content-Length: {}\r\n", msg.size()); + } + fmt::format_to(std::back_inserter(resp), "\r\n"); + if(!msg.empty()) { + resp.insert(resp.end(), msg.begin(), msg.end()); + } + // ICHOR_LOG_TRACE(_logger, "HttpConnection {} sending\n{}\n===", getServiceId(), std::string_view{reinterpret_cast(resp.data()), resp.size()}); + auto success = co_await _connection->sendAsync(std::move(resp)); + + if(!success) { + co_return HttpResponse{}; + } + + auto &evt = _events.emplace_back(); + + auto &parseResp = co_await evt; + + if(!parseResp) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} Failed to parse response: {}", getServiceId(), parseResp.error()); + co_return HttpResponse{}; + } + + co_return *parseResp; +} + +Ichor::Task Ichor::HttpConnectionService::close() { + co_return; +} + +Ichor::Task> Ichor::HttpConnectionService::start() { + auto addrIt = getProperties().find("Address"); + auto portIt = getProperties().find("Port"); + + if(addrIt == getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "Missing address"); + co_return tl::unexpected(StartError::FAILED); + } + if(portIt == getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "Missing port"); + co_return tl::unexpected(StartError::FAILED); + } + + if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { + _priority = Ichor::any_cast(propIt->second); + } + if(auto propIt = getProperties().find("Debug"); propIt != getProperties().end()) { + _debug = Ichor::any_cast(propIt->second); + } + ICHOR_LOG_TRACE(_logger, "HttpConnection {} started", getServiceId()); + + co_return {}; +} + +Ichor::Task Ichor::HttpConnectionService::stop() { + if(!_events.empty()) { + std::terminate(); + } + ICHOR_LOG_TRACE(_logger, "HttpConnection {} stopped", getServiceId()); + + co_return; +} + +void Ichor::HttpConnectionService::addDependencyInstance(ILogger &logger, IService &) { + _logger = &logger; + ICHOR_LOG_TRACE(_logger, "HttpConnection {} got logger", getServiceId()); +} + +void Ichor::HttpConnectionService::removeDependencyInstance(ILogger &, IService &) { + _logger = nullptr; +} + +void Ichor::HttpConnectionService::addDependencyInstance(IEventQueue &q, IService &) { + _queue = &q; + ICHOR_LOG_TRACE(_logger, "HttpConnection {} got queue", getServiceId()); +} + +void Ichor::HttpConnectionService::removeDependencyInstance(IEventQueue &, IService &) { + _queue = nullptr; +} + +void Ichor::HttpConnectionService::addDependencyInstance(IClientConnectionService &client, IService &s) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} got connection {}", getServiceId(), s.getServiceId()); + if(!client.isClient()) { + ICHOR_LOG_TRACE(_logger, "connection {} is not a client connection", s.getServiceId()); + return; + } + + _connection = &client; + _connection->setReceiveHandler([this](std::span buffer) { + std::string_view msg{reinterpret_cast(buffer.data()), buffer.size()}; + _buffer.append(msg.data(), msg.size()); + if(_buffer.size() > 1024*1024*512) { + _buffer.clear(); + tl::expected err = tl::unexpected(HttpParseError::BUFFEROVERFLOW); + for(auto &evt : _events) { + evt.set(err); + } + return; + } + msg = _buffer; + // ICHOR_LOG_TRACE(_logger, "HttpConnection {} receive buffer {}", getServiceId(), msg); + + if(_events.empty()) { + ICHOR_LOG_ERROR(_logger, "No events to set?"); + } + + auto pos = msg.find("\r\n\r\n"); + + while(pos != std::string_view::npos) { + pos += 4; + auto resp = parseResponse(msg, pos); + + if(!_events.empty()) { + { + auto &front = _events.front(); + front.set(resp); + } + _events.pop_front(); + } + + msg = msg.substr(pos); + pos = msg.find("\r\n\r\n"); + } + + if(!msg.empty()) { + _buffer = msg; + } else { + _buffer.clear(); + } + }); +} + +void Ichor::HttpConnectionService::removeDependencyInstance(IClientConnectionService &client, IService &) { + if(&client != _connection) { + return; + } + + _connection = nullptr; + for(auto &evt : _events) { + evt.set(tl::unexpected(HttpParseError::QUITTING)); + } + _events.clear(); +} + +void Ichor::HttpConnectionService::setPriority(uint64_t priority) { + _priority = priority; +} + +uint64_t Ichor::HttpConnectionService::getPriority() { + return _priority; +} + +tl::expected Ichor::HttpConnectionService::parseResponse(std::string_view complete, size_t& len) const { + HttpResponse resp{}; + uint64_t lineNo{}; + uint64_t crlfCounter{}; + uint64_t contentLength{}; + std::string_view partial{complete.data(), len}; + bool badRequest{}; + // ICHOR_LOG_TRACE(_logger, "HttpConnection {} parseResponse {}", getServiceId(), complete); + + split(partial, "\r\n", false, [&](std::string_view line) { + if(badRequest) { + return; + } + + if(lineNo == 0) { + uint64_t wordNo{}; + bool matchedStatus{}; + split(line, " ", false, [&](std::string_view word) { + if(badRequest) { + return; + } + + if(wordNo == 0) { + if(word != ICHOR_HTTP_VERSION_MATCH) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest version mismatch {} != {}", getServiceId(), word, ICHOR_HTTP_VERSION_MATCH); + badRequest = true; + } + } else if(wordNo == 1) { + if(word.empty() || !std::ranges::all_of(word, ::isdigit)) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest version !all_of digits {}", getServiceId(), word); + badRequest = true; + } else { + auto status = FastAtoiu(word.data()); + if(status < 100 || status > 599) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest HTTP status out of range {}", getServiceId(), status); + badRequest = true; + } else { + resp.status = static_cast(status); + matchedStatus = true; + } + } + } + wordNo++; + }); + + if(!matchedStatus) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest did not match status", getServiceId()); + badRequest = true; + } + crlfCounter++; + } else if(!line.empty()) { + uint64_t wordNo{}; + bool matchedValue{}; + bool contentLengthHeader{}; + std::string_view key; + split(line, ": ", false, [&](std::string_view word) { + if(badRequest) { + return; + } + + if(wordNo == 0) { + key = word; + if(key == "Content-Length") { + contentLengthHeader = true; + } + } else if(wordNo == 1) { + resp.headers.emplace(key, word); + matchedValue = true; + if(contentLengthHeader) { + if(!IsOnlyDigits(word)) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest Content-Length not digits {}", getServiceId(), word); + badRequest = true; + } else { + contentLength = FastAtoiu(word); + } + } + } else { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest header split error {}", getServiceId(), line); + badRequest = true; + } + + wordNo++; + }); + + if(!matchedValue) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest header did not match value {}", getServiceId(), line); + badRequest = true; + } + crlfCounter = 1; + } else if(line.empty()) { + crlfCounter++; + } + + lineNo++; + }); + + if(crlfCounter < 2) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} not enough crlf detected {}", getServiceId(), crlfCounter); + badRequest = true; + } + + if(badRequest) { + len = complete.size(); + return tl::unexpected(HttpParseError::BADREQUEST); + } + + if(complete.size() - len != contentLength) { + ICHOR_LOG_TRACE(_logger, "HttpConnection {} BadRequest complete.size() - len != contentLength ({} - {} != {})", getServiceId(), complete.size(), len, contentLength); + badRequest = true; + } else { + std::string content{complete.data() + len, contentLength}; + resp.body.assign(content.begin(), content.end()); + len = complete.size(); + } + + if(badRequest) { + len = complete.size(); + return tl::unexpected(HttpParseError::BADREQUEST); + } + + ICHOR_LOG_TRACE(_logger, "HttpConnection {} parsed {} {}", getServiceId(), static_cast(resp.status), resp.body.size()); + + return resp; +} diff --git a/src/services/network/http/HttpHostService.cpp b/src/services/network/http/HttpHostService.cpp new file mode 100644 index 00000000..79d557ee --- /dev/null +++ b/src/services/network/http/HttpHostService.cpp @@ -0,0 +1,376 @@ +#include +#include +#include +#include +#include + + +template <> +struct fmt::formatter> { + constexpr auto parse(format_parse_context& ctx) { + return ctx.end(); + } + + template + auto format(const Ichor::unordered_set& s, FormatContext& ctx) const { + bool first = true; + fmt::format_to(ctx.out(), "("); + for(const auto& id : s) { + if(first) { + fmt::format_to(ctx.out(), "{}", id); + first = false; + } else { + fmt::format_to(ctx.out(), ", {}", id); + } + } + return fmt::format_to(ctx.out(), ")"); + } +}; + +Ichor::HttpHostService::HttpHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { + reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED, getProperties()); + reg.registerDependency(this, DependencyFlags::ALLOW_MULTIPLE); +} + +Ichor::Task> Ichor::HttpHostService::start() { + auto addrIt = getProperties().find("Address"); + auto portIt = getProperties().find("Port"); + + if(addrIt == getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "Missing address"); + co_return tl::unexpected(StartError::FAILED); + } + if(portIt == getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "Missing port"); + co_return tl::unexpected(StartError::FAILED); + } + + if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { + _priority = Ichor::any_cast(propIt->second); + } + if(auto propIt = getProperties().find("Debug"); propIt != getProperties().end()) { + _debug = Ichor::any_cast(propIt->second); + } + if(auto propIt = getProperties().find("SendServerHeader"); propIt != getProperties().end()) { + _sendServerHeader = Ichor::any_cast(propIt->second); + } + + co_return {}; +} + +Ichor::Task Ichor::HttpHostService::stop() { + co_return; +} + +void Ichor::HttpHostService::addDependencyInstance(ILogger &logger, IService &) { + _logger = &logger; + ICHOR_LOG_TRACE(_logger, "HttpHost {} got logger", getServiceId()); +} + +void Ichor::HttpHostService::removeDependencyInstance(ILogger &, IService &) { + _logger = nullptr; +} + +void Ichor::HttpHostService::addDependencyInstance(IEventQueue &q, IService &) { + ICHOR_LOG_TRACE(_logger, "HttpHost {} got queue", getServiceId()); + _queue = &q; +} + +void Ichor::HttpHostService::removeDependencyInstance(IEventQueue &, IService &) { + _queue = nullptr; +} + +void Ichor::HttpHostService::addDependencyInstance(IHostService &, IService &s) { + ICHOR_LOG_TRACE(_logger, "HttpHost {} got host service {}", getServiceId(), s.getServiceId()); + _hostServiceIds.emplace(s.getServiceId()); +} + +void Ichor::HttpHostService::removeDependencyInstance(IHostService &, IService &s) { + _hostServiceIds.erase(s.getServiceId()); +} + +void Ichor::HttpHostService::addDependencyInstance(IHostConnectionService &client, IService &s) { + ICHOR_LOG_TRACE(_logger, "HttpHost {} got connection {}", getServiceId(), s.getServiceId()); + if(client.isClient()) { + ICHOR_LOG_TRACE(_logger, "connection {} is not a host connection", s.getServiceId()); + return; + } + + auto TcpHostProp = s.getProperties().find("TcpHostService"); + + if(TcpHostProp == s.getProperties().end() || !_hostServiceIds.contains(Ichor::any_cast(TcpHostProp->second))) { + if(_logger->getLogLevel() == LogLevel::LOG_TRACE) { + ICHOR_LOG_TRACE(_logger, "New connection {}:{} did not match hostServiceId {}", s.getServiceId(), Ichor::any_cast(TcpHostProp->second), _hostServiceIds); + } + return; + } + + client.setReceiveHandler([this, id = s.getServiceId()](std::span buffer) { + std::string_view msg{reinterpret_cast(buffer.data()), buffer.size()}; + auto &string = _connectionBuffers[id]; + string.append(msg.data(), msg.size()); + _queue->pushEvent(getServiceId(), [this, id]() -> AsyncGenerator { + co_await receiveRequestHandler(id); + co_return {}; + }); + }); + + _connections.emplace(s.getServiceId(), &client); +} + +void Ichor::HttpHostService::removeDependencyInstance(IHostConnectionService &, IService &s) { + _connections.erase(s.getServiceId()); + _connectionBuffers.erase(s.getServiceId()); +} + +tl::expected Ichor::HttpHostService::parseRequest(std::string_view complete, size_t& len) const { + HttpRequest req{}; + uint64_t lineNo{}; + uint64_t crlfCounter{}; + uint64_t contentLength{}; + std::string_view partial{complete.data(), len}; + bool badRequest{}; + // ICHOR_LOG_TRACE(_logger, "HttpHostService {} parseRequest {}", getServiceId(), complete); + + split(partial, "\r\n", false, [&](std::string_view line) { + if(badRequest) { + return; + } + + if(lineNo == 0) { + uint64_t wordNo{}; + bool matchedVersion{}; + split(line, " ", false, [&](std::string_view word) { + if(badRequest) { + return; + } + + if(wordNo == 0) { + auto it = ICHOR_METHOD_MATCHING.find(word); + if(it == ICHOR_METHOD_MATCHING.end()) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} couldn't match method {}", getServiceId(), word); + badRequest = true; + } else { + req.method = it->second; + } + } else if(wordNo == 1) { + req.route = word; + } else if(wordNo == 2) { + if(word != ICHOR_HTTP_VERSION_MATCH) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} BadRequest version mismatch {} != {}", getServiceId(), word, ICHOR_HTTP_VERSION_MATCH); + badRequest = true; + } + matchedVersion = true; + } else if(wordNo == 3) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} too many words on line", getServiceId(), line); + badRequest = true; + } + wordNo++; + }); + + if(!matchedVersion) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} BadRequest did not match version", getServiceId()); + badRequest = true; + } + crlfCounter++; + } else if(!line.empty()) { + uint64_t wordNo{}; + bool matchedValue{}; + bool contentLengthHeader{}; + std::string_view key; + split(line, ": ", false, [&](std::string_view word) { + if(badRequest) { + return; + } + + if(wordNo == 0) { + key = word; + if(key == "Content-Length") { + contentLengthHeader = true; + } + } else if(wordNo == 1) { + req.headers.emplace(key, word); + matchedValue = true; + if(contentLengthHeader) { + if(!IsOnlyDigits(word)) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} BadRequest Content-Length not digits {}", getServiceId(), word); + badRequest = true; + } else { + contentLength = FastAtoiu(word); + } + } + } else { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} too many words on line", getServiceId(), line); + badRequest = true; + } + + wordNo++; + }); + + if(!matchedValue) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} BadRequest did not match value", getServiceId()); + badRequest = true; + } + crlfCounter = 1; + } else if(line.empty()) { + crlfCounter++; + } + + lineNo++; + }); + + if(crlfCounter < 2) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} not enough crlf detected {}", getServiceId(), crlfCounter); + badRequest = true; + } + + if(badRequest) { + len = complete.size(); + return tl::unexpected(HttpParseError::BADREQUEST); + } + + if(complete.size() - len != contentLength) { + ICHOR_LOG_TRACE(_logger, "HttpHostService {} BadRequest complete.size() - len != contentLength ({} - {} != {})", getServiceId(), complete.size(), len, contentLength); + badRequest = true; + len = complete.size(); + } else { + std::string content{complete.data() + len, contentLength}; + req.body.assign(content.begin(), content.end()); + len = complete.size(); + } + + if(badRequest) { + len = complete.size(); + return tl::unexpected(HttpParseError::BADREQUEST); + } + + ICHOR_LOG_TRACE(_logger, "HttpHostService {} parsed {} {} {} {}", getServiceId(), ICHOR_REVERSE_METHOD_MATCHING[req.method], req.address, req.route, req.body.size()); + + return req; +} + +Ichor::Task Ichor::HttpHostService::receiveRequestHandler(ServiceIdType id) { + auto &string = _connectionBuffers[id]; + if(string.size() > 1024*1024*512) { + string.clear(); + HttpResponse resp{}; + resp.status = HttpStatus::internal_server_error; + co_await sendResponse(id, resp); + co_return; + } + std::string_view msg = string; + + auto pos = msg.find("\r\n\r\n"); + + while(pos != std::string_view::npos) { + // ICHOR_LOG_TRACE(_logger, "HttpHostService {} received\n{}\n===", getServiceId(), msg); + pos += 4; + auto req = parseRequest(msg, pos); + + HttpResponse resp{}; + + if(!req) { + resp.status = HttpStatus::bad_request; + } else { + auto routes = _handlers.find(req->method); + + if(routes == _handlers.end()) { + resp.status = HttpStatus::not_found; + } else { + for(auto const &[matcher, handler] : routes->second) { + if(matcher->matches(req->route)) { + req->regex_params = matcher->route_params(); + + resp = co_await handler(*req); + break; + } + } + } + } + + co_await sendResponse(id, resp); + + msg = msg.substr(pos); + pos = msg.find("\r\n\r\n"); + } + + if(!msg.empty()) { + string = msg; + } else { + string.clear(); + } + + co_return; +} + +Ichor::Task Ichor::HttpHostService::sendResponse(ServiceIdType id, const HttpResponse &response) { + using namespace std::literals; + + std::vector resp; + resp.reserve(8192); + auto statusText = ICHOR_STATUS_MATCHING.find(response.status); + fmt::format_to(std::back_inserter(resp), "HTTP/1.1 {} {}\r\n", static_cast(response.status), statusText == ICHOR_STATUS_MATCHING.end() ? "Unknown"sv : statusText->second); + for(auto const &[k, v] : response.headers) { + fmt::format_to(std::back_inserter(resp), "{}: {}\r\n", k, v); + } + if(response.contentType) { + fmt::format_to(std::back_inserter(resp), "Content-Type: {}\r\n", *response.contentType); + } + if(!response.body.empty()) { + fmt::format_to(std::back_inserter(resp), "Content-Length: {}\r\n\r\n", response.body.size()); + resp.insert(resp.end(), response.body.begin(), response.body.end()); + } else { + fmt::format_to(std::back_inserter(resp), "\r\n"); + } + + auto client = _connections.find(id); + + if(client == _connections.end()) { + co_return; + } + + co_await client->second->sendAsync(std::move(resp)); + co_return; +} + +Ichor::HttpRouteRegistration Ichor::HttpHostService::addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) { + return addRoute(method, std::make_unique(route), std::move(handler)); +} + +Ichor::HttpRouteRegistration Ichor::HttpHostService::addRoute(HttpMethod method, std::unique_ptr newMatcher, std::function(HttpRequest&)> handler) { + auto routes = _handlers.find(method); + + newMatcher->set_id(_matchersIdCounter); + + if(routes == _handlers.end()) { + unordered_map, std::function(HttpRequest&)>> newSubMap{}; + newSubMap.emplace(std::move(newMatcher), std::move(handler)); + _handlers.emplace(method, std::move(newSubMap)); + } else { + routes->second.emplace(std::move(newMatcher), std::move(handler)); + } + + return {method, _matchersIdCounter++, this}; +} + +void Ichor::HttpHostService::removeRoute(HttpMethod method, RouteIdType id) { + auto routes = _handlers.find(method); + + if(routes == std::end(_handlers)) { + return; + } + + std::erase_if(routes->second, [id](auto const &item) { + return item.first->get_id() == id; + }); +} + +void Ichor::HttpHostService::setPriority(uint64_t priority) { + _priority = priority; +} + +uint64_t Ichor::HttpHostService::getPriority() { + return _priority; +} diff --git a/src/services/network/http/HttpInternal.cpp b/src/services/network/http/HttpInternal.cpp new file mode 100644 index 00000000..c9050b17 --- /dev/null +++ b/src/services/network/http/HttpInternal.cpp @@ -0,0 +1,138 @@ +#include + +namespace Ichor { + unordered_map ICHOR_METHOD_MATCHING { + {"DELETE", HttpMethod::delete_}, + {"GET", HttpMethod::get}, + {"HEAD", HttpMethod::head}, + {"POST", HttpMethod::post}, + {"PUT", HttpMethod::put}, + {"CONNECT", HttpMethod::connect}, + {"OPTIONS", HttpMethod::options}, + {"TRACE", HttpMethod::trace}, + {"COPY", HttpMethod::copy}, + {"LOCK", HttpMethod::lock}, + {"MKCOL", HttpMethod::mkcol}, + {"MOVE", HttpMethod::move}, + {"PROPFIND", HttpMethod::propfind}, + {"PROPPATCH", HttpMethod::proppatch}, + {"SEARCH", HttpMethod::search}, + {"UNLOCK", HttpMethod::unlock}, + {"BIND", HttpMethod::bind}, + {"REBIND", HttpMethod::rebind}, + {"UNBIND", HttpMethod::unbind}, + {"ACL", HttpMethod::acl}, + {"REPORT", HttpMethod::report}, + {"MKACTIVITY", HttpMethod::mkactivity}, + {"CHECKOUT", HttpMethod::checkout}, + {"MERGE", HttpMethod::merge}, + {"MSEARCH", HttpMethod::msearch}, + {"NOTIFY", HttpMethod::notify}, + {"SUBSCRIBE", HttpMethod::subscribe}, + {"UNSUBSCRIBE", HttpMethod::unsubscribe}, + {"PATCH", HttpMethod::patch}, + {"PURGE", HttpMethod::purge}, + {"MKCALENDAR", HttpMethod::mkcalendar}, + {"LINK", HttpMethod::link}, + {"UNLINK", HttpMethod::unlink}, + }; + unordered_map ICHOR_REVERSE_METHOD_MATCHING { + {HttpMethod::delete_, "DELETE"}, + {HttpMethod::get, "GET"}, + {HttpMethod::head, "HEAD"}, + {HttpMethod::post, "POST"}, + {HttpMethod::put, "PUT"}, + {HttpMethod::connect, "CONNECT"}, + {HttpMethod::options, "OPTIONS"}, + {HttpMethod::trace, "TRACE"}, + {HttpMethod::copy, "COPY"}, + {HttpMethod::lock, "LOCK"}, + {HttpMethod::mkcol, "MKCOL"}, + {HttpMethod::move, "MOVE"}, + {HttpMethod::propfind, "PROPFIND"}, + {HttpMethod::proppatch, "PROPPATCH"}, + {HttpMethod::search, "SEARCH"}, + {HttpMethod::unlock, "UNLOCK"}, + {HttpMethod::bind, "BIND"}, + {HttpMethod::rebind, "REBIND"}, + {HttpMethod::unbind, "UNBIND"}, + {HttpMethod::acl, "ACL"}, + {HttpMethod::report, "REPORT"}, + {HttpMethod::mkactivity, "MKACTIVITY"}, + {HttpMethod::checkout, "CHECKOUT"}, + {HttpMethod::merge, "MERGE"}, + {HttpMethod::msearch, "MSEARCH"}, + {HttpMethod::notify, "NOTIFY"}, + {HttpMethod::subscribe, "SUBSCRIBE"}, + {HttpMethod::unsubscribe, "UNSUBSCRIBE"}, + {HttpMethod::patch, "PATCH"}, + {HttpMethod::purge, "PURGE"}, + {HttpMethod::mkcalendar, "MKCALENDAR"}, + {HttpMethod::link, "LINK"}, + {HttpMethod::unlink, "UNLINK"}, + }; + unordered_map ICHOR_STATUS_MATCHING { + {HttpStatus::continue_, "Continue"}, + {HttpStatus::switching_protocols, "Switching Protocols"}, + {HttpStatus::processing, "Processing"}, + {HttpStatus::ok, "OK"}, + {HttpStatus::created, "Created"}, + {HttpStatus::accepted, "Accepted"}, + {HttpStatus::non_authoritative_information, "Non Authoritative Information"}, + {HttpStatus::no_content, "No Content"}, + {HttpStatus::reset_content, "Reset Content"}, + {HttpStatus::partial_content, "Partial Content"}, + {HttpStatus::multi_status, "Multi Status"}, + {HttpStatus::already_reported, "Already Reported"}, + {HttpStatus::im_used, "IM Used"}, + {HttpStatus::multiple_choices, "Multiple Choices"}, + {HttpStatus::moved_permanently, "Moved Permanently"}, + {HttpStatus::found, "Found"}, + {HttpStatus::see_other, "See Other"}, + {HttpStatus::not_modified, "Not Modified"}, + {HttpStatus::use_proxy, "Use Proxy"}, + {HttpStatus::temporary_redirect, "Temporary Redirect"}, + {HttpStatus::permanent_redirect, "Permanent Redirect"}, + {HttpStatus::bad_request, "Bad Request"}, + {HttpStatus::unauthorized, "Unauthorized"}, + {HttpStatus::payment_required, "Payment Required"}, + {HttpStatus::forbidden, "Forbidden"}, + {HttpStatus::not_found, "Not Found"}, + {HttpStatus::method_not_allowed, "Method Not Allowed"}, + {HttpStatus::not_acceptable, "Not Acceptable"}, + {HttpStatus::proxy_authentication_required, "Proxy Authentication Required"}, + {HttpStatus::request_timeout, "Request Timeout"}, + {HttpStatus::conflict, "Conflict"}, + {HttpStatus::gone, "Gone"}, + {HttpStatus::length_required, "Length Required"}, + {HttpStatus::precondition_failed, "Precondition Failed"}, + {HttpStatus::payload_too_large, "Payload Too Large"}, + {HttpStatus::uri_too_long, "URI Too Long"}, + {HttpStatus::unsupported_media_type, "Unsupported Media Type"}, + {HttpStatus::range_not_satisfiable, "Range Not Satisfiable"}, + {HttpStatus::expectation_failed, "Expectation Failed"}, + {HttpStatus::misdirected_request, "Misdirected Request"}, + {HttpStatus::unprocessable_entity, "Unprocessable Entity"}, + {HttpStatus::locked, "Locked"}, + {HttpStatus::failed_dependency, "Failed Dependency"}, + {HttpStatus::upgrade_required, "Upgrade Required"}, + {HttpStatus::precondition_required, "Precondition Required"}, + {HttpStatus::too_many_requests, "Too Many Requests"}, + {HttpStatus::request_header_fields_too_large, "Request Header Fields Too Large"}, + {HttpStatus::connection_closed_without_response, "Connection Closed Without Response"}, + {HttpStatus::unavailable_for_legal_reasons, "Unavailable For Legal Reasons"}, + {HttpStatus::client_closed_request, "Client Closed Request"}, + {HttpStatus::internal_server_error, "Internal Server Error"}, + {HttpStatus::not_implemented, "Not Implemented"}, + {HttpStatus::bad_gateway, "Bad Gateway"}, + {HttpStatus::service_unavailable, "Service Unavailable"}, + {HttpStatus::gateway_timeout, "Gateway Timeout"}, + {HttpStatus::http_version_not_supported, "HTTP Version Not Supported"}, + {HttpStatus::variant_also_negotiates, "Variant Also Negotiates"}, + {HttpStatus::insufficient_storage, "Insufficient Storage"}, + {HttpStatus::loop_detected, "Loop Detected"}, + {HttpStatus::not_extended, "Not Extended"}, + {HttpStatus::network_authentication_required, "Network Authentication Required"}, + {HttpStatus::network_connect_timeout_error, "Network Connect Timeout Error"}, + }; +} diff --git a/src/services/network/tcp/IOUringTcpConnectionService.cpp b/src/services/network/tcp/IOUringTcpConnectionService.cpp index 1e61e31f..c1892c6b 100644 --- a/src/services/network/tcp/IOUringTcpConnectionService.cpp +++ b/src/services/network/tcp/IOUringTcpConnectionService.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -10,47 +9,49 @@ #include #include -uint64_t Ichor::IOUringTcpConnectionService::tcpConnId{}; - -Ichor::IOUringTcpConnectionService::IOUringTcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _id(tcpConnId++) { - reg.registerDependency(this, DependencyFlags::NONE); +template requires Ichor::DerivedAny +Ichor::IOUringTcpConnectionService::IOUringTcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1) { + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } -Ichor::Task> Ichor::IOUringTcpConnectionService::start() { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::IOUringTcpConnectionService::start() { if(_q->getKernelVersion() < Version{5, 5, 0}) { fmt::println("Kernel version too old to use IOUringTcpConnectionService. Requires >= 5.5.0"); co_return tl::unexpected(StartError::FAILED); } + fmt::println("{}::start() {}", typeName(), AdvancedService::getServiceId()); + auto const &props = AdvancedService::getProperties(); - if(auto propIt = getProperties().find("TimeoutSendUs"); propIt != getProperties().end()) { + if(auto propIt = props.find("TimeoutSendUs"); propIt != props.end()) { _sendTimeout = Ichor::any_cast(propIt->second); } - if(auto propIt = getProperties().find("TimeoutRecvUs"); propIt != getProperties().end()) { + if(auto propIt = props.find("TimeoutRecvUs"); propIt != props.end()) { _recvTimeout = Ichor::any_cast(propIt->second); } - if(auto propIt = getProperties().find("BufferEntries"); propIt != getProperties().end()) { + if(auto propIt = props.find("BufferEntries"); propIt != props.end()) { _bufferEntries = Ichor::any_cast(propIt->second); } - if(auto propIt = getProperties().find("BufferEntrySize"); propIt != getProperties().end()) { + if(auto propIt = props.find("BufferEntrySize"); propIt != props.end()) { _bufferEntrySize = Ichor::any_cast(propIt->second); } - auto socketPropIt = getProperties().find("Socket"); - auto addrIt = getProperties().find("Address"); - auto portIt = getProperties().find("Port"); + auto socketPropIt = props.find("Socket"); + auto addrIt = props.find("Address"); + auto portIt = props.find("Port"); - if(socketPropIt != getProperties().end()) { + if(socketPropIt != props.end()) { _socket = Ichor::any_cast(socketPropIt->second); - ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for existing socket", _id); + ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for existing socket", AdvancedService::getServiceId()); } else { - if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing address", _id); + if(addrIt == props.end()) { + ICHOR_LOG_ERROR(_logger, "[{}] Missing address", AdvancedService::getServiceId()); co_return tl::unexpected(StartError::FAILED); } - if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing port", _id); + if(portIt == props.end()) { + ICHOR_LOG_ERROR(_logger, "[{}] Missing port", AdvancedService::getServiceId()); co_return tl::unexpected(StartError::FAILED); } @@ -68,7 +69,7 @@ Ichor::Task> Ichor::IOUringTcpConnectionSe co_await evt; if (res < 0) { - ICHOR_LOG_ERROR(_logger, "Couldn't open a socket to {}:{}: {}", Ichor::any_cast(addrIt->second), + ICHOR_LOG_ERROR(_logger, "Couldn't open a socket to {}:{}: {}", Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second), mapErrnoToError(-res)); co_return tl::unexpected(StartError::FAILED); } @@ -139,12 +140,12 @@ Ichor::Task> Ichor::IOUringTcpConnectionSe } } - if(socketPropIt == getProperties().end()) { + if(socketPropIt == props.end()) { sockaddr_in address{}; address.sin_family = AF_INET; address.sin_port = htons(Ichor::any_cast(portIt->second)); - int ret = inet_pton(AF_INET, Ichor::any_cast(addrIt->second).c_str(), &address.sin_addr); + int ret = inet_pton(AF_INET, Ichor::any_cast(addrIt->second).c_str(), &address.sin_addr); if(ret == 0) { throw std::runtime_error("inet_pton invalid address for given address family (has to be ipv4-valid address)"); @@ -160,7 +161,7 @@ Ichor::Task> Ichor::IOUringTcpConnectionSe io_uring_prep_connect(sqe, _socket, (struct sockaddr *)&address, sizeof(address)); co_await evt; - ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for {}:{}", _id, Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); + ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for {}:{}", AdvancedService::getServiceId(), Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); } bool armedMultishot{}; @@ -178,7 +179,7 @@ Ichor::Task> Ichor::IOUringTcpConnectionSe } } if(!armedMultishot) { - if(auto propIt = getProperties().find("RecvBufferSize"); propIt != getProperties().end()) { + if(auto propIt = props.find("RecvBufferSize"); propIt != props.end()) { _recvBuf.reserve(Ichor::any_cast(propIt->second)); ICHOR_LOG_WARN(_logger, "_recvBuf size {}", _recvBuf.size()); } else { @@ -193,7 +194,8 @@ Ichor::Task> Ichor::IOUringTcpConnectionSe co_return {}; } -Ichor::Task Ichor::IOUringTcpConnectionService::stop() { +template requires Ichor::DerivedAny +Ichor::Task Ichor::IOUringTcpConnectionService::stop() { _quit = true; INTERNAL_IO_DEBUG("quit"); @@ -251,23 +253,28 @@ Ichor::Task Ichor::IOUringTcpConnectionService::stop() { co_return; } -void Ichor::IOUringTcpConnectionService::addDependencyInstance(ILogger &logger, IService &) noexcept { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::addDependencyInstance(ILogger &logger, IService &) noexcept { _logger = &logger; } -void Ichor::IOUringTcpConnectionService::removeDependencyInstance(ILogger &, IService&) noexcept { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::removeDependencyInstance(ILogger &, IService&) noexcept { _logger = nullptr; } -void Ichor::IOUringTcpConnectionService::addDependencyInstance(IIOUringQueue &q, IService&) noexcept { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::addDependencyInstance(IIOUringQueue &q, IService&) noexcept { _q = &q; } -void Ichor::IOUringTcpConnectionService::removeDependencyInstance(IIOUringQueue&, IService&) noexcept { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::removeDependencyInstance(IIOUringQueue&, IService&) noexcept { _q = nullptr; } -std::function Ichor::IOUringTcpConnectionService::createRecvHandler() noexcept { +template requires Ichor::DerivedAny +std::function Ichor::IOUringTcpConnectionService::createRecvHandler() noexcept { return [this](io_uring_cqe *cqe) { INTERNAL_IO_DEBUG("recv res: {} {}", cqe->res, cqe->res < 0 ? strerror(-cqe->res) : ""); @@ -280,7 +287,7 @@ std::function Ichor::IOUringTcpConnectionService::createRec // TODO: check for -ENOBUFS and if so, create more provided buffers, swap and re-arm if(cqe->res <= 0) { ICHOR_LOG_ERROR(_logger, "recv returned an error {}:{}", cqe->res, strerror(-cqe->res)); - GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); + GetThreadLocalEventQueue().pushEvent(AdvancedService::getServiceId(), AdvancedService::getServiceId(), true); _quitEvt.set(); return; } @@ -315,17 +322,18 @@ std::function Ichor::IOUringTcpConnectionService::createRec }; } -Ichor::Task> Ichor::IOUringTcpConnectionService::sendAsync(std::vector &&msg) { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::IOUringTcpConnectionService::sendAsync(std::vector &&msg) { size_t sent_bytes = 0; if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } while(sent_bytes < msg.size()) { if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } @@ -351,14 +359,15 @@ Ichor::Task> Ichor::IOUringTcpConnectionServi co_return {}; } -Ichor::Task> Ichor::IOUringTcpConnectionService::sendAsync(std::vector> &&msgs) { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::IOUringTcpConnectionService::sendAsync(std::vector> &&msgs) { if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } @@ -412,19 +421,23 @@ Ichor::Task> Ichor::IOUringTcpConnectionServi co_return {}; } -void Ichor::IOUringTcpConnectionService::setPriority(uint64_t priority) { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::setPriority(uint64_t priority) { } -uint64_t Ichor::IOUringTcpConnectionService::getPriority() { +template requires Ichor::DerivedAny +uint64_t Ichor::IOUringTcpConnectionService::getPriority() { return 0; } -bool Ichor::IOUringTcpConnectionService::isClient() const noexcept { - return getProperties().find("Socket") == getProperties().end(); +template requires Ichor::DerivedAny +bool Ichor::IOUringTcpConnectionService::isClient() const noexcept { + return AdvancedService::getProperties().find("Socket") == AdvancedService::getProperties().end(); } -void Ichor::IOUringTcpConnectionService::setReceiveHandler(std::function)> recvHandler) { +template requires Ichor::DerivedAny +void Ichor::IOUringTcpConnectionService::setReceiveHandler(std::function)> recvHandler) { _recvHandler = recvHandler; for(auto &msg : _queuedMessages) { @@ -432,3 +445,7 @@ void Ichor::IOUringTcpConnectionService::setReceiveHandler(std::function; +template class Ichor::IOUringTcpConnectionService; +template class Ichor::IOUringTcpConnectionService; diff --git a/src/services/network/tcp/IOUringTcpHostService.cpp b/src/services/network/tcp/IOUringTcpHostService.cpp index 315bbc5e..44c94332 100644 --- a/src/services/network/tcp/IOUringTcpHostService.cpp +++ b/src/services/network/tcp/IOUringTcpHostService.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -11,7 +12,7 @@ #include Ichor::IOUringTcpHostService::IOUringTcpHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _bindFd(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } @@ -250,17 +251,19 @@ std::function Ichor::IOUringTcpHostService::createAcceptHa } Properties props{}; + props.reserve(7); props.emplace("Priority", Ichor::make_any(_priority)); props.emplace("Socket", Ichor::make_any(cqe->res)); props.emplace("TimeoutSendUs", Ichor::make_any(_sendTimeout)); props.emplace("TimeoutRecvUs", Ichor::make_any(_recvTimeout)); + props.emplace("TcpHostService", Ichor::make_any(getServiceId())); if(_bufferEntries) { props.emplace("BufferEntries", Ichor::make_any(*_bufferEntries)); } if(_bufferEntrySize) { props.emplace("BufferEntrySize", Ichor::make_any(*_bufferEntrySize)); } - _connections.emplace_back(GetThreadLocalManager().template createServiceManager(std::move(props))->getServiceId()); + _connections.emplace_back(GetThreadLocalManager().template createServiceManager, IConnectionService, IHostConnectionService>(std::move(props))->getServiceId()); if(_q->getKernelVersion() < Version{5, 19, 0}) { auto *sqe = _q->getSqeWithData(this, createAcceptHandler()); diff --git a/src/services/network/tcp/TcpConnectionService.cpp b/src/services/network/tcp/TcpConnectionService.cpp index 2c2b20ef..fd9db8bf 100644 --- a/src/services/network/tcp/TcpConnectionService.cpp +++ b/src/services/network/tcp/TcpConnectionService.cpp @@ -12,21 +12,23 @@ #include #include -Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { - reg.registerDependency(this, DependencyFlags::NONE); +template requires Ichor::DerivedAny +Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService>(std::move(props)), _socket(-1), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } -Ichor::Task> Ichor::TcpConnectionService::start() { - if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::TcpConnectionService::start() { + if(auto propIt = AdvancedService::getProperties().find("Priority"); propIt != AdvancedService::getProperties().end()) { _priority = Ichor::any_cast(propIt->second); } - if(auto propIt = getProperties().find("TimeoutSendUs"); propIt != getProperties().end()) { + if(auto propIt = AdvancedService::getProperties().find("TimeoutSendUs"); propIt != AdvancedService::getProperties().end()) { _sendTimeout = Ichor::any_cast(propIt->second); } - if(getProperties().contains("Socket")) { - if(auto propIt = getProperties().find("Socket"); propIt != getProperties().end()) { + if(AdvancedService::getProperties().contains("Socket")) { + if(auto propIt = AdvancedService::getProperties().find("Socket"); propIt != AdvancedService::getProperties().end()) { _socket = Ichor::any_cast(propIt->second); } @@ -37,20 +39,20 @@ Ichor::Task> Ichor::TcpConnectionService:: timeout.tv_usec = _sendTimeout; setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); - ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for existing socket", getServiceId()); + ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for existing socket", AdvancedService::getServiceId()); } else { - auto addrIt = getProperties().find("Address"); - auto portIt = getProperties().find("Port"); + auto addrIt = AdvancedService::getProperties().find("Address"); + auto portIt = AdvancedService::getProperties().find("Port"); - if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing address", getServiceId()); + if(addrIt == AdvancedService::getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "[{}] Missing address", AdvancedService::getServiceId()); co_return tl::unexpected(StartError::FAILED); } - if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "[{}] Missing port", getServiceId()); + if(portIt == AdvancedService::getProperties().end()) { + ICHOR_LOG_ERROR(_logger, "[{}] Missing port", AdvancedService::getServiceId()); co_return tl::unexpected(StartError::FAILED); } - ICHOR_LOG_TRACE(_logger, "[{}] connecting to {}:{}", getServiceId(), Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); + ICHOR_LOG_TRACE(_logger, "[{}] connecting to {}:{}", AdvancedService::getServiceId(), Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); // The start function possibly gets called multiple times due to trying to recover from not being able to connect if(_socket == -1) { @@ -83,7 +85,7 @@ Ichor::Task> Ichor::TcpConnectionService:: if(connected) { break; } - ICHOR_LOG_TRACE(_logger, "[{}] connect error {}", getServiceId(), errno); + ICHOR_LOG_TRACE(_logger, "[{}] connect error {}", AdvancedService::getServiceId(), errno); if(errno == EINPROGRESS) { // this is from when the socket was marked as nonblocking, don't think this is necessary anymore. pollfd pfd{}; @@ -92,7 +94,7 @@ Ichor::Task> Ichor::TcpConnectionService:: ret = poll(&pfd, 1, static_cast(_sendTimeout/1'000)); if(ret < 0) { - ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", getServiceId(), errno); + ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", AdvancedService::getServiceId(), errno); continue; } @@ -102,9 +104,9 @@ Ichor::Task> Ichor::TcpConnectionService:: } if(pfd.revents & POLLERR) { - ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {}", getServiceId(), pfd.revents); + ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {}", AdvancedService::getServiceId(), pfd.revents); } else if(pfd.revents & POLLHUP) { - ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {}", getServiceId(), pfd.revents); + ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {}", AdvancedService::getServiceId(), pfd.revents); } else if(pfd.revents & POLLOUT) { int connect_result{}; socklen_t result_len = sizeof(connect_result); @@ -116,7 +118,7 @@ Ichor::Task> Ichor::TcpConnectionService:: // connect failed, retry if(connect_result < 0) { - ICHOR_LOG_ERROR(_logger, "[{}] POLLOUT {} {}", getServiceId(), pfd.revents, connect_result); + ICHOR_LOG_ERROR(_logger, "[{}] POLLOUT {} {}", AdvancedService::getServiceId(), pfd.revents, connect_result); break; } connected = true; @@ -135,11 +137,11 @@ Ichor::Task> Ichor::TcpConnectionService:: auto *ip = ::inet_ntoa(address.sin_addr); if(!connected) { - ICHOR_LOG_ERROR(_logger, "[{}] Couldn't start TCP connection for {}:{}", getServiceId(), ip, ::ntohs(address.sin_port)); - GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); + ICHOR_LOG_ERROR(_logger, "[{}] Couldn't start TCP connection for {}:{}", AdvancedService::getServiceId(), ip, ::ntohs(address.sin_port)); + GetThreadLocalEventQueue().pushEvent(AdvancedService::getServiceId(), AdvancedService::getServiceId(), true); co_return tl::unexpected(StartError::FAILED); } - ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for {}:{}", getServiceId(), ip, ::ntohs(address.sin_port)); + ICHOR_LOG_DEBUG(_logger, "[{}] Starting TCP connection for {}:{}", AdvancedService::getServiceId(), ip, ::ntohs(address.sin_port)); } _timer = &_timerFactory->createTimer(); @@ -153,9 +155,10 @@ Ichor::Task> Ichor::TcpConnectionService:: co_return {}; } -Ichor::Task Ichor::TcpConnectionService::stop() { +template requires Ichor::DerivedAny +Ichor::Task Ichor::TcpConnectionService::stop() { _quit = true; - ICHOR_LOG_INFO(_logger, "[{}] stopping service", getServiceId()); + ICHOR_LOG_INFO(_logger, "[{}] stopping service", AdvancedService::getServiceId()); if(_socket >= 0) { ::shutdown(_socket, SHUT_RDWR); @@ -165,33 +168,38 @@ Ichor::Task Ichor::TcpConnectionService::stop() { co_return; } -void Ichor::TcpConnectionService::addDependencyInstance(ILogger &logger, IService &) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::addDependencyInstance(ILogger &logger, IService &) { _logger = &logger; } -void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &, IService&) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &, IService&) { _logger = nullptr; } -void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &timerFactory, IService &) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &timerFactory, IService &) { _timerFactory = &timerFactory; } -void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &, IService&) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &, IService&) { _timerFactory = nullptr; } -Ichor::Task> Ichor::TcpConnectionService::sendAsync(std::vector &&msg) { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::TcpConnectionService::sendAsync(std::vector &&msg) { size_t sent_bytes = 0; if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", getServiceId()); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } while(sent_bytes < msg.size()) { auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, MSG_NOSIGNAL); - ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes, errno = {}", getServiceId(), ret, errno); + ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes, errno = {}", AdvancedService::getServiceId(), ret, errno); if(ret < 0) { co_return tl::unexpected(IOError::FAILED); @@ -203,9 +211,10 @@ Ichor::Task> Ichor::TcpConnectionService::sen co_return {}; } -Ichor::Task> Ichor::TcpConnectionService::sendAsync(std::vector> &&msgs) { +template requires Ichor::DerivedAny +Ichor::Task> Ichor::TcpConnectionService::sendAsync(std::vector> &&msgs) { if(_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", getServiceId()); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", AdvancedService::getServiceId()); co_return tl::unexpected(IOError::SERVICE_QUITTING); } @@ -214,7 +223,7 @@ Ichor::Task> Ichor::TcpConnectionService::sen while(sent_bytes < msg.size()) { auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, 0); - ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes", getServiceId(), ret); + ICHOR_LOG_TRACE(_logger, "[{}] queued sending {} bytes", AdvancedService::getServiceId(), ret); if(ret < 0) { co_return tl::unexpected(IOError::FAILED); @@ -227,19 +236,23 @@ Ichor::Task> Ichor::TcpConnectionService::sen co_return {}; } -void Ichor::TcpConnectionService::setPriority(uint64_t priority) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::setPriority(uint64_t priority) { _priority = priority; } -uint64_t Ichor::TcpConnectionService::getPriority() { +template requires Ichor::DerivedAny +uint64_t Ichor::TcpConnectionService::getPriority() { return _priority; } -bool Ichor::TcpConnectionService::isClient() const noexcept { - return getProperties().find("Socket") == getProperties().end(); +template requires Ichor::DerivedAny +bool Ichor::TcpConnectionService::isClient() const noexcept { + return AdvancedService::getProperties().find("Socket") == AdvancedService::getProperties().end(); } -void Ichor::TcpConnectionService::setReceiveHandler(std::function)> recvHandler) { +template requires Ichor::DerivedAny +void Ichor::TcpConnectionService::setReceiveHandler(std::function)> recvHandler) { _recvHandler = recvHandler; for(auto &msg : _queuedMessages) { @@ -248,18 +261,19 @@ void Ichor::TcpConnectionService::setReceiveHandler(std::function requires Ichor::DerivedAny +void Ichor::TcpConnectionService::recvHandler() { ScopeGuard sg{[this]() { if(!_quit) { if(!_timer->startTimer()) { - GetThreadLocalEventQueue().pushEvent(getServiceId(), [this]() { + GetThreadLocalEventQueue().pushEvent(AdvancedService::getServiceId(), [this]() { if(!_timer->startTimer()) { std::terminate(); } }); } } else { - ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", getServiceId()); + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", AdvancedService::getServiceId()); } }}; std::vector msg{}; @@ -274,10 +288,10 @@ void Ichor::TcpConnectionService::recvHandler() { } } while (ret > 0 && !_quit); } - ICHOR_LOG_TRACE(_logger, "[{}] last received {} bytes, msg size = {}, errno = {}", getServiceId(), ret, msg.size(), errno); + ICHOR_LOG_TRACE(_logger, "[{}] last received {} bytes, msg size = {}, errno = {}", AdvancedService::getServiceId(), ret, msg.size(), errno); if (_quit) { - ICHOR_LOG_TRACE(_logger, "[{}] quitting", getServiceId()); + ICHOR_LOG_TRACE(_logger, "[{}] quitting", AdvancedService::getServiceId()); return; } @@ -291,8 +305,8 @@ void Ichor::TcpConnectionService::recvHandler() { if(ret == 0) { // closed connection - ICHOR_LOG_INFO(_logger, "[{}] peer closed connection", getServiceId()); - GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); + ICHOR_LOG_INFO(_logger, "[{}] peer closed connection", AdvancedService::getServiceId()); + GetThreadLocalEventQueue().pushEvent(AdvancedService::getServiceId(), AdvancedService::getServiceId(), true); return; } @@ -300,10 +314,14 @@ void Ichor::TcpConnectionService::recvHandler() { if(errno == EAGAIN) { return; } - ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", getServiceId(), errno); - GetThreadLocalEventQueue().pushEvent(getServiceId(), getServiceId(), true); + ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", AdvancedService::getServiceId(), errno); + GetThreadLocalEventQueue().pushEvent(AdvancedService::getServiceId(), AdvancedService::getServiceId(), true); return; } } +template class Ichor::TcpConnectionService; +template class Ichor::TcpConnectionService; +template class Ichor::TcpConnectionService; + #endif diff --git a/src/services/network/tcp/TcpHostService.cpp b/src/services/network/tcp/TcpHostService.cpp index ff3444a6..f0d8e9a6 100644 --- a/src/services/network/tcp/TcpHostService.cpp +++ b/src/services/network/tcp/TcpHostService.cpp @@ -14,7 +14,7 @@ #include Ichor::TcpHostService::TcpHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _bindFd(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { - reg.registerDependency(this, DependencyFlags::NONE); + reg.registerDependency(this, DependencyFlags::REQUIRED); reg.registerDependency(this, DependencyFlags::REQUIRED); } @@ -122,10 +122,12 @@ uint64_t Ichor::TcpHostService::getPriority() { Ichor::AsyncGenerator Ichor::TcpHostService::handleEvent(NewSocketEvent const &evt) { Properties props{}; + props.reserve(4); props.emplace("Priority", Ichor::make_any(_priority)); props.emplace("Socket", Ichor::make_any(evt.socket)); props.emplace("TimeoutSendUs", Ichor::make_any(_sendTimeout)); - _connections.emplace_back(GetThreadLocalManager().template createServiceManager(std::move(props))->getServiceId()); + props.emplace("TcpHostService", Ichor::make_any(getServiceId())); + _connections.emplace_back(GetThreadLocalManager().template createServiceManager, IConnectionService, IHostConnectionService>(std::move(props))->getServiceId()); co_return {}; } diff --git a/test/AsyncManualResetEventTests.cpp b/test/AsyncManualResetEventTests.cpp index 1d83ec6f..0785f4b1 100644 --- a/test/AsyncManualResetEventTests.cpp +++ b/test/AsyncManualResetEventTests.cpp @@ -3,20 +3,17 @@ TEST_CASE("AsyncManualResetEventTests") { - SECTION("default constructor initially not set") - { + SECTION("default constructor initially not set") { Ichor::AsyncManualResetEvent event; CHECK(!event.is_set()); } - SECTION("construct event initially set") - { + SECTION("construct event initially set") { Ichor::AsyncManualResetEvent event{true}; CHECK(event.is_set()); } - SECTION("set and reset") - { + SECTION("set and reset") { Ichor::AsyncManualResetEvent event; CHECK(!event.is_set()); event.set(); @@ -30,4 +27,4 @@ TEST_CASE("AsyncManualResetEventTests") { event.set(); CHECK(event.is_set()); } -} \ No newline at end of file +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c6b0acff..92152a2b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -15,11 +15,14 @@ foreach(filename ${PROJECT_TEST_SOURCES}) continue() endif() - add_executable(${testname} ${filename}) - target_link_libraries(${testname} ${CMAKE_THREAD_LIBS_INIT}) - target_link_libraries(${testname} ichor) - target_link_libraries(${testname} Catch2::Catch2WithMain) - target_compile_definitions(${testname} PUBLIC CATCH_CONFIG_FAST_COMPILE) + + if(NOT ${testname} STREQUAL "HttpTests") + add_executable(${testname} ${filename}) + target_link_libraries(${testname} ${CMAKE_THREAD_LIBS_INIT}) + target_link_libraries(${testname} ichor) + target_link_libraries(${testname} Catch2::Catch2WithMain) + target_compile_definitions(${testname} PUBLIC CATCH_CONFIG_FAST_COMPILE) + endif() # We want to build them, but not run/discover them. if(ICHOR_SKIP_EXTERNAL_TESTS AND (${testname} STREQUAL "EtcdTests" OR ${testname} STREQUAL "RedisTests")) @@ -29,7 +32,7 @@ foreach(filename ${PROJECT_TEST_SOURCES}) target_compile_definitions(${testname} PUBLIC ICHOR_SKIP_EXTERNAL_TESTS) endif() if(ICHOR_USE_LIBURING AND NOT (ICHOR_SKIP_EXTERNAL_TESTS AND ICHOR_AARCH64)) - if(${testname} STREQUAL "TcpTests" OR ${testname} STREQUAL "AsyncFileIOTests" OR ${testname} STREQUAL "ServicesTests" OR ${testname} STREQUAL "CoroutineTests") + if(${testname} STREQUAL "TcpTests" OR ${testname} STREQUAL "AsyncFileIOTests" OR ${testname} STREQUAL "ServicesTests" OR ${testname} STREQUAL "CoroutineTests" OR ${testname} STREQUAL "HttpTests") add_executable(${testname}_uring ${filename}) target_link_libraries(${testname}_uring ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(${testname}_uring ichor) @@ -48,6 +51,16 @@ foreach(filename ${PROJECT_TEST_SOURCES}) catch_discover_tests(${testname}_sdevent) endif() endif() + if(ICHOR_USE_BOOST_BEAST AND NOT (ICHOR_SKIP_EXTERNAL_TESTS AND ICHOR_AARCH64)) + if(${testname} STREQUAL "HttpTests") + add_executable(${testname}_boost ${filename}) + target_link_libraries(${testname}_boost ${CMAKE_THREAD_LIBS_INIT}) + target_link_libraries(${testname}_boost ichor) + target_link_libraries(${testname}_boost Catch2::Catch2WithMain) + target_compile_definitions(${testname}_boost PUBLIC CATCH_CONFIG_FAST_COMPILE TEST_BOOST) + catch_discover_tests(${testname}_boost) + endif() + endif() if(${testname} STREQUAL "ServicesTests" OR ${testname} STREQUAL "CoroutineTests") add_executable(${testname}_ordered ${filename}) target_link_libraries(${testname}_ordered ${CMAKE_THREAD_LIBS_INIT}) @@ -57,5 +70,7 @@ foreach(filename ${PROJECT_TEST_SOURCES}) catch_discover_tests(${testname}_ordered) endif() - catch_discover_tests(${testname}) + if(NOT ${testname} STREQUAL "HttpTests") + catch_discover_tests(${testname}) + endif() endforeach() diff --git a/test/HttpTests.cpp b/test/HttpTests.cpp index f701be42..f3035e00 100644 --- a/test/HttpTests.cpp +++ b/test/HttpTests.cpp @@ -1,12 +1,7 @@ -#ifdef ICHOR_USE_BOOST_BEAST - #include #include #include #include -#include -#include -#include #include #include #include @@ -16,31 +11,98 @@ #include "../examples/common/TestMsgGlazeSerializer.h" #include "serialization/RegexJsonMsgSerializer.h" +#ifdef TEST_BOOST +#include +#include +#include +#include + + +#define QIMPL PriorityQueue +#define HTTPHOSTIMPL Boost::HttpHostService +#define HTTPCONNIMPL Boost::HttpConnectionService +#elif defined(TEST_URING) +#include +#include +#include +#include +#include +#include +#include + +using namespace std::string_literals; + +#define QIMPL IOUringQueue +#define CONNIMPL IOUringTcpConnectionService +#define HOSTIMPL IOUringTcpHostService +#define HTTPHOSTIMPL HttpHostService +#define HTTPCONNIMPL HttpConnectionService +#else +#error "no uring/boost" +#endif + using namespace Ichor; std::unique_ptr _evt; std::thread::id testThreadId; std::thread::id dmThreadId; std::atomic evtGate; +#if defined(TEST_URING) +tl::optional emulateKernelVersion; + +TEST_CASE("HttpTests_uring") { + + auto version = Ichor::kernelVersion(); + + REQUIRE(version); + if(version < Version{5, 18, 0}) { + return; + } + + auto gen_i = GENERATE(1, 2); + + if(gen_i == 2) { + emulateKernelVersion = Version{5, 18, 0}; + fmt::println("emulating kernel version {}", *emulateKernelVersion); + } else { + fmt::println("kernel version {}", *version); + } +#else +TEST_CASE("HttpTests_boost") { +#endif -TEST_CASE("HttpTests") { SECTION("Http events on same thread") { testThreadId = std::this_thread::get_id(); _evt = std::make_unique(); - auto queue = std::make_unique(true); +#if defined(TEST_URING) + auto queue = std::make_unique(500, 100'000'000, emulateKernelVersion); +#else + auto queue = std::make_unique(500, true); +#endif auto &dm = queue->createManager(); evtGate = false; std::thread t([&]() { +#ifdef TEST_URING + REQUIRE(queue->createEventLoop()); +#endif dmThreadId = std::this_thread::get_id(); dm.createServiceManager({}, 10); - dm.createServiceManager, ILoggerFactory>(); + dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); dm.createServiceManager>(); dm.createServiceManager>(); +#ifdef TEST_BOOST dm.createServiceManager(); - dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); - dm.createServiceManager, IClientFactory>(); +#else + +#endif + dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); + dm.createServiceManager, IClientFactory>(); +#ifdef TEST_URING + dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}); + dm.createServiceManager, IClientConnectionService>, IClientFactory>(); +#endif dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); queue->start(CaptureSigInt); @@ -61,6 +123,7 @@ TEST_CASE("HttpTests") { t.join(); } +#ifdef TEST_BOOST SECTION("Https events on same thread") { testThreadId = std::this_thread::get_id(); _evt = std::make_unique(); @@ -220,16 +283,5 @@ TEST_CASE("HttpTests") { t.join(); } -} - -#else - -#include "Common.h" - -TEST_CASE("HttpTests") { - SECTION("Empty Test so that Catch2 exits with 0") { - REQUIRE(true); - } -} - #endif +} diff --git a/test/StlTests.cpp b/test/StlTests.cpp index a976faed..46ccb136 100644 --- a/test/StlTests.cpp +++ b/test/StlTests.cpp @@ -439,7 +439,12 @@ TEST_CASE("STL Tests") { REQUIRE(Ichor::FastAtoiu("0") == 0); REQUIRE(Ichor::FastAtoiu("u10") == 0); REQUIRE(Ichor::FastAtoiu("10u") == 10); + REQUIRE(Ichor::FastAtoiu("10"sv) == 10); + REQUIRE(Ichor::FastAtoiu("0"sv) == 0); + REQUIRE(Ichor::FastAtoiu("u10"sv) == 0); + REQUIRE(Ichor::FastAtoiu("10u"sv) == 10); REQUIRE(Ichor::FastAtoiu(std::to_string(std::numeric_limits::max()).c_str()) == std::numeric_limits::max()); + REQUIRE(Ichor::FastAtoiu(std::to_string(std::numeric_limits::max())) == std::numeric_limits::max()); REQUIRE(Ichor::FastAtoi("10") == 10); REQUIRE(Ichor::FastAtoi("0") == 0); REQUIRE(Ichor::FastAtoi("u10") == 0); diff --git a/test/TcpTests.cpp b/test/TcpTests.cpp index a2b25458..52241fa3 100644 --- a/test/TcpTests.cpp +++ b/test/TcpTests.cpp @@ -77,7 +77,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -155,7 +155,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_DEBUG)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_DEBUG)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}, {"BufferEntries", Ichor::make_any(static_cast(16))}, {"BufferEntrySize", Ichor::make_any(static_cast(16'384))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -231,7 +231,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -318,7 +318,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -394,7 +394,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}, {"BufferEntries", Ichor::make_any(static_cast(512))}, {"BufferEntrySize", Ichor::make_any(static_cast(32))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -477,7 +477,7 @@ TEST_CASE("TcpTests") { dm.createServiceManager(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}, priorityToEnsureHostStartingFirst); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager>, IClientFactory>(); #ifndef TEST_URING dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif diff --git a/test/TestServices/HttpThreadService.h b/test/TestServices/HttpThreadService.h index 1e083134..dd8bccbf 100644 --- a/test/TestServices/HttpThreadService.h +++ b/test/TestServices/HttpThreadService.h @@ -82,7 +82,8 @@ class HttpThreadService final : public AdvancedService { } void addDependencyInstance(IHttpHostService &svc, IService&) { - _routeRegistration = svc.addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> AsyncGenerator { + _routeRegistration = svc.addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> Task { + fmt::println("/test POST"); if(dmThreadId != std::this_thread::get_id()) { throw std::runtime_error("dmThreadId id incorrect"); } @@ -104,7 +105,8 @@ class HttpThreadService final : public AdvancedService { co_return HttpResponse{HttpStatus::ok, "application/json", _testSerializer->serialize(TestMsg{11, "hello"}), {}}; }); - _regexRouteRegistration = svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> AsyncGenerator { + _regexRouteRegistration = svc.addRoute(HttpMethod::get, std::make_unique>(), [this](HttpRequest &req) -> Task { + fmt::println("/regex_test POST"); if(dmThreadId != std::this_thread::get_id()) { throw std::runtime_error("dmThreadId id incorrect"); } @@ -136,10 +138,14 @@ class HttpThreadService final : public AdvancedService { } if(response.status != HttpStatus::ok) { + fmt::println("test status not ok {}", static_cast(response.status)); throw std::runtime_error("test status not ok"); } auto msg = _testSerializer->deserialize(response.body); + if(!msg) { + std::terminate(); + } fmt::print("Received TestMsg {}:{}\n", msg->id, msg->val); GetThreadLocalEventQueue().pushEvent(getServiceId(), [this]() -> AsyncGenerator { @@ -167,7 +173,7 @@ class HttpThreadService final : public AdvancedService { } if(response.status != HttpStatus::ok) { - throw std::runtime_error(fmt::format("regex1 status not ok {}", (int)response.status).c_str()); + throw std::runtime_error(fmt::format("regex1 status not ok {}", (int)response.status)); } auto msg = _regexSerializer->deserialize(response.body); @@ -191,7 +197,7 @@ class HttpThreadService final : public AdvancedService { } if(response.status != HttpStatus::ok) { - throw std::runtime_error(fmt::format("regex2 status not ok {}", (int)response.status).c_str()); + throw std::runtime_error(fmt::format("regex2 status not ok {}", (int)response.status)); } msg = _regexSerializer->deserialize(response.body); @@ -215,7 +221,7 @@ class HttpThreadService final : public AdvancedService { } if(response.status != HttpStatus::ok) { - throw std::runtime_error(fmt::format("regex3 status not ok {}", (int)response.status).c_str()); + throw std::runtime_error(fmt::format("regex3 status not ok {}", (int)response.status)); } msg = _regexSerializer->deserialize(response.body); @@ -242,7 +248,7 @@ class HttpThreadService final : public AdvancedService { } if(response.status != HttpStatus::ok) { - throw std::runtime_error(fmt::format("regex4 status not ok {}", (int)response.status).c_str()); + throw std::runtime_error(fmt::format("regex4 status not ok {}", (int)response.status)); } msg = _regexSerializer->deserialize(response.body);