Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZhizhin committed Nov 13, 2024
1 parent 3f82afb commit 6999756
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 123 deletions.
32 changes: 31 additions & 1 deletion x/configurl/wait_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package configurl

import (
"context"
"fmt"
"net/url"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/x/wait_stream"
Expand All @@ -27,6 +30,33 @@ func registerWaitStreamDialer(r TypeRegistry[transport.StreamDialer], typeID str
if err != nil {
return nil, err
}
return wait_stream.NewStreamDialer(sd)

queryUrlParameters, err := url.ParseQuery(config.URL.Opaque)
if err != nil {
return nil, fmt.Errorf("waitstream: failed to parse URL parameters: %w", err)
}

resultStreamDialer, err := wait_stream.NewStreamDialer(sd)
if err != nil {
return nil, err
}

if queryUrlParameters.Has("timeout") {
timeout, err := time.ParseDuration(queryUrlParameters.Get("timeout"))
if err != nil {
return nil, fmt.Errorf("waitstream: failed to parse timeout parameter: %w", err)
}
resultStreamDialer.SetWaitingTimeout(timeout)
}

if queryUrlParameters.Has("delay") {
delay, err := time.ParseDuration(queryUrlParameters.Get("delay"))
if err != nil {
return nil, fmt.Errorf("waitstream: failed to parse delay parameter: %w", err)
}
resultStreamDialer.SetWaitingDelay(delay)
}

return resultStreamDialer, err
})
}
39 changes: 0 additions & 39 deletions x/sockopt/is_sending_bytes_linux.go

This file was deleted.

