Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Sep 4, 2024
1 parent 6889b81 commit c1df132
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 56 deletions.
9 changes: 6 additions & 3 deletions src/HTTP2.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function Base.show(io::IO, r::Request)
print_request(io, r)
end
# backwards compat
header(r::Request, key::String) = getheader(r.headers, key)
const header = getheader

struct StreamMetrics
send_start_timestamp_ns::Int64
Expand Down Expand Up @@ -157,7 +157,8 @@ mutable struct Client
keep_alive_interval_sec,
keep_alive_timeout_sec,
keep_alive_max_failed_probes,
keepalive
keepalive,
ntuple(x -> Cchar(0), 16) # network_interface_name
))
# tls options
host_str = String(host)
Expand Down Expand Up @@ -192,7 +193,9 @@ mutable struct Client
C_NULL, # shutdown_complete_user_data::Ptr{Cvoid}
C_NULL, # shutdown_complete_callback::Ptr{aws_http_connection_manager_shutdown_complete_fn}
false, # enable_read_back_pressure::Bool
max_connection_idle_in_milliseconds
max_connection_idle_in_milliseconds,
C_NULL, # network_interface_names_array
0, # num_network_interface_names
))
connection_manager = aws_http_connection_manager_new(allocator, conn_manager_opts)
connection_manager == C_NULL && aws_throw_error()
Expand Down
2 changes: 2 additions & 0 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ function c_on_response_body(stream, data::Ptr{aws_byte_cursor}, ctx_ptr)
# common response body manual type unrolling here
if body isa IOBuffer
if !hasroom(body, bc.len)
@error "response body buffer is too small 1"
ctx.error = CapturedException(ArgumentError("response body buffer is too small"), Base.backtrace())
ctx.should_retry = false
return Cint(0)
end
unsafe_write(body, bc.ptr, bc.len)
elseif body isa Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}
if !hasroom(body, bc.len)
@error "response body buffer is too small 2"
ctx.error = CapturedException(ArgumentError("response body buffer is too small"), Base.backtrace())
ctx.should_retry = false
return Cint(0)
Expand Down
127 changes: 74 additions & 53 deletions src/server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,52 @@ socket_endpoint(host, port) = aws_socket_endpoint(
port % UInt32
)

mutable struct Connection{F}
f::F
allocator::Ptr{aws_allocator}
server::Any # Server
connection::Ptr{aws_http_connection}
connection_options::aws_http_server_connection_options
mutable struct Connection{F, S}
const f::F
const server::S # Server
const allocator::Ptr{aws_allocator}
const connection::Ptr{aws_http_connection}
request_handler_options::aws_http_request_handler_options
current_request::Request
current_response::Ptr{aws_http_message}
Connection{F}() where {F} = new{F}()
connection_options::aws_http_server_connection_options
current_request::Request

Connection{F, S}(
f::F,
server::S,
allocator::Ptr{aws_allocator},
connection::Ptr{aws_http_connection},
) where {F, S} = new{F, S}(f, server, allocator, connection)
end

Base.hash(c::Connection, h::UInt) = hash(c.connection, h)

mutable struct Server{F}
f::F
const f::F
const comm::Channel{Symbol}
const allocator::Ptr{aws_allocator}
const endpoint::aws_socket_endpoint
const socket_options::aws_socket_options
const tls_options::Union{aws_tls_connection_options, Nothing}
const connections_lock::ReentrantLock
const connections::Set{Connection}
const closed::Threads.Event
@atomic state::Symbol # :initializing, :running, :closed
comm::Channel{Symbol}
allocator::Ptr{aws_allocator}
endpoint::aws_socket_endpoint
socket_options::aws_socket_options
tls_options::Union{aws_tls_connection_options, Nothing}
server_options::aws_http_server_options
server::Ptr{aws_http_server}
connections_lock::ReentrantLock
connections::Set{Connection}
closed::Threads.Event
Server{F}() where {F} = new{F}()
server_options::aws_http_server_options

Server{F}(
f::F,
comm::Channel{Symbol},
allocator::Ptr{aws_allocator},
endpoint::aws_socket_endpoint,
socket_options::aws_socket_options,
tls_options::Union{aws_tls_connection_options, Nothing},
connections_lock::ReentrantLock,
connections::Set{Connection},
closed::Threads.Event,
state::Symbol,
) where {F} = new{F}(f, comm, allocator, endpoint, socket_options, tls_options, connections_lock, connections, closed, state)
end

