-
Notifications
You must be signed in to change notification settings - Fork 1
/
ws_coder.go
130 lines (114 loc) · 3.87 KB
/
ws_coder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package openairt
import (
"context"
"io"
"net/http"
"github.com/coder/websocket"
)
// CoderWebSocketOptions configures a CoderWebSocketDialer and is carried along
// by the CoderWebSocketConn values that the dialer creates.
type CoderWebSocketOptions struct {
// ReadLimit is the maximum size of a message in bytes. -1 means no limit. Default is -1.
//
// NewCoderWebSocketDialer replaces any value <= 0 (including the zero value)
// with -1.
ReadLimit int64
// DialOptions is the options to pass to the websocket.Dial function.
//
// It may be nil. Headers passed to Dial are merged with DialOptions.HTTPHeader.
DialOptions *websocket.DialOptions
}
// CoderWebSocketDialer is a WebSocket dialer implementation based on coder/websocket.
//
// Create it with NewCoderWebSocketDialer so that the default read limit is applied.
type CoderWebSocketDialer struct {
// options holds the dialer configuration; it is used on every Dial call.
options CoderWebSocketOptions
}
// NewCoderWebSocketDialer builds a CoderWebSocketDialer from the given options.
//
// A ReadLimit that is zero or negative is normalized to -1, which means that
// no read limit is applied.
func NewCoderWebSocketDialer(options CoderWebSocketOptions) *CoderWebSocketDialer {
	dialer := &CoderWebSocketDialer{options: options}

	// Apply the default read limit on the dialer's own copy of the options.
	if dialer.options.ReadLimit <= 0 {
		dialer.options.ReadLimit = -1
	}

	return dialer
}
// Dial establishes a new WebSocket connection to the given URL.
//
// The handshake request carries the values from header followed by the values
// from the dialer's DialOptions.HTTPHeader (if any). The merge is done on a
// per-call copy of the dial options, so the dialer's stored options and the
// caller-supplied *websocket.DialOptions are never modified. This keeps
// repeated Dial calls from accumulating duplicate header values and avoids
// writing to shared state when Dial is called concurrently.
//
// ctx is passed to websocket.Dial. On failure the dial error is returned
// unchanged and the returned connection is nil.
func (d *CoderWebSocketDialer) Dial(ctx context.Context, url string, header http.Header) (WebSocketConn, error) {
	// Shallow-copy the configured dial options (or start from the zero value)
	// so that the HTTPHeader assignment below stays local to this call.
	var dialOptions websocket.DialOptions
	if d.options.DialOptions != nil {
		dialOptions = *d.options.DialOptions
	}

	// Build a fresh header map: per-call headers first, then configured ones.
	mergedHeader := make(http.Header, len(header)+len(dialOptions.HTTPHeader))
	for k, v := range header {
		mergedHeader[k] = append(mergedHeader[k], v...)
	}
	for k, v := range dialOptions.HTTPHeader {
		mergedHeader[k] = append(mergedHeader[k], v...)
	}
	dialOptions.HTTPHeader = mergedHeader

	conn, resp, err := websocket.Dial(ctx, url, &dialOptions)
	if resp != nil && resp.Body != nil {
		// The resp.Body is no longer needed after the dial succeeds.
		// When dial fails, the resp.Body contains the original body of the
		// response, which we don't need now. The close error is ignored
		// because nothing is read from the body afterwards.
		_ = resp.Body.Close()
	}
	if err != nil {
		return nil, err
	}

	conn.SetReadLimit(d.options.ReadLimit)
	return &CoderWebSocketConn{conn: conn, options: d.options, resp: resp}, nil
}
// CoderWebSocketConn is a WebSocket connection implementation based on coder/websocket.
//
// Values of this type are created by CoderWebSocketDialer.Dial.
type CoderWebSocketConn struct {
// conn is the underlying coder/websocket connection used for reads, writes and close.
conn *websocket.Conn
// resp is the HTTP response returned by websocket.Dial. Dial closes its Body
// (when non-nil) before storing it here.
resp *http.Response
// options is a copy of the dialer options the connection was created with.
options CoderWebSocketOptions
}
// ReadMessage reads the next complete message from the WebSocket connection.
//
// The ctx can be used to cancel the read operation. If the ctx is canceled or
// timed out, the read operation will be canceled and the connection will be
// closed.
//
// Errors from obtaining the reader or from reading the payload are wrapped
// with Permanent; once a Permanent error is returned, future read operations
// on the same connection will not succeed. A message that is neither text nor
// binary yields ErrUnsupportedMessageType.
func (c *CoderWebSocketConn) ReadMessage(ctx context.Context) (MessageType, []byte, error) {
	kind, reader, err := c.conn.Reader(ctx)
	if err != nil {
		return 0, nil, Permanent(err)
	}

	// Drain the whole message before inspecting its type.
	payload, err := io.ReadAll(reader)
	if err != nil {
		return 0, nil, Permanent(err)
	}

	// Translate the coder/websocket message type into this package's type.
	if kind == websocket.MessageText {
		return MessageText, payload, nil
	}
	if kind == websocket.MessageBinary {
		return MessageBinary, payload, nil
	}
	return 0, nil, ErrUnsupportedMessageType
}
// WriteMessage sends data as a single message over the WebSocket connection.
//
// The ctx can be used to cancel the write operation. If the ctx is canceled or
// timed out, the write operation will be canceled and the connection will be
// closed.
//
// The result of the underlying write is passed through Permanent; once a
// Permanent error is returned, future write operations on the same connection
// will not succeed. A messageType other than MessageText or MessageBinary
// yields ErrUnsupportedMessageType without writing anything.
func (c *CoderWebSocketConn) WriteMessage(ctx context.Context, messageType MessageType, data []byte) error {
	// Translate this package's message type into the coder/websocket one.
	var wireType websocket.MessageType
	switch messageType {
	case MessageText:
		wireType = websocket.MessageText
	case MessageBinary:
		wireType = websocket.MessageBinary
	default:
		return ErrUnsupportedMessageType
	}

	return Permanent(c.conn.Write(ctx, wireType, data))
}
// Close closes the WebSocket connection with the normal-closure status code
// and an empty reason, returning whatever error the underlying close reports.
func (c *CoderWebSocketConn) Close() error {
	const reason = ""
	return c.conn.Close(websocket.StatusNormalClosure, reason)
}
// Response returns the *http.Response that was stored when the connection was
// created. It is commonly used to inspect the response headers.
func (c *CoderWebSocketConn) Response() *http.Response {
	resp := c.resp
	return resp
}