Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds BaseModule #7

Merged
merged 18 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions examples/cron/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/rs/zerolog/log"
)

var _ modules.Module = (*CustomModule)(nil)

// CustomModule -
type CustomModule struct {
everySecond *modules.Input
Expand Down Expand Up @@ -50,8 +52,22 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
func (m *CustomModule) AttachTo(outputM modules.Module, outputName, inputName string) error {
output, err := outputM.Output(outputName)
if err != nil {
return nil
}

switch inputName {
case "every_second":
output.Attach(m.everySecond)
return nil
case "every_five_second":
output.Attach(m.everyFiveSecond)
return nil
default:
return errors.Wrap(modules.ErrUnknownInput, inputName)
}
}

func (m *CustomModule) listen(ctx context.Context) {
Expand Down
68 changes: 0 additions & 68 deletions examples/duplicator/main.go

This file was deleted.

14 changes: 11 additions & 3 deletions examples/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Client struct {
wg *sync.WaitGroup
}

var _ modules.Module = (*Client)(nil)

// NewClient -
func NewClient(server string) *Client {
return &Client{
Expand Down Expand Up @@ -113,12 +115,18 @@ func (client *Client) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
output, err := client.Output(name)
func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := client.Input(inputName)
if err != nil {
return err
}
output.Attach(input)

outputChannel.Attach(input)
return nil
}

Expand Down
15 changes: 13 additions & 2 deletions examples/grpc/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,19 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}

func (m *CustomModule) listen(ctx context.Context) {
Expand Down
12 changes: 9 additions & 3 deletions examples/zipper/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
output, err := m.Output(name)
func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}
output.Attach(input)

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions examples/zipper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ func main() {
first := NewCustomModule(10, -1, "first")
second := NewCustomModule(0, 1, "second")

if err := modules.Connect(first, zip, "output", zipper.FirstInputName); err != nil {
if err := modules.Connect(first, zip, zipper.OutputName, zipper.FirstInputName); err != nil {
log.Panic(err)
}
if err := modules.Connect(second, zip, "output", zipper.SecondInputName); err != nil {
if err := modules.Connect(second, zip, zipper.OutputName, zipper.SecondInputName); err != nil {
log.Panic(err)
}

fakeInput := modules.NewInput("fake")
if err := zip.AttachTo(zipper.OutputName, fakeInput); err != nil {
zipOutput, err := zip.Output(zipper.OutputName)
if err != nil {
log.Panic(err)
}
zipOutput.Attach(fakeInput)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down
4 changes: 0 additions & 4 deletions pkg/modules/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,3 @@ gRPC module where realized default client and server. Detailed docs can be found
### Cron

Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/).

### Duplicator

Duplicator module repeats signal from one of inputs to all outputs. Detailed docs can be found [here](/pkg/modules/duplicator/).
81 changes: 81 additions & 0 deletions pkg/modules/baseModule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package modules

import (
"context"
"github.com/dipdup-io/workerpool"
"github.com/dipdup-net/indexer-sdk/pkg/sync"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var _ Module = (*BaseModule)(nil)

type BaseModule struct {
name string
inputs *sync.Map[string, *Input]
outputs *sync.Map[string, *Output]
Log zerolog.Logger
G workerpool.Group
}

func New(name string) BaseModule {
m := BaseModule{
name: name,
inputs: sync.NewMap[string, *Input](),
outputs: sync.NewMap[string, *Output](),
Log: log.With().Str("module", name).Logger(),
G: workerpool.NewGroup(),
}

return m
}

func (m *BaseModule) Name() string {
return m.name
}

func (*BaseModule) Start(_ context.Context) {}

func (*BaseModule) Close() error {
return nil
}

func (m *BaseModule) Input(name string) (*Input, error) {
input, ok := m.inputs.Get(name)
if !ok {
return nil, errors.Wrap(ErrUnknownInput, name)
}
return input, nil
}

func (m *BaseModule) CreateInput(name string) {
m.inputs.Set(name, NewInput(name))
}

func (m *BaseModule) Output(name string) (*Output, error) {
output, ok := m.outputs.Get(name)
if !ok {
return nil, errors.Wrap(ErrUnknownOutput, name)
}
return output, nil
}

func (m *BaseModule) CreateOutput(name string) {
m.outputs.Set(name, NewOutput(name))
}

func (m *BaseModule) AttachTo(outputModule Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}
88 changes: 88 additions & 0 deletions pkg/modules/baseModule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package modules

import (
"github.com/dipdup-net/indexer-sdk/pkg/sync"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBaseModule_ExistingInput(t *testing.T) {
bm := New("module")
existingChannelName := "input-channel"
bm.CreateInput(existingChannelName)

// Act
input, err := bm.Input(existingChannelName)
assert.NoError(t, err)
assert.Equal(t, existingChannelName, input.Name())
}

func TestBaseModule_NonExistingInput(t *testing.T) {
bm := New("module")
nonExistingChannelName := "non-existing-input-channel"

// Act
input, err := bm.Input(nonExistingChannelName)
assert.ErrorIs(t, err, ErrUnknownInput)
assert.Errorf(t, err, "%s: %s", ErrUnknownInput.Error(), nonExistingChannelName)
assert.Nil(t, input)
}

func TestBaseModule_ExistingOutput(t *testing.T) {
bm := New("module")
existingChannelName := "output-channel"
bm.CreateOutput(existingChannelName)

// Act
output, err := bm.Output(existingChannelName)
assert.NoError(t, err)
assert.Equal(t, existingChannelName, output.Name())
}

func TestBaseModule_NonExistingOutput(t *testing.T) {
bm := New("module")
nonExistingChannelName := "non-existing-output-channel"

// Act
output, err := bm.Output(nonExistingChannelName)
assert.ErrorIs(t, err, ErrUnknownOutput)
assert.Errorf(t, err, "%s: %s", ErrUnknownOutput.Error(), nonExistingChannelName)
assert.Nil(t, output)
}

func TestBaseModule_AttachToOnExistingChannel(t *testing.T) {
bmSrc := &BaseModule{outputs: sync.NewMap[string, *Output]()}
bmDst := &BaseModule{inputs: sync.NewMap[string, *Input]()}
inputName := "data-in"
outputName := "data-out"

bmSrc.CreateOutput(outputName)
bmDst.CreateInput(inputName)

input, err := bmDst.Input(inputName)
assert.NoError(t, err)

err = bmDst.AttachTo(bmSrc, outputName, inputName)
assert.NoError(t, err)

output, err := bmSrc.Output(outputName)
assert.NoError(t, err)

output.Push("hello")

msg := <-input.Listen()
assert.Equal(t, "hello", msg)

err = bmSrc.Close()
assert.NoError(t, err)

err = bmDst.Close()
assert.NoError(t, err)
}

func TestBaseModule_ReturnsCorrectName(t *testing.T) {
bm := New("module")

name := bm.Name()
assert.Equal(t, "module", name)
}
4 changes: 2 additions & 2 deletions pkg/modules/cron/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ So if your module should execute some work on `every_second` scheduled events fr
```go
// with helper function

if err := modules.Connect(cronModule, customModule, "every_second", "every_second"); err != nil {
if err := modules.Connect(cronModule, customModule, "every_second", "custom_module_every_second_input"); err != nil {
log.Panic(err)
}

// or directly to module

if err := cronModule.AttachTo("every_second", customModule.everySecond); err != nil {
if err := customModule.AttachTo(cronModule, "every_second", "custom_module_every_second_input"); err != nil {
log.Panic(err)
}
```
Expand Down
Loading
Loading