Base.wait(s::Server) = wait(s.closed)
Expand Down Expand Up @@ -59,31 +77,35 @@ function serve!(f, host="127.0.0.1", port=8080;
ssl_alpn_list="h2;http/1.1",
initial_window_size=typemax(UInt64),
)
server = Server{typeof(f)}()
server.f = f
@atomic server.state = :initializing
server.closed = Threads.Event()
server.comm = Channel{Symbol}(1)
server.allocator = allocator
server.endpoint = endpoint !== nothing ? endpoint : socket_endpoint(host, port)
server.socket_options = socket_options !== nothing ? socket_options : aws_socket_options(
AWS_SOCKET_STREAM, # socket type
socket_domain == :ipv4 ? AWS_SOCKET_IPV4 : AWS_SOCKET_IPV6, # socket domain
connect_timeout_ms,
keep_alive_interval_sec,
keep_alive_timeout_sec,
keep_alive_max_failed_probes,
keepalive
server = Server{typeof(f)}(
f, # RequestHandler
Channel{Symbol}(1), # comm
allocator,
endpoint !== nothing ? endpoint : socket_endpoint(host, port),
socket_options !== nothing ? socket_options : aws_socket_options(
AWS_SOCKET_STREAM, # socket type
socket_domain == :ipv4 ? AWS_SOCKET_IPV4 : AWS_SOCKET_IPV6, # socket domain
connect_timeout_ms,
keep_alive_interval_sec,
keep_alive_timeout_sec,
keep_alive_max_failed_probes,
keepalive
),
tls_options !== nothing ? tls_options :
any(x -> x !== nothing, (ssl_cert, ssl_key, ssl_capath, ssl_cacert)) ? LibAwsIO.tlsoptions(host;
ssl_cert,
ssl_key,
ssl_capath,
ssl_cacert,
ssl_insecure,
ssl_alpn_list
) : nothing,
ReentrantLock(), # connections_lock
Set{Connection}(), # connections
Threads.Event(), # closed
:initializing, # state
C_NULL # server
)
server.tls_options = tls_options !== nothing ? tls_options :
any(x -> x !== nothing, (ssl_cert, ssl_key, ssl_capath, ssl_cacert)) ? LibAwsIO.tlsoptions(host;
ssl_cert,
ssl_key,
ssl_capath,
ssl_cacert,
ssl_insecure,
ssl_alpn_list
) : nothing
server.server_options = aws_http_server_options(
1,
allocator,
Expand All @@ -97,8 +119,6 @@ function serve!(f, host="127.0.0.1", port=8080;
on_destroy_complete[],
false # manual_window_management
)
server.connections_lock = ReentrantLock()
server.connections = Set{Connection}()
server.server = aws_http_server_new(FieldRef(server, :server_options))
@assert server.server != C_NULL "failed to create server"
@atomic server.state = :running
Expand All @@ -113,11 +133,12 @@ function c_on_incoming_connection(aws_server, aws_conn, error_code, server_ptr)
@error "incoming connection error" exception=(aws_error(), Base.backtrace())
return
end
conn = Connection{ftype(server)}()
conn.f = server.f
conn.allocator = server.allocator
conn.server = server
conn.connection = aws_conn
conn = Connection(
server.f,
server,
server.allocator,
aws_conn,
)
conn.connection_options = aws_http_server_connection_options(
1,
pointer_from_objref(conn),
Expand Down Expand Up @@ -169,10 +190,10 @@ end

const on_request_headers = Ref{Ptr{Cvoid}}(C_NULL)

function c_on_request_headers(stream, header_block, header_array, num_headers, conn_ptr)
function c_on_request_headers(stream, header_block, header_array::Ptr{aws_http_header}, num_headers, conn_ptr)
conn = unsafe_pointer_to_objref(conn_ptr)
headers = unsafe_wrap(Array, Ptr{aws_http_header}(header_array), num_headers)
for header in headers
for i = 1:num_headers
header = unsafe_load(header_array, i)
name = unsafe_string(header.name.ptr, header.name.len)
value = unsafe_string(header.value.ptr, header.value.len)
push!(conn.current_request.headers, name => value)
Expand Down

0 comments on commit c1df132

Please sign in to comment.