From 66d39ee5d9b571b14ca92a3f078d0e7a1eb4dbeb Mon Sep 17 00:00:00 2001 From: egorse Date: Mon, 22 Jan 2024 20:08:06 +0200 Subject: [PATCH] Minor correction after core review. Additional test simplification. Now it needs only two connections, where 1st fails asap and 2nd fails with delay - giving connection reaper time to meet deadlock --- reaper_test.go | 66 +++++++++++++++++--------------------------------- zall_test.go | 31 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/reaper_test.go b/reaper_test.go index 83e21fc..4b756c5 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -6,10 +6,9 @@ package zmq4 import ( "context" - "fmt" "io" "net" - "runtime" + "sync/atomic" "testing" "time" ) @@ -37,11 +36,11 @@ func TestConnReaperDeadlock2(t *testing.T) { id := "client-x" srv.sck.mu.Lock() rmw := srv.sck.w.(*routerMWriter) - for i := 0; i < 10; i++ { + for i := 0; i < 2; i++ { w := &Conn{} w.Peer.Meta = make(Metadata) w.Peer.Meta[sysSockID] = id - w.rw = &sockSendEof{} + w.rw = &sockSendEOF{} w.onCloseErrorCB = srv.sck.scheduleRmConn // Do not to call srv.addConn as we dont want to have listener on this fake socket rmw.addConn(w) @@ -57,68 +56,47 @@ func TestConnReaperDeadlock2(t *testing.T) { } } -type sockSendEof struct { +type sockSendEOF struct { } -func (r *sockSendEof) Write(b []byte) (n int, err error) { - runtime.Gosched() - time.Sleep(1 * time.Second) +var a atomic.Int32 + +func (r *sockSendEOF) Write(b []byte) (n int, err error) { + // Each odd write fails asap. + // Each even write fails after sleep. + // Such a way we ensure the short write failure + // will cause socket be assinged to connection reaper + // while srv.Send is still in progress due to long writes. + if x := a.Add(1); x&1 == 0 { + time.Sleep(1 * time.Second) + } return 0, io.EOF } -func (r *sockSendEof) Read(b []byte) (int, error) { +func (r *sockSendEOF) Read(b []byte) (int, error) { return 0, nil } -func (r *sockSendEof) Close() error { +func (r *sockSendEOF) Close() error { return nil } -func (r *sockSendEof) LocalAddr() net.Addr { +func (r *sockSendEOF) LocalAddr() net.Addr { return nil } -func (r *sockSendEof) RemoteAddr() net.Addr { +func (r *sockSendEOF) RemoteAddr() net.Addr { return nil } -func (r *sockSendEof) SetDeadline(t time.Time) error { +func (r *sockSendEOF) SetDeadline(t time.Time) error { return nil } -func (r *sockSendEof) SetReadDeadline(t time.Time) error { +func (r *sockSendEOF) SetReadDeadline(t time.Time) error { return nil } -func (r *sockSendEof) SetWriteDeadline(t time.Time) error { +func (r *sockSendEOF) SetWriteDeadline(t time.Time) error { return nil } - -func must(str string, err error) string { - if err != nil { - panic(err) - } - return str -} - -func EndPoint(transport string) (string, error) { - switch transport { - case "tcp": - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return "", err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return "", err - } - defer l.Close() - return fmt.Sprintf("tcp://%s", l.Addr()), nil - case "ipc": - return "ipc://tmp-" + newUUID(), nil - case "inproc": - return "inproc://tmp-" + newUUID(), nil - default: - panic("invalid transport: [" + transport + "]") - } -} diff --git a/zall_test.go b/zall_test.go index c3cd079..90fc514 100644 --- a/zall_test.go +++ b/zall_test.go @@ -5,10 +5,41 @@ package zmq4 import ( + "fmt" "io" "log" + "net" ) var ( Devnull = log.New(io.Discard, "zmq4: ", 0) ) + +func must(str string, err error) string { + if err != nil { + panic(err) + } + return str +} + +func EndPoint(transport string) (string, error) { + switch transport { + case "tcp": + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return "", err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return "", err + } + defer l.Close() + return fmt.Sprintf("tcp://%s", l.Addr()), nil + case "ipc": + return "ipc://tmp-" + newUUID(), nil + case "inproc": + return "inproc://tmp-" + newUUID(), nil + default: + panic("invalid transport: [" + transport + "]") + } +}