53 changes: 1 addition & 52 deletions x/sockopt/sockopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@ import (
"fmt"
"net"
"net/netip"
"time"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

type HasWaitUntilBytesAreSent interface {
// Wait until all bytes are sent to the socket.
// Returns ErrUnsupported if the platform doesn't support it.
// May return a different error.
WaitUntilBytesAreSent() error
// Checks if the OS supports waiting until the bytes are sent
OsSupportsWaitingUntilBytesAreSent() bool
}

// HasHopLimit enables manipulation of the hop limit option.
type HasHopLimit interface {
// HopLimit returns the hop limit field value for outgoing packets.
Expand All @@ -60,51 +50,15 @@ var _ HasHopLimit = (*hopLimitOption)(nil)

// TCPOptions represents options for TCP connections.
type TCPOptions interface {
HasWaitUntilBytesAreSent
HasHopLimit
}

type tcpOptions struct {
hopLimitOption

conn *net.TCPConn

// Timeout after which we return an error
waitingTimeout time.Duration
// Delay between checking the socket
waitingDelay time.Duration
}

var _ TCPOptions = (*tcpOptions)(nil)

func (o *tcpOptions) SetWaitingTimeout(timeout time.Duration) {
o.waitingTimeout = timeout
}

func (o *tcpOptions) SetWaitingDelay(delay time.Duration) {
o.waitingDelay = delay
}

func (o *tcpOptions) OsSupportsWaitingUntilBytesAreSent() bool {
return isConnectionSendingBytesImplemented()
}

func (o *tcpOptions) WaitUntilBytesAreSent() error {
startTime := time.Now()
for time.Since(startTime) < o.waitingTimeout {
isSendingBytes, err := isConnectionSendingBytes(o.conn)
if err != nil {
return err
}
if !isSendingBytes {
return nil
}

time.Sleep(o.waitingDelay)
}
return fmt.Errorf("waiting for socket to send all bytes: timeout exceeded")
}

// newHopLimit creates a hopLimitOption from a [net.Conn]. Works for both TCP or UDP.
func newHopLimit(conn net.Conn) (*hopLimitOption, error) {
addr, err := netip.ParseAddrPort(conn.LocalAddr().String())
Expand Down Expand Up @@ -133,10 +87,5 @@ func NewTCPOptions(conn *net.TCPConn) (TCPOptions, error) {
if err != nil {
return nil, err
}
return &tcpOptions{
hopLimitOption: *hopLimit,
conn: conn,
waitingTimeout: 10 * time.Millisecond,
waitingDelay: 100 * time.Microsecond,
}, nil
return &tcpOptions{hopLimitOption: *hopLimit}, nil
}
15 changes: 15 additions & 0 deletions x/wait_stream/is_sending_bytes_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//go:build linux

package wait_stream

import (
"golang.org/x/sys/unix"
)

func isSocketFdSendingBytes(fd int) (bool, error) {
tcpInfo, err := unix.GetsockoptTCPInfo(fd, unix.IPPROTO_TCP, unix.TCP_INFO)
if err != nil {
return false, err
}
return tcpInfo.Notsent_bytes != 0, nil
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
//go:build !linux

package sockopt
package wait_stream

import (
"errors"
"fmt"
"net"
)

func isConnectionSendingBytesImplemented() bool {
return false
}

func isConnectionSendingBytes(_ *net.TCPConn) (bool, error) {
func isSocketFdSendingBytes(_ int) (bool, error) {
return false, fmt.Errorf("%w: checking if socket is sending bytes is not implemented on this platform", errors.ErrUnsupported)
}
42 changes: 30 additions & 12 deletions x/wait_stream/stream_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,48 @@ import (
"context"
"errors"
"net"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/x/sockopt"
)

type waitStreamDialer struct {
type WaitStreamDialer struct {
dialer transport.StreamDialer

// Stop waiting on a packet after this timeout
waitingTimeout time.Duration
// Check if socket is sending bytes that often
waitingDelay time.Duration
}

var _ transport.StreamDialer = (*waitStreamDialer)(nil)
var _ transport.StreamDialer = (*WaitStreamDialer)(nil)

// byeDPI uses a default delay of 500ms with 1ms sleep
// We might reconsider the defaults later, if needed.
// https://github.com/hufrea/byedpi/blob/main/desync.c#L90
var defaultTimeout = time.Millisecond * 10
var defaultDelay = time.Microsecond * 1

func NewStreamDialer(dialer transport.StreamDialer) (transport.StreamDialer, error) {
func NewStreamDialer(dialer transport.StreamDialer) (*WaitStreamDialer, error) {
if dialer == nil {
return nil, errors.New("argument dialer must not be nil")
}
return &waitStreamDialer{dialer: dialer}, nil
return &WaitStreamDialer{
dialer: dialer,
waitingTimeout: defaultTimeout,
waitingDelay: defaultDelay,
}, nil
}

func (d *WaitStreamDialer) SetWaitingTimeout(timeout time.Duration) {
d.waitingTimeout = timeout
}

func (d *WaitStreamDialer) SetWaitingDelay(timeout time.Duration) {
d.waitingDelay = timeout
}

func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) {
func (d *WaitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) {
innerConn, err := d.dialer.DialStream(ctx, remoteAddr)
if err != nil {
return nil, err
Expand All @@ -47,12 +70,7 @@ func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (t
return nil, errors.New("wait_stream strategy: expected base dialer to return TCPConn")
}

tcpOptions, err := sockopt.NewTCPOptions(tcpInnerConn)
if err != nil {
return nil, err
}

dw := NewWriter(innerConn, tcpOptions)
dw := NewWriter(tcpInnerConn, d.waitingTimeout, d.waitingDelay)

return transport.WrapConn(innerConn, innerConn, dw), nil
}
54 changes: 42 additions & 12 deletions x/wait_stream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,62 @@ import (
"errors"
"fmt"
"io"

"github.com/Jigsaw-Code/outline-sdk/x/sockopt"
"net"
"time"
)

type waitStreamWriter struct {
conn io.Writer
tcpOptions sockopt.TCPOptions
conn *net.TCPConn

waitingTimeout time.Duration
waitingDelay time.Duration
}

var _ io.Writer = (*waitStreamWriter)(nil)

func NewWriter(conn io.Writer, tcpOptions sockopt.TCPOptions) io.Writer {
func NewWriter(conn *net.TCPConn, waitingTimeout time.Duration, waitingDelay time.Duration) io.Writer {
return &waitStreamWriter{
conn: conn,
tcpOptions: tcpOptions,
conn: conn,
waitingTimeout: waitingTimeout,
waitingDelay: waitingDelay,
}
}

func (w *waitStreamWriter) Write(data []byte) (written int, err error) {
written, err = w.conn.Write(data)
func isConnectionSendingBytes(conn *net.TCPConn) (result bool, err error) {
syscallConn, err := conn.SyscallConn()
if err != nil {
return false, err
}
syscallConn.Control(func(fd uintptr) {
result, err = isSocketFdSendingBytes(int(fd))
})
return
}

func waitUntilBytesAreSent(conn *net.TCPConn, waitingTimeout time.Duration, waitingDelay time.Duration) error {
startTime := time.Now()
for time.Since(startTime) < waitingTimeout {
isSendingBytes, err := isConnectionSendingBytes(conn)
if err != nil {
return err
}
if !isSendingBytes {
return nil
}

time.Sleep(waitingDelay)
}
// not sure about the right behaviour here: fail or give up waiting?
// giving up feels safer, and matches byeDPI behavior
return nil
}

func (w *waitStreamWriter) Write(data []byte) (written int, err error) {
// This may not be implemented, so it's best effort really.
waitUntilBytesAreSentErr := w.tcpOptions.WaitUntilBytesAreSent()
waitUntilBytesAreSentErr := waitUntilBytesAreSent(w.conn, w.waitingTimeout, w.waitingDelay)
if waitUntilBytesAreSentErr != nil && !errors.Is(waitUntilBytesAreSentErr, errors.ErrUnsupported) {
return written, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr)
return 0, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr)
}

return
return w.conn.Write(data)
}

0 comments on commit 6999756

Please sign in to comment.