Skip to content

Commit

Permalink
Minor correction after core review.
Browse files Browse the repository at this point in the history
Additional test simplification.
Now it needs only two connections,
where 1st fails asap
and 2nd fails with delay - giving connection reaper time to meet deadlock
  • Loading branch information
egorse committed Jan 22, 2024
1 parent 9fcf8e4 commit 66d39ee
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
66 changes: 22 additions & 44 deletions reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ package zmq4

import (
"context"
"fmt"
"io"
"net"
"runtime"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -37,11 +36,11 @@ func TestConnReaperDeadlock2(t *testing.T) {
id := "client-x"
srv.sck.mu.Lock()
rmw := srv.sck.w.(*routerMWriter)
for i := 0; i < 10; i++ {
for i := 0; i < 2; i++ {
w := &Conn{}
w.Peer.Meta = make(Metadata)
w.Peer.Meta[sysSockID] = id
w.rw = &sockSendEof{}
w.rw = &sockSendEOF{}
w.onCloseErrorCB = srv.sck.scheduleRmConn
// Do not to call srv.addConn as we dont want to have listener on this fake socket
rmw.addConn(w)
Expand All @@ -57,68 +56,47 @@ func TestConnReaperDeadlock2(t *testing.T) {
}
}

type sockSendEof struct {
type sockSendEOF struct {
}

func (r *sockSendEof) Write(b []byte) (n int, err error) {
runtime.Gosched()
time.Sleep(1 * time.Second)
var a atomic.Int32

func (r *sockSendEOF) Write(b []byte) (n int, err error) {
// Each odd write fails asap.
// Each even write fails after sleep.
// Such a way we ensure the short write failure
// will cause socket be assinged to connection reaper
// while srv.Send is still in progress due to long writes.
if x := a.Add(1); x&1 == 0 {
time.Sleep(1 * time.Second)
}
return 0, io.EOF
}

func (r *sockSendEof) Read(b []byte) (int, error) {
func (r *sockSendEOF) Read(b []byte) (int, error) {
return 0, nil
}

func (r *sockSendEof) Close() error {
func (r *sockSendEOF) Close() error {
return nil
}

func (r *sockSendEof) LocalAddr() net.Addr {
func (r *sockSendEOF) LocalAddr() net.Addr {
return nil
}

func (r *sockSendEof) RemoteAddr() net.Addr {
func (r *sockSendEOF) RemoteAddr() net.Addr {
return nil
}

func (r *sockSendEof) SetDeadline(t time.Time) error {
func (r *sockSendEOF) SetDeadline(t time.Time) error {
return nil
}

func (r *sockSendEof) SetReadDeadline(t time.Time) error {
func (r *sockSendEOF) SetReadDeadline(t time.Time) error {
return nil
}

func (r *sockSendEof) SetWriteDeadline(t time.Time) error {
func (r *sockSendEOF) SetWriteDeadline(t time.Time) error {
return nil
}

func must(str string, err error) string {
if err != nil {
panic(err)
}
return str
}

func EndPoint(transport string) (string, error) {
switch transport {
case "tcp":
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return "", err
}
defer l.Close()
return fmt.Sprintf("tcp://%s", l.Addr()), nil
case "ipc":
return "ipc://tmp-" + newUUID(), nil
case "inproc":
return "inproc://tmp-" + newUUID(), nil
default:
panic("invalid transport: [" + transport + "]")
}
}
31 changes: 31 additions & 0 deletions zall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,41 @@
package zmq4

import (
"fmt"
"io"
"log"
"net"
)

var (
Devnull = log.New(io.Discard, "zmq4: ", 0)
)

func must(str string, err error) string {
if err != nil {
panic(err)
}
return str
}

func EndPoint(transport string) (string, error) {
switch transport {
case "tcp":
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return "", err
}
defer l.Close()
return fmt.Sprintf("tcp://%s", l.Addr()), nil
case "ipc":
return "ipc://tmp-" + newUUID(), nil
case "inproc":
return "inproc://tmp-" + newUUID(), nil
default:
panic("invalid transport: [" + transport + "]")
}
}

0 comments on commit 66d39ee

Please sign in to comment.