From af83b370f36fc5aa200a93e5cd6d6650000286c3 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 16:01:37 +0300 Subject: [PATCH 01/18] Added first implementation of BaseModule. --- pkg/modules/baseModule.go | 52 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 pkg/modules/baseModule.go diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go new file mode 100644 index 0000000..f4a528a --- /dev/null +++ b/pkg/modules/baseModule.go @@ -0,0 +1,52 @@ +package modules + +import ( + "context" + "github.com/dipdup-net/indexer-sdk/pkg/sync" + "github.com/pkg/errors" +) + +var _ Module = &BaseModule{} + +type BaseModule struct { + inputs sync.Map[string, *Input] + outputs sync.Map[string, *Output] +} + +func (*BaseModule) Name() string { + return "base_module" +} + +func (*BaseModule) Start(ctx context.Context) { +} + +func (*BaseModule) Close() error { + // TODO: close all inputs + return nil +} + +func (m *BaseModule) Input(name string) (*Input, error) { + input, ok := m.inputs.Get(name) + if !ok { + return nil, errors.Wrap(ErrUnknownInput, name) + } + return input, nil +} + +func (m *BaseModule) Output(name string) (*Output, error) { + output, ok := m.outputs.Get(name) + if !ok { + return nil, errors.Wrap(ErrUnknownOutput, name) + } + return output, nil +} + +func (m *BaseModule) AttachTo(outputName string, input *Input) error { + output, err := m.Output(outputName) + if err != nil { + return err + } + + output.Attach(input) + return nil +} From 987a4864a17f9917b64a4b7649dd1b735d27e706 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 19:55:02 +0300 Subject: [PATCH 02/18] Added unit tests for Input / Output implementations in BaseModule. --- pkg/modules/baseModule.go | 18 +++++------ pkg/modules/baseModule_test.go | 59 ++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 9 deletions(-) create mode 100644 pkg/modules/baseModule_test.go diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index f4a528a..cf3ae34 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -9,24 +9,24 @@ import ( var _ Module = &BaseModule{} type BaseModule struct { - inputs sync.Map[string, *Input] - outputs sync.Map[string, *Output] + Inputs sync.Map[string, *Input] + Outputs sync.Map[string, *Output] } func (*BaseModule) Name() string { return "base_module" } -func (*BaseModule) Start(ctx context.Context) { -} +func (*BaseModule) Start(ctx context.Context) {} -func (*BaseModule) Close() error { - // TODO: close all inputs - return nil +func (m *BaseModule) Close() error { + return m.Inputs.Range(func(name string, input *Input) (error, bool) { + return input.Close(), false + }) } func (m *BaseModule) Input(name string) (*Input, error) { - input, ok := m.inputs.Get(name) + input, ok := m.Inputs.Get(name) if !ok { return nil, errors.Wrap(ErrUnknownInput, name) } @@ -34,7 +34,7 @@ func (m *BaseModule) Input(name string) (*Input, error) { } func (m *BaseModule) Output(name string) (*Output, error) { - output, ok := m.outputs.Get(name) + output, ok := m.Outputs.Get(name) if !ok { return nil, errors.Wrap(ErrUnknownOutput, name) } diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go new file mode 100644 index 0000000..668fd8a --- /dev/null +++ b/pkg/modules/baseModule_test.go @@ -0,0 +1,59 @@ +package modules + +import ( + "github.com/dipdup-net/indexer-sdk/pkg/sync" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBaseModule_ExistingInput(t *testing.T) { + bm := &BaseModule{ + Inputs: sync.NewMap[string, *Input](), + } + existingChannelName := "input-channel" + bm.Inputs.Set(existingChannelName, NewInput(existingChannelName)) + + // Act + input, err := bm.Input(existingChannelName) + assert.NoError(t, err) + assert.Equal(t, existingChannelName, input.Name()) +} + +func TestBaseModule_NonExistingInput(t *testing.T) { + bm := &BaseModule{ + Inputs: sync.NewMap[string, *Input](), + } + nonExistingChannelName := "non-existing-input-channel" + + // Act + input, err := bm.Input(nonExistingChannelName) + assert.ErrorIs(t, err, ErrUnknownInput) + assert.Errorf(t, err, "%s: %s", ErrUnknownInput.Error(), nonExistingChannelName) + assert.Nil(t, input) +} + +func TestBaseModule_ExistingOutput(t *testing.T) { + bm := &BaseModule{ + Outputs: sync.NewMap[string, *Output](), + } + existingChannelName := "output-channel" + bm.Outputs.Set(existingChannelName, NewOutput(existingChannelName)) + + // Act + output, err := bm.Output(existingChannelName) + assert.NoError(t, err) + assert.Equal(t, existingChannelName, output.Name()) +} + +func TestBaseModule_NonExistingOutput(t *testing.T) { + bm := &BaseModule{ + Outputs: sync.NewMap[string, *Output](), + } + nonExistingChannelName := "non-existing-output-channel" + + // Act + output, err := bm.Output(nonExistingChannelName) + assert.ErrorIs(t, err, ErrUnknownOutput) + assert.Errorf(t, err, "%s: %s", ErrUnknownOutput.Error(), nonExistingChannelName) + assert.Nil(t, output) +} From 45f22211017b24811208e7ef9b8be427596a5199 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 20:24:07 +0300 Subject: [PATCH 03/18] Added test on AttachTo and Close. --- pkg/modules/baseModule.go | 42 ++++++++++++++++++++++++++++++---- pkg/modules/baseModule_test.go | 32 ++++++++++++++++++++++++++ pkg/modules/printer/module.go | 2 +- pkg/sync/map.go | 16 ++++++------- 4 files changed, 79 insertions(+), 13 deletions(-) diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index cf3ae34..2362671 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -9,8 +9,8 @@ import ( var _ Module = &BaseModule{} type BaseModule struct { - Inputs sync.Map[string, *Input] - Outputs sync.Map[string, *Output] + Inputs *sync.Map[string, *Input] + Outputs *sync.Map[string, *Output] } func (*BaseModule) Name() string { @@ -19,8 +19,12 @@ func (*BaseModule) Name() string { func (*BaseModule) Start(ctx context.Context) {} -func (m *BaseModule) Close() error { - return m.Inputs.Range(func(name string, input *Input) (error, bool) { +func (m *BaseModule) Close() error { // TODO-DISCUSS + if m.Inputs == nil { + return nil + } + + return m.Inputs.Range(func(name string, input *Input) (error, bool) { // TODO-DISCUSS return input.Close(), false }) } @@ -50,3 +54,33 @@ func (m *BaseModule) AttachTo(outputName string, input *Input) error { output.Attach(input) return nil } + +//func (m *BaseModule) AttachTo(output Module, channelName string) error { +// outputChannel, err := output.Output(channelName) +// if err != nil { +// return err +// } +// +// input, e := m.Input(channelName) +// if e != nil { +// return e +// } +// +// outputChannel.Attach(input) +// return nil +//} + +//func (m *BaseModule) Connect(input Module, channelName string) error { +// inputChannel, err := input.Input(channelName) +// if err != nil { +// return err +// } +// +// output, e := m.Output(channelName) +// if e != nil { +// return e +// } +// +// output.Attach(inputChannel) +// return nil +//} diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go index 668fd8a..eede406 100644 --- a/pkg/modules/baseModule_test.go +++ b/pkg/modules/baseModule_test.go @@ -57,3 +57,35 @@ func TestBaseModule_NonExistingOutput(t *testing.T) { assert.Errorf(t, err, "%s: %s", ErrUnknownOutput.Error(), nonExistingChannelName) assert.Nil(t, output) } + +func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { + bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} + bmDst := &BaseModule{Inputs: sync.NewMap[string, *Input]()} + channelName := "data" + + bmSrc.Outputs.Set(channelName, NewOutput(channelName)) + bmDst.Inputs.Set(channelName, NewInput(channelName)) + + input, err := bmDst.Input(channelName) + assert.NoError(t, err) + + err = bmSrc.AttachTo(channelName, input) + assert.NoError(t, err) + + output, ok := bmSrc.Outputs.Get(channelName) + assert.True(t, ok) + + output.Push("hello") + + msg := <-input.Listen() + assert.Equal(t, "hello", msg) + + err = bmSrc.Close() + assert.NoError(t, err) + + err = bmDst.Close() + assert.NoError(t, err) + + _, ok = <-input.Listen() // TODO-DISCUSS + assert.False(t, ok) +} diff --git a/pkg/modules/printer/module.go b/pkg/modules/printer/module.go index 1c6a3e1..987d30b 100644 --- a/pkg/modules/printer/module.go +++ b/pkg/modules/printer/module.go @@ -32,7 +32,7 @@ func NewPrinter() *Printer { } // Name - -func (printer *Printer) Name() string { +func (*Printer) Name() string { return "printer" } diff --git a/pkg/sync/map.go b/pkg/sync/map.go index 71275fe..9270912 100644 --- a/pkg/sync/map.go +++ b/pkg/sync/map.go @@ -7,34 +7,34 @@ type Map[K comparable, V any] struct { mx *sync.RWMutex } -func NewMap[K comparable, V any]() Map[K, V] { - return Map[K, V]{ +func NewMap[K comparable, V any]() *Map[K, V] { + return &Map[K, V]{ m: make(map[K]V), mx: new(sync.RWMutex), } } -func (m Map[K, V]) Get(key K) (V, bool) { +func (m *Map[K, V]) Get(key K) (V, bool) { m.mx.RLock() val, ok := m.m[key] m.mx.RUnlock() return val, ok } -func (m Map[K, V]) Delete(key K) { +func (m *Map[K, V]) Delete(key K) { m.mx.Lock() delete(m.m, key) m.mx.Unlock() } -func (m Map[K, V]) Set(key K, value V) { +func (m *Map[K, V]) Set(key K, value V) { m.mx.Lock() m.m[key] = value m.mx.Unlock() } // Range (WARN) does not support nested ranges with Delete in them. -func (m Map[K, V]) Range(handler func(key K, value V) (error, bool)) error { +func (m *Map[K, V]) Range(handler func(key K, value V) (error, bool)) error { if handler == nil { return nil } @@ -53,7 +53,7 @@ func (m Map[K, V]) Range(handler func(key K, value V) (error, bool)) error { return nil } -func (m Map[K, V]) Clear() { +func (m *Map[K, V]) Clear() { m.mx.Lock() // clear(m.m) TODO: rewrite on go 1.21 for k := range m.m { @@ -62,7 +62,7 @@ func (m Map[K, V]) Clear() { m.mx.Unlock() } -func (m Map[K, V]) Len() int { +func (m *Map[K, V]) Len() int { m.mx.RLock() defer m.mx.RUnlock() return len(m.m) From b3594bdbafa0006c15b1d2f40bc4cfae85472e65 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 20:28:52 +0300 Subject: [PATCH 04/18] Fixe linter errors. --- pkg/modules/baseModule.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index 2362671..39ccc66 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -55,7 +55,7 @@ func (m *BaseModule) AttachTo(outputName string, input *Input) error { return nil } -//func (m *BaseModule) AttachTo(output Module, channelName string) error { +// func (m *BaseModule) AttachTo(output Module, channelName string) error { // outputChannel, err := output.Output(channelName) // if err != nil { // return err @@ -68,9 +68,9 @@ func (m *BaseModule) AttachTo(outputName string, input *Input) error { // // outputChannel.Attach(input) // return nil -//} +// } -//func (m *BaseModule) Connect(input Module, channelName string) error { +// func (m *BaseModule) Connect(input Module, channelName string) error { // inputChannel, err := input.Input(channelName) // if err != nil { // return err @@ -83,4 +83,4 @@ func (m *BaseModule) AttachTo(outputName string, input *Input) error { // // output.Attach(inputChannel) // return nil -//} +// } From f49f7417ea5034dc9b27076afa5379684562dbab Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 20:32:49 +0300 Subject: [PATCH 05/18] Added unit test on Closing BaseModule. --- pkg/modules/baseModule_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go index eede406..87b400e 100644 --- a/pkg/modules/baseModule_test.go +++ b/pkg/modules/baseModule_test.go @@ -89,3 +89,21 @@ func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { _, ok = <-input.Listen() // TODO-DISCUSS assert.False(t, ok) } + +func TestBaseModule_CloseSuccessfullyCloseInputChannels(t *testing.T) { + //bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} + bmDst := &BaseModule{Inputs: sync.NewMap[string, *Input]()} + channelName := "data" + + //bmSrc.Outputs.Set(channelName, NewOutput(channelName)) + bmDst.Inputs.Set(channelName, NewInput(channelName)) + + input, err := bmDst.Input(channelName) + assert.NoError(t, err) + + err = bmDst.Close() + assert.NoError(t, err) + + _, ok := <-input.Listen() // TODO-DISCUSS + assert.False(t, ok) +} From 9f464430620699d3f3b51c855a1e13dcaa3cf238 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Mon, 11 Sep 2023 21:24:12 +0300 Subject: [PATCH 06/18] Fix errors on TestBaseModule close test. --- pkg/modules/baseModule_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go index 87b400e..cf4f5bb 100644 --- a/pkg/modules/baseModule_test.go +++ b/pkg/modules/baseModule_test.go @@ -91,11 +91,11 @@ func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { } func TestBaseModule_CloseSuccessfullyCloseInputChannels(t *testing.T) { - //bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} + // bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} bmDst := &BaseModule{Inputs: sync.NewMap[string, *Input]()} channelName := "data" - //bmSrc.Outputs.Set(channelName, NewOutput(channelName)) + // bmSrc.Outputs.Set(channelName, NewOutput(channelName)) bmDst.Inputs.Set(channelName, NewInput(channelName)) input, err := bmDst.Input(channelName) From 4253fd73d6b4ce4df6fa1a95ccc190c5967f5567 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 10:19:55 +0300 Subject: [PATCH 07/18] Added Init method on BaseModule. Removed Close logic. --- pkg/modules/baseModule.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index 39ccc66..dde731a 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -2,31 +2,39 @@ package modules import ( "context" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/sync" "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) var _ Module = &BaseModule{} type BaseModule struct { + name string Inputs *sync.Map[string, *Input] Outputs *sync.Map[string, *Output] + Log zerolog.Logger + G workerpool.Group +} + +func (m *BaseModule) Init(name string) { + m.name = name + m.Inputs = sync.NewMap[string, *Input]() + m.Outputs = sync.NewMap[string, *Output]() + m.Log = log.With().Str("module", name).Logger() + m.G = workerpool.NewGroup() } func (*BaseModule) Name() string { return "base_module" } -func (*BaseModule) Start(ctx context.Context) {} +func (*BaseModule) Start(_ context.Context) {} -func (m *BaseModule) Close() error { // TODO-DISCUSS - if m.Inputs == nil { - return nil - } - - return m.Inputs.Range(func(name string, input *Input) (error, bool) { // TODO-DISCUSS - return input.Close(), false - }) +func (*BaseModule) Close() error { + return nil } func (m *BaseModule) Input(name string) (*Input, error) { From 5e98ab15ef32278cd720ae5cb818829e0a485986 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 11:09:17 +0300 Subject: [PATCH 08/18] Refactor stopper with BaseModule, updated direction of AttachTo. --- pkg/modules/baseModule.go | 65 +++++++++++++-------------------- pkg/modules/baseModule_test.go | 66 +++++++++++++--------------------- pkg/modules/module.go | 2 +- pkg/modules/output.go | 7 +--- pkg/modules/stopper/stopper.go | 56 ++++++++--------------------- 5 files changed, 65 insertions(+), 131 deletions(-) diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index dde731a..423aaca 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -13,22 +13,22 @@ var _ Module = &BaseModule{} type BaseModule struct { name string - Inputs *sync.Map[string, *Input] - Outputs *sync.Map[string, *Output] + inputs *sync.Map[string, *Input] + outputs *sync.Map[string, *Output] Log zerolog.Logger G workerpool.Group } func (m *BaseModule) Init(name string) { m.name = name - m.Inputs = sync.NewMap[string, *Input]() - m.Outputs = sync.NewMap[string, *Output]() + m.inputs = sync.NewMap[string, *Input]() + m.outputs = sync.NewMap[string, *Output]() m.Log = log.With().Str("module", name).Logger() m.G = workerpool.NewGroup() } -func (*BaseModule) Name() string { - return "base_module" +func (m *BaseModule) Name() string { + return m.name } func (*BaseModule) Start(_ context.Context) {} @@ -38,57 +38,40 @@ func (*BaseModule) Close() error { } func (m *BaseModule) Input(name string) (*Input, error) { - input, ok := m.Inputs.Get(name) + input, ok := m.inputs.Get(name) if !ok { return nil, errors.Wrap(ErrUnknownInput, name) } return input, nil } +func (m *BaseModule) CreateInput(name string) { + m.inputs.Set(name, NewInput(name)) +} + func (m *BaseModule) Output(name string) (*Output, error) { - output, ok := m.Outputs.Get(name) + output, ok := m.outputs.Get(name) if !ok { return nil, errors.Wrap(ErrUnknownOutput, name) } return output, nil } -func (m *BaseModule) AttachTo(outputName string, input *Input) error { - output, err := m.Output(outputName) +func (m *BaseModule) CreateOutput(name string) { + m.outputs.Set(name, NewOutput(name)) +} + +func (m *BaseModule) AttachTo(outputModule Module, outputName, inputName string) error { + outputChannel, err := outputModule.Output(outputName) + if err != nil { + return err + } + + input, err := m.Input(inputName) if err != nil { return err } - output.Attach(input) + outputChannel.Attach(input) return nil } - -// func (m *BaseModule) AttachTo(output Module, channelName string) error { -// outputChannel, err := output.Output(channelName) -// if err != nil { -// return err -// } -// -// input, e := m.Input(channelName) -// if e != nil { -// return e -// } -// -// outputChannel.Attach(input) -// return nil -// } - -// func (m *BaseModule) Connect(input Module, channelName string) error { -// inputChannel, err := input.Input(channelName) -// if err != nil { -// return err -// } -// -// output, e := m.Output(channelName) -// if e != nil { -// return e -// } -// -// output.Attach(inputChannel) -// return nil -// } diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go index cf4f5bb..a8dfee6 100644 --- a/pkg/modules/baseModule_test.go +++ b/pkg/modules/baseModule_test.go @@ -7,11 +7,10 @@ import ( ) func TestBaseModule_ExistingInput(t *testing.T) { - bm := &BaseModule{ - Inputs: sync.NewMap[string, *Input](), - } + bm := &BaseModule{} + bm.Init("module") existingChannelName := "input-channel" - bm.Inputs.Set(existingChannelName, NewInput(existingChannelName)) + bm.CreateInput(existingChannelName) // Act input, err := bm.Input(existingChannelName) @@ -20,9 +19,8 @@ func TestBaseModule_ExistingInput(t *testing.T) { } func TestBaseModule_NonExistingInput(t *testing.T) { - bm := &BaseModule{ - Inputs: sync.NewMap[string, *Input](), - } + bm := &BaseModule{} + bm.Init("module") nonExistingChannelName := "non-existing-input-channel" // Act @@ -33,11 +31,10 @@ func TestBaseModule_NonExistingInput(t *testing.T) { } func TestBaseModule_ExistingOutput(t *testing.T) { - bm := &BaseModule{ - Outputs: sync.NewMap[string, *Output](), - } + bm := &BaseModule{} + bm.Init("module") existingChannelName := "output-channel" - bm.Outputs.Set(existingChannelName, NewOutput(existingChannelName)) + bm.CreateOutput(existingChannelName) // Act output, err := bm.Output(existingChannelName) @@ -46,9 +43,8 @@ func TestBaseModule_ExistingOutput(t *testing.T) { } func TestBaseModule_NonExistingOutput(t *testing.T) { - bm := &BaseModule{ - Outputs: sync.NewMap[string, *Output](), - } + bm := &BaseModule{} + bm.Init("module") nonExistingChannelName := "non-existing-output-channel" // Act @@ -59,21 +55,22 @@ func TestBaseModule_NonExistingOutput(t *testing.T) { } func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { - bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} - bmDst := &BaseModule{Inputs: sync.NewMap[string, *Input]()} - channelName := "data" + bmSrc := &BaseModule{outputs: sync.NewMap[string, *Output]()} + bmDst := &BaseModule{inputs: sync.NewMap[string, *Input]()} + inputName := "data-in" + outputName := "data-out" - bmSrc.Outputs.Set(channelName, NewOutput(channelName)) - bmDst.Inputs.Set(channelName, NewInput(channelName)) + bmSrc.CreateOutput(outputName) + bmDst.CreateInput(inputName) - input, err := bmDst.Input(channelName) + input, err := bmDst.Input(inputName) assert.NoError(t, err) - err = bmSrc.AttachTo(channelName, input) + err = bmDst.AttachTo(bmSrc, outputName, inputName) assert.NoError(t, err) - output, ok := bmSrc.Outputs.Get(channelName) - assert.True(t, ok) + output, err := bmSrc.Output(outputName) + assert.NoError(t, err) output.Push("hello") @@ -85,25 +82,12 @@ func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { err = bmDst.Close() assert.NoError(t, err) - - _, ok = <-input.Listen() // TODO-DISCUSS - assert.False(t, ok) } -func TestBaseModule_CloseSuccessfullyCloseInputChannels(t *testing.T) { - // bmSrc := &BaseModule{Outputs: sync.NewMap[string, *Output]()} - bmDst := &BaseModule{Inputs: sync.NewMap[string, *Input]()} - channelName := "data" - - // bmSrc.Outputs.Set(channelName, NewOutput(channelName)) - bmDst.Inputs.Set(channelName, NewInput(channelName)) - - input, err := bmDst.Input(channelName) - assert.NoError(t, err) - - err = bmDst.Close() - assert.NoError(t, err) +func TestBaseModule_ReturnsCorrectName(t *testing.T) { + bm := &BaseModule{} + bm.Init("module") - _, ok := <-input.Listen() // TODO-DISCUSS - assert.False(t, ok) + name := bm.Name() + assert.Equal(t, "module", name) } diff --git a/pkg/modules/module.go b/pkg/modules/module.go index 319b44f..82b6ce7 100644 --- a/pkg/modules/module.go +++ b/pkg/modules/module.go @@ -22,5 +22,5 @@ type Module interface { Input(name string) (*Input, error) Output(name string) (*Output, error) - AttachTo(outputName string, input *Input) error + AttachTo(output Module, outputName, inputName string) error } diff --git a/pkg/modules/output.go b/pkg/modules/output.go index 8ae04b8..234d141 100644 --- a/pkg/modules/output.go +++ b/pkg/modules/output.go @@ -1,7 +1,6 @@ package modules import ( - "log" "sync" ) @@ -53,9 +52,5 @@ func (output *Output) Name() string { // Connect - func Connect(outputModule, inputModule Module, outputName, inputName string) error { - input, err := inputModule.Input(inputName) - if err != nil { - log.Panic(err) - } - return outputModule.AttachTo(outputName, input) + return inputModule.AttachTo(outputModule, outputName, inputName) } diff --git a/pkg/modules/stopper/stopper.go b/pkg/modules/stopper/stopper.go index ec92415..0f60418 100644 --- a/pkg/modules/stopper/stopper.go +++ b/pkg/modules/stopper/stopper.go @@ -2,10 +2,7 @@ package stopper import ( "context" - "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -21,38 +18,37 @@ const ( // | | // |----------------| type Module struct { - input *modules.Input - stop context.CancelFunc - log zerolog.Logger - g workerpool.Group + modules.BaseModule + stop context.CancelFunc } func NewModule(cancelFunc context.CancelFunc) Module { m := Module{ - input: modules.NewInput(InputName), - stop: cancelFunc, - g: workerpool.NewGroup(), + BaseModule: modules.BaseModule{}, + stop: cancelFunc, } - m.log = log.With().Str("module", m.Name()).Logger() + m.Init("stopper") + m.CreateInput(InputName) return m } -func (*Module) Name() string { - return "stopper" -} - // Start - func (s *Module) Start(ctx context.Context) { - s.g.GoCtx(ctx, s.listen) + s.G.GoCtx(ctx, s.listen) } func (s *Module) listen(ctx context.Context) { + input, err := s.Input(InputName) + if err != nil { + s.Log.Panic().Msg("while getting default input channel in listen") + } + for { select { case <-ctx.Done(): return - case <-s.input.Listen(): + case <-input.Listen(): log.Info().Msg("stop signal received") if s.stop != nil { log.Info().Msg("cancelling context...") @@ -65,30 +61,6 @@ func (s *Module) listen(ctx context.Context) { // Close - func (s *Module) Close() error { - s.g.Wait() - return s.input.Close() -} - -// Output - -func (*Module) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// Input - -func (s *Module) Input(name string) (*modules.Input, error) { - if name != InputName { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return s.input, nil -} - -// AttachTo - -func (s *Module) AttachTo(name string, input *modules.Input) error { - output, err := s.Output(name) - if err != nil { - return err - } - - output.Attach(input) + s.G.Wait() return nil } From 1af2283fe5c3754dc0aa956c1bc62808c93f80ef Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 11:15:16 +0300 Subject: [PATCH 09/18] Added unit test for stopper from another module. --- pkg/modules/stopper/stopper_test.go | 35 ++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/pkg/modules/stopper/stopper_test.go b/pkg/modules/stopper/stopper_test.go index 64bbf3f..4781365 100644 --- a/pkg/modules/stopper/stopper_test.go +++ b/pkg/modules/stopper/stopper_test.go @@ -2,11 +2,12 @@ package stopper import ( "context" + "github.com/dipdup-net/indexer-sdk/pkg/modules" "github.com/stretchr/testify/assert" "testing" ) -func TestStopperCallsStop(t *testing.T) { +func TestStopper_CallsStop(t *testing.T) { stopWasCalled := false var stopFunc context.CancelFunc = func() { @@ -30,3 +31,35 @@ func TestStopperCallsStop(t *testing.T) { assert.True(t, stopWasCalled, "stop was never called") } + +func TestStopper_CallsStopFromAnotherModule(t *testing.T) { + stopWasCalled := false + + var stopFunc context.CancelFunc = func() { + stopWasCalled = true + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stopperModule := NewModule(stopFunc) + workerModule := &modules.BaseModule{} + workerModule.Init("worker") + workerModule.CreateOutput("stop") + + err := stopperModule.AttachTo(workerModule, "stop", InputName) + assert.NoError(t, err) + + stopperModule.Start(ctx) + + workerOutput, err := workerModule.Output("stop") + assert.NoError(t, err) + + // Act: send stop signal to stopper + workerOutput.Push(struct{}{}) + + err = stopperModule.Close() + assert.NoError(t, err) + + assert.True(t, stopWasCalled, "stop was never called") +} From ed5a6478b15d47cab854b0fc81c8f93451afbaa9 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 11:16:41 +0300 Subject: [PATCH 10/18] Added typecheck for stopper Module to implement interface. --- pkg/modules/stopper/stopper.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/modules/stopper/stopper.go b/pkg/modules/stopper/stopper.go index 0f60418..d6c4801 100644 --- a/pkg/modules/stopper/stopper.go +++ b/pkg/modules/stopper/stopper.go @@ -6,10 +6,6 @@ import ( "github.com/rs/zerolog/log" ) -const ( - InputName = "signal" -) - // Module - cancels context of all application if get signal. // // |----------------| @@ -22,6 +18,12 @@ type Module struct { stop context.CancelFunc } +var _ modules.Module = &Module{} + +const ( + InputName = "signal" +) + func NewModule(cancelFunc context.CancelFunc) Module { m := Module{ BaseModule: modules.BaseModule{}, From 39bc16a1747aa1d03f05b5ba328011f6305f0874 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 12:01:14 +0300 Subject: [PATCH 11/18] Printer rewritten with BaseModule. --- pkg/modules/printer/module.go | 95 ----------------------------- pkg/modules/printer/printer.go | 60 ++++++++++++++++++ pkg/modules/printer/printer_test.go | 43 +++++++++++++ 3 files changed, 103 insertions(+), 95 deletions(-) delete mode 100644 pkg/modules/printer/module.go create mode 100644 pkg/modules/printer/printer.go create mode 100644 pkg/modules/printer/printer_test.go diff --git a/pkg/modules/printer/module.go b/pkg/modules/printer/module.go deleted file mode 100644 index 987d30b..0000000 --- a/pkg/modules/printer/module.go +++ /dev/null @@ -1,95 +0,0 @@ -package printer - -import ( - "context" - "fmt" - "sync" - - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" -) - -// input name -const ( - InputName = "input" -) - -// Printer - the structure which is responsible for print received messages -type Printer struct { - input *modules.Input - - wg *sync.WaitGroup -} - -// NewPrinter - constructor of printer structure -func NewPrinter() *Printer { - return &Printer{ - input: modules.NewInput(InputName), - - wg: new(sync.WaitGroup), - } -} - -// Name - -func (*Printer) Name() string { - return "printer" -} - -// Input - returns input by name -func (printer *Printer) Input(name string) (*modules.Input, error) { - switch name { - case InputName: - return printer.input, nil - default: - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } -} - -// Output - returns output by name -func (printer *Printer) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - attach input to output with name -func (printer *Printer) AttachTo(name string, input *modules.Input) error { - output, err := printer.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - -// Close - gracefully stops module -func (printer *Printer) Close() error { - printer.wg.Wait() - - if err := printer.input.Close(); err != nil { - return err - } - - return nil -} - -// Start - starts module -func (printer *Printer) Start(ctx context.Context) { - printer.wg.Add(1) - go printer.listen(ctx) -} - -func (printer *Printer) listen(ctx context.Context) { - defer printer.wg.Done() - - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-printer.input.Listen(): - if !ok { - return - } - log.Info().Str("obj_type", fmt.Sprintf("%T", msg)).Msgf("%##v", msg) - } - } -} diff --git a/pkg/modules/printer/printer.go b/pkg/modules/printer/printer.go new file mode 100644 index 0000000..b15a82f --- /dev/null +++ b/pkg/modules/printer/printer.go @@ -0,0 +1,60 @@ +package printer + +import ( + "context" + "fmt" + "github.com/dipdup-net/indexer-sdk/pkg/modules" +) + +// Printer - the structure which is responsible for print received messages +type Printer struct { + modules.BaseModule + cancelListener context.CancelFunc +} + +const InputName = "input" + +var _ modules.Module = &Printer{} + +// NewPrinter - constructor of printer structure +func NewPrinter() Printer { + p := Printer{} + p.Init("printer") + p.CreateInput(InputName) + return p +} + +// Close - gracefully stops module +func (printer *Printer) Close() error { + if printer.cancelListener != nil { + printer.cancelListener() + } + printer.G.Wait() + return nil +} + +// Start - starts module +func (printer *Printer) Start(ctx context.Context) { + listenerCtx, c := context.WithCancel(ctx) + printer.cancelListener = c + printer.G.GoCtx(listenerCtx, printer.listen) +} + +func (printer *Printer) listen(ctx context.Context) { + input, err := printer.Input(InputName) + if err != nil { + printer.Log.Panic().Msg("while getting default input channel") + } + + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-input.Listen(): + if !ok { + return + } + printer.Log.Info().Str("obj_type", fmt.Sprintf("%T", msg)).Msgf("%##v", msg) + } + } +} diff --git a/pkg/modules/printer/printer_test.go b/pkg/modules/printer/printer_test.go new file mode 100644 index 0000000..59e8f5e --- /dev/null +++ b/pkg/modules/printer/printer_test.go @@ -0,0 +1,43 @@ +package printer + +import ( + "context" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPrinter_ExpectedPrintedValue(t *testing.T) { + p := NewPrinter() + logs := &logSink{} + logger := zerolog.New(logs) + p.Log = logger + + input, err := p.Input(InputName) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p.Start(ctx) + + input.Push("hello") + + time.Sleep(time.Millisecond * 100) // TODO-UGLY + err = p.Close() + assert.NoError(t, err) + assert.Contains(t, logs.Index(0), "hello") +} + +type logSink struct { + logs []string +} + +func (l *logSink) Write(p []byte) (n int, err error) { + l.logs = append(l.logs, string(p)) + return len(p), nil +} + +func (l *logSink) Index(i int) string { + return l.logs[i] +} From 08949bc5e8f333b66b37bab71591c52a2bf2e1de Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 12:25:25 +0300 Subject: [PATCH 12/18] Refactored cron module with BaseModule. --- examples/cron/custom.go | 20 +++++++++++++++-- pkg/modules/cron/README.md | 4 ++-- pkg/modules/cron/module.go | 46 +++++++++----------------------------- 3 files changed, 31 insertions(+), 39 deletions(-) diff --git a/examples/cron/custom.go b/examples/cron/custom.go index 12844c6..399707c 100644 --- a/examples/cron/custom.go +++ b/examples/cron/custom.go @@ -9,6 +9,8 @@ import ( "github.com/rs/zerolog/log" ) +var _ modules.Module = &CustomModule{} + // CustomModule - type CustomModule struct { everySecond *modules.Input @@ -50,8 +52,22 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) { } // AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) +func (m *CustomModule) AttachTo(outputM modules.Module, outputName, inputName string) error { + output, err := outputM.Output(outputName) + if err != nil { + return nil + } + + switch inputName { + case "every_second": + output.Attach(m.everySecond) + return nil + case "every_five_second": + output.Attach(m.everyFiveSecond) + return nil + default: + return errors.Wrap(modules.ErrUnknownInput, inputName) + } } func (m *CustomModule) listen(ctx context.Context) { diff --git a/pkg/modules/cron/README.md b/pkg/modules/cron/README.md index d33bb1b..b16f110 100644 --- a/pkg/modules/cron/README.md +++ b/pkg/modules/cron/README.md @@ -55,13 +55,13 @@ So if your module should execute some work on `every_second` scheduled events fr ```go // with helper function -if err := modules.Connect(cronModule, customModule, "every_second", "every_second"); err != nil { +if err := modules.Connect(cronModule, customModule, "every_second", "custom_module_every_second_input"); err != nil { log.Panic(err) } // or directly to module -if err := cronModule.AttachTo("every_second", customModule.everySecond); err != nil { +if err := customModule.AttachTo(cronModule, "every_second", "custom_module_every_second_input"); err != nil { log.Panic(err) } ``` diff --git a/pkg/modules/cron/module.go b/pkg/modules/cron/module.go index 64ff80e..6b4bee3 100644 --- a/pkg/modules/cron/module.go +++ b/pkg/modules/cron/module.go @@ -4,17 +4,17 @@ import ( "context" "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" "github.com/robfig/cron/v3" ) // Module - cron module type Module struct { cron *cron.Cron - - outputs map[string]*modules.Output + modules.BaseModule } +var _ modules.Module = &Module{} + // NewModule - creates cron module func NewModule(cfg *Config) (*Module, error) { module := &Module{ @@ -24,10 +24,12 @@ func NewModule(cfg *Config) (*Module, error) { )), // cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger)), ), - outputs: make(map[string]*modules.Output), } + + module.Init("cron") + for job, pattern := range cfg.Jobs { - module.outputs[job] = modules.NewOutput(job) + module.CreateOutput(job) if _, err := module.cron.AddFunc( pattern, @@ -40,11 +42,6 @@ func NewModule(cfg *Config) (*Module, error) { return module, nil } -// Name - -func (*Module) Name() string { - return "cron" -} - // Start - starts cron scheduler func (module *Module) Start(ctx context.Context) { module.cron.Start() @@ -56,33 +53,12 @@ func (module *Module) Close() error { return nil } -// Output - -func (module *Module) Output(name string) (*modules.Output, error) { - output, ok := module.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return output, nil -} - -// Input - -func (module *Module) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// AttachTo - -func (module *Module) AttachTo(name string, input *modules.Input) error { - output, err := module.Output(name) +func (module *Module) notify(job, pattern string) { + output, err := module.Output(job) if err != nil { - return err + module.Log.Panic().Msg("while getting output for notification") } - - output.Attach(input) - return nil -} - -func (module *Module) notify(job, pattern string) { - module.outputs[job].Push(struct{}{}) + output.Push(struct{}{}) } func newHandler(module *Module, job, pattern string) func() { From 3bbb08e8632ef3248dcf7c269a00d54daceeee47 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 12:29:59 +0300 Subject: [PATCH 13/18] Removed dublicator module. --- examples/duplicator/main.go | 68 --------------- pkg/modules/README.md | 4 - pkg/modules/duplicator/README.md | 58 ------------- pkg/modules/duplicator/functions.go | 13 --- pkg/modules/duplicator/module.go | 127 ---------------------------- 5 files changed, 270 deletions(-) delete mode 100644 examples/duplicator/main.go delete mode 100644 pkg/modules/duplicator/README.md delete mode 100644 pkg/modules/duplicator/functions.go delete mode 100644 pkg/modules/duplicator/module.go diff --git a/examples/duplicator/main.go b/examples/duplicator/main.go deleted file mode 100644 index cfde062..0000000 --- a/examples/duplicator/main.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/dipdup-net/indexer-sdk/pkg/modules/cron" - "github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator" - "github.com/dipdup-net/indexer-sdk/pkg/modules/printer" -) - -func main() { - cronModule1, err := cron.NewModule(&cron.Config{ - Jobs: map[string]string{ - "ticker": "@every 5s", - }, - }) - if err != nil { - log.Panic(err) - } - cronModule2, err := cron.NewModule(&cron.Config{ - Jobs: map[string]string{ - "ticker": "* * * * * *", - }, - }) - if err != nil { - log.Panic(err) - } - - dup := duplicator.NewDuplicator(2, 1) - - print := printer.NewPrinter() - - if err := modules.Connect(cronModule1, dup, "ticker", duplicator.GetInputName(0)); err != nil { - log.Panic(err) - } - if err := modules.Connect(cronModule2, dup, "ticker", duplicator.GetInputName(1)); err != nil { - log.Panic(err) - } - if err := modules.Connect(dup, print, duplicator.GetOutputName(0), printer.InputName); err != nil { - log.Panic(err) - } - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - print.Start(ctx) - dup.Start(ctx) - cronModule2.Start(ctx) - cronModule1.Start(ctx) - - <-ctx.Done() - cancel() - - if err := cronModule1.Close(); err != nil { - log.Print(err) - } - if err := cronModule2.Close(); err != nil { - log.Print(err) - } - if err := dup.Close(); err != nil { - log.Print(err) - } - if err := print.Close(); err != nil { - log.Print(err) - } -} diff --git a/pkg/modules/README.md b/pkg/modules/README.md index fb9fbd7..c6ee717 100644 --- a/pkg/modules/README.md +++ b/pkg/modules/README.md @@ -97,7 +97,3 @@ gRPC module where realized default client and server. Detailed docs can be found ### Cron Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/). - -### Duplicator - -Duplicator module repeats signal from one of inputs to all outputs. Detailed docs can be found [here](/pkg/modules/duplicator/). \ No newline at end of file diff --git a/pkg/modules/duplicator/README.md b/pkg/modules/duplicator/README.md deleted file mode 100644 index ddbe956..0000000 --- a/pkg/modules/duplicator/README.md +++ /dev/null @@ -1,58 +0,0 @@ -# Duplicator - -Duplicator module repeats signal from one of inputs to all outputs - -## Usage - -Usage of duplicator module is described by the [example](/examples/duplicator/). - -To import module in your code write following line: - -```go -import "github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator" -``` - -Duplicator module implements interface `Module`. So you can use it like any other module. For example: - -```go -// create duplicator module with 2 inputs and 2 outputs. Any signal from one of 2 output will be passed to 2 outputs. -duplicatorModule, err := duplicator.NewModule(2, 2) -if err != nil { - log.Panic(err) -} -// start duplicator module -duplicatorModule.Start(ctx) - -// your code is here - -// close duplicator module -if err := duplicatorModule.Close(); err != nil { - log.Panic(err) -} -``` - -## Input - -Module reply to its outputs all signals from inputs. You should connect it with next code: - -```go -// with helper function. You should pass a input number to function GetInputName for receiving input name - -if err := modules.Connect(customModule, dup, "your_output_of_module", duplicator.GetInputName(0)); err != nil { - log.Panic(err) -} - -``` - -## Output - -You should connect it with next code: - -```go -// with helper function. You should pass a output number to function GetOutputName for receiving output name - -if err := modules.Connect(dup, customModule, duplicator.GetOutputName(0), "your_output_of_module"); err != nil { - log.Panic(err) -} - -``` diff --git a/pkg/modules/duplicator/functions.go b/pkg/modules/duplicator/functions.go deleted file mode 100644 index c9a1526..0000000 --- a/pkg/modules/duplicator/functions.go +++ /dev/null @@ -1,13 +0,0 @@ -package duplicator - -import "fmt" - -// GetInputName - -func GetInputName(idx int) string { - return fmt.Sprintf("%s_%d", InputName, idx) -} - -// GetOutputName - -func GetOutputName(idx int) string { - return fmt.Sprintf("%s_%d", OutputName, idx) -} diff --git a/pkg/modules/duplicator/module.go b/pkg/modules/duplicator/module.go deleted file mode 100644 index 3b88d87..0000000 --- a/pkg/modules/duplicator/module.go +++ /dev/null @@ -1,127 +0,0 @@ -package duplicator - -import ( - "context" - "reflect" - "sync" - - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" -) - -// names -const ( - InputName = "input" - OutputName = "output" -) - -// Duplicator - the structure which is responsible for duplicate signal from one of inputs to all outputs -type Duplicator struct { - inputs map[string]*modules.Input - outputs map[string]*modules.Output - - wg *sync.WaitGroup -} - -// NewDuplicator - constructor of Duplicator structure -func NewDuplicator(inputsCount, outputsCount int) *Duplicator { - d := &Duplicator{ - inputs: make(map[string]*modules.Input), - outputs: make(map[string]*modules.Output), - - wg: new(sync.WaitGroup), - } - - for i := 0; i < inputsCount; i++ { - name := GetInputName(i) - d.inputs[name] = modules.NewInput(name) - } - - for i := 0; i < outputsCount; i++ { - name := GetOutputName(i) - d.outputs[name] = modules.NewOutput(name) - } - - return d -} - -// Name - -func (duplicator *Duplicator) Name() string { - return "duplicator" -} - -// Input - returns input by name -func (duplicator *Duplicator) Input(name string) (*modules.Input, error) { - input, ok := duplicator.inputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return input, nil -} - -// Output - returns output by name -func (duplicator *Duplicator) Output(name string) (*modules.Output, error) { - output, ok := duplicator.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return output, nil -} - -// AttachTo - attach input to output with name -func (duplicator *Duplicator) AttachTo(name string, input *modules.Input) error { - output, err := duplicator.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - -// Close - gracefully stops module -func (duplicator *Duplicator) Close() error { - duplicator.wg.Wait() - - for _, i := range duplicator.inputs { - if err := i.Close(); err != nil { - return err - } - } - - return nil -} - -// Start - starts module -func (duplicator *Duplicator) Start(ctx context.Context) { - duplicator.wg.Add(1) - go duplicator.listen(ctx) -} - -func (duplicator *Duplicator) listen(ctx context.Context) { - defer duplicator.wg.Done() - - cases := []reflect.SelectCase{ - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, - } - for _, input := range duplicator.inputs { - cases = append(cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(input.Listen()), - }, - ) - } - - for { - _, value, ok := reflect.Select(cases) - if !ok { - return - } - for _, output := range duplicator.outputs { - output.Push(value.Interface()) - } - } -} From 3e93559583ecc374916058366f2347288093ceb4 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 12:32:47 +0300 Subject: [PATCH 14/18] Update grpc module with new AttachTo logic. --- examples/grpc/client.go | 12 +++++++++--- examples/grpc/custom.go | 15 +++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/examples/grpc/client.go b/examples/grpc/client.go index bd6bbaf..b579f78 100644 --- a/examples/grpc/client.go +++ b/examples/grpc/client.go @@ -113,12 +113,18 @@ func (client *Client) Output(name string) (*modules.Output, error) { } // AttachTo - -func (client *Client) AttachTo(name string, input *modules.Input) error { - output, err := client.Output(name) +func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error { + outputChannel, err := outputModule.Output(outputName) if err != nil { return err } - output.Attach(input) + + input, err := client.Input(inputName) + if err != nil { + return err + } + + outputChannel.Attach(input) return nil } diff --git a/examples/grpc/custom.go b/examples/grpc/custom.go index 190dd18..1f0431d 100644 --- a/examples/grpc/custom.go +++ b/examples/grpc/custom.go @@ -45,8 +45,19 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) { } // AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) +func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error { + outputChannel, err := outputModule.Output(outputName) + if err != nil { + return err + } + + input, err := m.Input(inputName) + if err != nil { + return err + } + + outputChannel.Attach(input) + return nil } func (m *CustomModule) listen(ctx context.Context) { From 905ad6ffea786b2bf1c3e1195cfaab7323f7d092 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Tue, 12 Sep 2023 12:38:58 +0300 Subject: [PATCH 15/18] Updated zipper with new AttachTo logic. --- examples/grpc/client.go | 2 ++ examples/zipper/custom.go | 12 +++++++++--- examples/zipper/main.go | 8 +++++--- pkg/modules/zipper/module.go | 12 +++++++++--- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/examples/grpc/client.go b/examples/grpc/client.go index b579f78..465ee3d 100644 --- a/examples/grpc/client.go +++ b/examples/grpc/client.go @@ -23,6 +23,8 @@ type Client struct { wg *sync.WaitGroup } +var _ modules.Module = &Client{} + // NewClient - func NewClient(server string) *Client { return &Client{ diff --git a/examples/zipper/custom.go b/examples/zipper/custom.go index fb607b9..e614e5c 100644 --- a/examples/zipper/custom.go +++ b/examples/zipper/custom.go @@ -43,12 +43,18 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) { } // AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - output, err := m.Output(name) +func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error { + outputChannel, err := outputModule.Output(outputName) if err != nil { return err } - output.Attach(input) + + input, err := m.Input(inputName) + if err != nil { + return err + } + + outputChannel.Attach(input) return nil } diff --git a/examples/zipper/main.go b/examples/zipper/main.go index 0622105..9d44f24 100644 --- a/examples/zipper/main.go +++ b/examples/zipper/main.go @@ -15,17 +15,19 @@ func main() { first := NewCustomModule(10, -1, "first") second := NewCustomModule(0, 1, "second") - if err := modules.Connect(first, zip, "output", zipper.FirstInputName); err != nil { + if err := modules.Connect(first, zip, zipper.OutputName, zipper.FirstInputName); err != nil { log.Panic(err) } - if err := modules.Connect(second, zip, "output", zipper.SecondInputName); err != nil { + if err := modules.Connect(second, zip, zipper.OutputName, zipper.SecondInputName); err != nil { log.Panic(err) } fakeInput := modules.NewInput("fake") - if err := zip.AttachTo(zipper.OutputName, fakeInput); err != nil { + zipOutput, err := zip.Output(zipper.OutputName) + if err != nil { log.Panic(err) } + zipOutput.Attach(fakeInput) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() diff --git a/pkg/modules/zipper/module.go b/pkg/modules/zipper/module.go index ade085c..8916516 100644 --- a/pkg/modules/zipper/module.go +++ b/pkg/modules/zipper/module.go @@ -79,12 +79,18 @@ func (m *Module[Key]) Output(name string) (*modules.Output, error) { } // AttachTo - attach input to output with name -func (m *Module[Key]) AttachTo(name string, input *modules.Input) error { - output, err := m.Output(name) +func (m *Module[Key]) AttachTo(outputModule modules.Module, outputName, inputName string) error { + outputChannel, err := outputModule.Output(outputName) if err != nil { return err } - output.Attach(input) + + input, err := m.Input(inputName) + if err != nil { + return err + } + + outputChannel.Attach(input) return nil } From 6f9c9eb9a0fba29a615e5381565f003da1baebc4 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Wed, 13 Sep 2023 00:01:44 +0300 Subject: [PATCH 16/18] Refactor BaseModule with constructor New. --- pkg/modules/baseModule.go | 18 +++++++++++------- pkg/modules/baseModule_test.go | 15 +++++---------- pkg/modules/cron/module.go | 3 +-- pkg/modules/printer/printer.go | 3 +-- pkg/modules/stopper/stopper.go | 3 +-- pkg/modules/stopper/stopper_test.go | 5 ++--- 6 files changed, 21 insertions(+), 26 deletions(-) diff --git a/pkg/modules/baseModule.go b/pkg/modules/baseModule.go index 423aaca..a856749 100644 --- a/pkg/modules/baseModule.go +++ b/pkg/modules/baseModule.go @@ -9,7 +9,7 @@ import ( "github.com/rs/zerolog/log" ) -var _ Module = &BaseModule{} +var _ Module = (*BaseModule)(nil) type BaseModule struct { name string @@ -19,12 +19,16 @@ type BaseModule struct { G workerpool.Group } -func (m *BaseModule) Init(name string) { - m.name = name - m.inputs = sync.NewMap[string, *Input]() - m.outputs = sync.NewMap[string, *Output]() - m.Log = log.With().Str("module", name).Logger() - m.G = workerpool.NewGroup() +func New(name string) BaseModule { + m := BaseModule{ + name: name, + inputs: sync.NewMap[string, *Input](), + outputs: sync.NewMap[string, *Output](), + Log: log.With().Str("module", name).Logger(), + G: workerpool.NewGroup(), + } + + return m } func (m *BaseModule) Name() string { diff --git a/pkg/modules/baseModule_test.go b/pkg/modules/baseModule_test.go index a8dfee6..701389d 100644 --- a/pkg/modules/baseModule_test.go +++ b/pkg/modules/baseModule_test.go @@ -7,8 +7,7 @@ import ( ) func TestBaseModule_ExistingInput(t *testing.T) { - bm := &BaseModule{} - bm.Init("module") + bm := New("module") existingChannelName := "input-channel" bm.CreateInput(existingChannelName) @@ -19,8 +18,7 @@ func TestBaseModule_ExistingInput(t *testing.T) { } func TestBaseModule_NonExistingInput(t *testing.T) { - bm := &BaseModule{} - bm.Init("module") + bm := New("module") nonExistingChannelName := "non-existing-input-channel" // Act @@ -31,8 +29,7 @@ func TestBaseModule_NonExistingInput(t *testing.T) { } func TestBaseModule_ExistingOutput(t *testing.T) { - bm := &BaseModule{} - bm.Init("module") + bm := New("module") existingChannelName := "output-channel" bm.CreateOutput(existingChannelName) @@ -43,8 +40,7 @@ func TestBaseModule_ExistingOutput(t *testing.T) { } func TestBaseModule_NonExistingOutput(t *testing.T) { - bm := &BaseModule{} - bm.Init("module") + bm := New("module") nonExistingChannelName := "non-existing-output-channel" // Act @@ -85,8 +81,7 @@ func TestBaseModule_AttachToOnExistingChannel(t *testing.T) { } func TestBaseModule_ReturnsCorrectName(t *testing.T) { - bm := &BaseModule{} - bm.Init("module") + bm := New("module") name := bm.Name() assert.Equal(t, "module", name) diff --git a/pkg/modules/cron/module.go b/pkg/modules/cron/module.go index 6b4bee3..170daa7 100644 --- a/pkg/modules/cron/module.go +++ b/pkg/modules/cron/module.go @@ -18,6 +18,7 @@ var _ modules.Module = &Module{} // NewModule - creates cron module func NewModule(cfg *Config) (*Module, error) { module := &Module{ + BaseModule: modules.New("cron"), cron: cron.New( cron.WithParser(cron.NewParser( cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, @@ -26,8 +27,6 @@ func NewModule(cfg *Config) (*Module, error) { ), } - module.Init("cron") - for job, pattern := range cfg.Jobs { module.CreateOutput(job) diff --git a/pkg/modules/printer/printer.go b/pkg/modules/printer/printer.go index b15a82f..9901834 100644 --- a/pkg/modules/printer/printer.go +++ b/pkg/modules/printer/printer.go @@ -18,8 +18,7 @@ var _ modules.Module = &Printer{} // NewPrinter - constructor of printer structure func NewPrinter() Printer { - p := Printer{} - p.Init("printer") + p := Printer{BaseModule: modules.New("printer")} p.CreateInput(InputName) return p } diff --git a/pkg/modules/stopper/stopper.go b/pkg/modules/stopper/stopper.go index d6c4801..8b18378 100644 --- a/pkg/modules/stopper/stopper.go +++ b/pkg/modules/stopper/stopper.go @@ -26,10 +26,9 @@ const ( func NewModule(cancelFunc context.CancelFunc) Module { m := Module{ - BaseModule: modules.BaseModule{}, + BaseModule: modules.New("stopper"), stop: cancelFunc, } - m.Init("stopper") m.CreateInput(InputName) return m diff --git a/pkg/modules/stopper/stopper_test.go b/pkg/modules/stopper/stopper_test.go index 4781365..ad9c39d 100644 --- a/pkg/modules/stopper/stopper_test.go +++ b/pkg/modules/stopper/stopper_test.go @@ -43,11 +43,10 @@ func TestStopper_CallsStopFromAnotherModule(t *testing.T) { defer cancel() stopperModule := NewModule(stopFunc) - workerModule := &modules.BaseModule{} - workerModule.Init("worker") + workerModule := modules.New("module") workerModule.CreateOutput("stop") - err := stopperModule.AttachTo(workerModule, "stop", InputName) + err := stopperModule.AttachTo(&workerModule, "stop", InputName) assert.NoError(t, err) stopperModule.Start(ctx) From 38b291353fde5d740171e36b8767139c65f87e7d Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Wed, 13 Sep 2023 00:05:12 +0300 Subject: [PATCH 17/18] Fixed memory allocation on type check for modules. --- examples/cron/custom.go | 2 +- examples/grpc/client.go | 2 +- pkg/modules/cron/module.go | 2 +- pkg/modules/printer/printer.go | 2 +- pkg/modules/stopper/stopper.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/cron/custom.go b/examples/cron/custom.go index 399707c..645ff5b 100644 --- a/examples/cron/custom.go +++ b/examples/cron/custom.go @@ -9,7 +9,7 @@ import ( "github.com/rs/zerolog/log" ) -var _ modules.Module = &CustomModule{} +var _ modules.Module = (*CustomModule)(nil) // CustomModule - type CustomModule struct { diff --git a/examples/grpc/client.go b/examples/grpc/client.go index 465ee3d..15f1df5 100644 --- a/examples/grpc/client.go +++ b/examples/grpc/client.go @@ -23,7 +23,7 @@ type Client struct { wg *sync.WaitGroup } -var _ modules.Module = &Client{} +var _ modules.Module = (*Client)(nil) // NewClient - func NewClient(server string) *Client { diff --git a/pkg/modules/cron/module.go b/pkg/modules/cron/module.go index 170daa7..21a5193 100644 --- a/pkg/modules/cron/module.go +++ b/pkg/modules/cron/module.go @@ -13,7 +13,7 @@ type Module struct { modules.BaseModule } -var _ modules.Module = &Module{} +var _ modules.Module = (*Module)(nil) // NewModule - creates cron module func NewModule(cfg *Config) (*Module, error) { diff --git a/pkg/modules/printer/printer.go b/pkg/modules/printer/printer.go index 9901834..d3b472f 100644 --- a/pkg/modules/printer/printer.go +++ b/pkg/modules/printer/printer.go @@ -14,7 +14,7 @@ type Printer struct { const InputName = "input" -var _ modules.Module = &Printer{} +var _ modules.Module = (*Printer)(nil) // NewPrinter - constructor of printer structure func NewPrinter() Printer { diff --git a/pkg/modules/stopper/stopper.go b/pkg/modules/stopper/stopper.go index 8b18378..5fde3eb 100644 --- a/pkg/modules/stopper/stopper.go +++ b/pkg/modules/stopper/stopper.go @@ -18,7 +18,7 @@ type Module struct { stop context.CancelFunc } -var _ modules.Module = &Module{} +var _ modules.Module = (*Module)(nil) const ( InputName = "signal" From 48b21289c8f86d613253518a00c8827cf7924f75 Mon Sep 17 00:00:00 2001 From: Lavysh Alexander Date: Wed, 13 Sep 2023 21:18:25 +0300 Subject: [PATCH 18/18] Added run configurations. --- .run/examples_cron.run.xml | 21 +++++++++++++++++++++ .run/examples_grpc.run.xml | 21 +++++++++++++++++++++ .run/examples_zipper.run.xml | 21 +++++++++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 .run/examples_cron.run.xml create mode 100644 .run/examples_grpc.run.xml create mode 100644 .run/examples_zipper.run.xml diff --git a/.run/examples_cron.run.xml b/.run/examples_cron.run.xml new file mode 100644 index 0000000..a3faea3 --- /dev/null +++ b/.run/examples_cron.run.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.run/examples_grpc.run.xml b/.run/examples_grpc.run.xml new file mode 100644 index 0000000..ec832c0 --- /dev/null +++ b/.run/examples_grpc.run.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.run/examples_zipper.run.xml b/.run/examples_zipper.run.xml new file mode 100644 index 0000000..dc88a50 --- /dev/null +++ b/.run/examples_zipper.run.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + \ No newline at end of file