From 1385889f0fe8d602b5fd0e3ed2a0b374003b85a6 Mon Sep 17 00:00:00 2001 From: Artem Date: Sat, 2 Sep 2023 21:26:59 +0400 Subject: [PATCH] Refactoring: typing input and outputs --- examples/cron/custom.go | 47 ++-------- examples/cron/main.go | 5 +- examples/duplicator/main.go | 68 --------------- examples/grpc/client.go | 54 +++--------- examples/grpc/custom.go | 42 +++------ examples/grpc/main.go | 6 +- examples/grpc/server.go | 11 +-- examples/zipper/custom.go | 46 +++------- examples/zipper/main.go | 32 +++---- examples/zipper/printer.go | 63 ++++++++++++++ go.mod | 2 +- go.sum | 2 + pkg/modules/README.md | 40 ++++----- pkg/modules/cron/README.md | 45 +++++----- pkg/modules/cron/module.go | 40 ++------- pkg/modules/cron/register_test.go | 13 +++ pkg/modules/duplicator/README.md | 58 ------------- pkg/modules/duplicator/functions.go | 13 --- pkg/modules/duplicator/module.go | 127 ---------------------------- pkg/modules/grpc/README.md | 34 ++------ pkg/modules/grpc/client.go | 20 ++--- pkg/modules/input.go | 36 -------- pkg/modules/module.go | 5 -- pkg/modules/output.go | 54 +++++++----- pkg/modules/printer/module.go | 77 +++++------------ pkg/modules/reflect.go | 64 ++++++++++++++ pkg/modules/reflect_test.go | 92 ++++++++++++++++++++ pkg/modules/registry.go | 33 ++++++++ pkg/modules/workflow.go | 42 ++++----- pkg/modules/zipper/README.md | 21 +++-- pkg/modules/zipper/data.go | 10 +-- pkg/modules/zipper/module.go | 93 ++++++-------------- pkg/modules/zipper/register_test.go | 13 +++ 33 files changed, 524 insertions(+), 784 deletions(-) delete mode 100644 examples/duplicator/main.go create mode 100644 examples/zipper/printer.go create mode 100644 pkg/modules/cron/register_test.go delete mode 100644 pkg/modules/duplicator/README.md delete mode 100644 pkg/modules/duplicator/functions.go delete mode 100644 pkg/modules/duplicator/module.go delete mode 100644 pkg/modules/input.go create mode 100644 pkg/modules/reflect.go create mode 100644 pkg/modules/reflect_test.go create mode 100644 pkg/modules/registry.go create mode 100644 pkg/modules/zipper/register_test.go diff --git a/examples/cron/custom.go b/examples/cron/custom.go index 12844c6..03e2f40 100644 --- a/examples/cron/custom.go +++ b/examples/cron/custom.go @@ -4,15 +4,13 @@ import ( "context" "sync" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" + "github.com/dipdup-net/indexer-sdk/pkg/modules/cron" "github.com/rs/zerolog/log" ) // CustomModule - type CustomModule struct { - everySecond *modules.Input - everyFiveSecond *modules.Input + Messages chan cron.Message wg *sync.WaitGroup } @@ -20,9 +18,8 @@ type CustomModule struct { // NewCustomModule - func NewCustomModule() *CustomModule { return &CustomModule{ - everySecond: modules.NewInput("every_second"), - everyFiveSecond: modules.NewInput("every_five_second"), - wg: new(sync.WaitGroup), + Messages: make(chan cron.Message, 16), + wg: new(sync.WaitGroup), } } @@ -32,28 +29,6 @@ func (m *CustomModule) Start(ctx context.Context) { go m.listen(ctx) } -// Input - -func (m *CustomModule) Input(name string) (*modules.Input, error) { - switch name { - case "every_second": - return m.everySecond, nil - case "every_five_second": - return m.everyFiveSecond, nil - default: - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } -} - -// Output - -func (m *CustomModule) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) -} - func (m *CustomModule) listen(ctx context.Context) { defer m.wg.Done() @@ -61,10 +36,8 @@ func (m *CustomModule) listen(ctx context.Context) { select { case <-ctx.Done(): return - case <-m.everySecond.Listen(): - log.Info().Msg("arrived from cron module") - case <-m.everyFiveSecond.Listen(): - log.Info().Msg("arrived from cron module") + case msg := <-m.Messages: + log.Info().Str("job", msg.Job).Msg("arrived from cron module") } } } @@ -73,13 +46,7 @@ func (m *CustomModule) listen(ctx context.Context) { func (m *CustomModule) Close() error { m.wg.Wait() - if err := m.everyFiveSecond.Close(); err != nil { - return err - } - if err := m.everySecond.Close(); err != nil { - return err - } - + close(m.Messages) return nil } diff --git a/examples/cron/main.go b/examples/cron/main.go index 863ec5e..b53a22e 100644 --- a/examples/cron/main.go +++ b/examples/cron/main.go @@ -25,10 +25,11 @@ func main() { } customModule := NewCustomModule() - if err := modules.Connect(cronModule, customModule, "every_second", "every_second"); err != nil { + if err := modules.Register(cronModule, customModule); err != nil { log.Panic(err) } - if err := modules.Connect(cronModule, customModule, "every_five_second", "every_five_second"); err != nil { + + if err := modules.Connect(cronModule.Name(), customModule.Name(), "Output", "Messages"); err != nil { log.Panic(err) } diff --git a/examples/duplicator/main.go b/examples/duplicator/main.go deleted file mode 100644 index cfde062..0000000 --- a/examples/duplicator/main.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/dipdup-net/indexer-sdk/pkg/modules/cron" - "github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator" - "github.com/dipdup-net/indexer-sdk/pkg/modules/printer" -) - -func main() { - cronModule1, err := cron.NewModule(&cron.Config{ - Jobs: map[string]string{ - "ticker": "@every 5s", - }, - }) - if err != nil { - log.Panic(err) - } - cronModule2, err := cron.NewModule(&cron.Config{ - Jobs: map[string]string{ - "ticker": "* * * * * *", - }, - }) - if err != nil { - log.Panic(err) - } - - dup := duplicator.NewDuplicator(2, 1) - - print := printer.NewPrinter() - - if err := modules.Connect(cronModule1, dup, "ticker", duplicator.GetInputName(0)); err != nil { - log.Panic(err) - } - if err := modules.Connect(cronModule2, dup, "ticker", duplicator.GetInputName(1)); err != nil { - log.Panic(err) - } - if err := modules.Connect(dup, print, duplicator.GetOutputName(0), printer.InputName); err != nil { - log.Panic(err) - } - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - print.Start(ctx) - dup.Start(ctx) - cronModule2.Start(ctx) - cronModule1.Start(ctx) - - <-ctx.Done() - cancel() - - if err := cronModule1.Close(); err != nil { - log.Print(err) - } - if err := cronModule2.Close(); err != nil { - log.Print(err) - } - if err := dup.Close(); err != nil { - log.Print(err) - } - if err := print.Close(); err != nil { - log.Print(err) - } -} diff --git a/examples/grpc/client.go b/examples/grpc/client.go index bd6bbaf..0664379 100644 --- a/examples/grpc/client.go +++ b/examples/grpc/client.go @@ -2,13 +2,12 @@ package main import ( "context" - "sync" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/examples/grpc/pb" "github.com/dipdup-net/indexer-sdk/pkg/modules" "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb" - "github.com/pkg/errors" "github.com/rs/zerolog/log" ) @@ -16,19 +15,19 @@ import ( type Client struct { *grpc.Client - output *modules.Output - stream *grpc.Stream[*pb.Response] + Output *modules.Output[string] + stream *grpc.Stream[pb.Response] client pb.TimeServiceClient - wg *sync.WaitGroup + g workerpool.Group } // NewClient - func NewClient(server string) *Client { return &Client{ Client: grpc.NewClient(server), - output: modules.NewOutput("time"), - wg: new(sync.WaitGroup), + Output: modules.NewOutput[string](), + g: workerpool.NewGroup(), } } @@ -36,8 +35,7 @@ func NewClient(server string) *Client { func (client *Client) Start(ctx context.Context) { client.client = pb.NewTimeServiceClient(client.Connection()) - client.wg.Add(1) - go client.reconnect(ctx) + client.g.GoCtx(ctx, client.reconnect) } // SubscribeOnTime - @@ -46,17 +44,13 @@ func (client *Client) SubscribeOnTime(ctx context.Context) (uint64, error) { if err != nil { return 0, err } - client.stream = grpc.NewStream[*pb.Response](stream) - - client.wg.Add(1) - go client.handleTime(ctx) + client.stream = grpc.NewStream[pb.Response](stream) + client.g.GoCtx(ctx, client.handleTime) return client.stream.Subscribe(ctx) } func (client *Client) reconnect(ctx context.Context) { - defer client.wg.Done() - for { select { case <-ctx.Done(): @@ -77,14 +71,13 @@ func (client *Client) reconnect(ctx context.Context) { } func (client *Client) handleTime(ctx context.Context) { - defer client.wg.Done() - for { select { case <-ctx.Done(): return + case msg := <-client.stream.Listen(): - client.output.Push(msg) + client.Output.Push(msg.Time) } } } @@ -99,32 +92,9 @@ func (client *Client) UnsubscribeFromTime(ctx context.Context, id uint64) error return nil } -// Input - -func (client *Client) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (client *Client) Output(name string) (*modules.Output, error) { - if name != "time" { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return client.output, nil -} - -// AttachTo - -func (client *Client) AttachTo(name string, input *modules.Input) error { - output, err := client.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - // Close - func (client *Client) Close() error { - client.wg.Wait() + client.g.Wait() if client.stream != nil { if err := client.stream.Close(); err != nil { diff --git a/examples/grpc/custom.go b/examples/grpc/custom.go index 190dd18..4bf0571 100644 --- a/examples/grpc/custom.go +++ b/examples/grpc/custom.go @@ -3,60 +3,37 @@ package main import ( "context" "encoding/json" - "sync" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" + "github.com/dipdup-io/workerpool" "github.com/rs/zerolog/log" ) // CustomModule - type CustomModule struct { - input *modules.Input + Input chan string - wg *sync.WaitGroup + g workerpool.Group } // NewCustomModule - func NewCustomModule() *CustomModule { return &CustomModule{ - input: modules.NewInput("input"), - wg: new(sync.WaitGroup), + Input: make(chan string), + g: workerpool.NewGroup(), } } // Start - func (m *CustomModule) Start(ctx context.Context) { - m.wg.Add(1) - go m.listen(ctx) -} - -// Input - -func (m *CustomModule) Input(name string) (*modules.Input, error) { - if name != "input" { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return m.input, nil -} - -// Output - -func (m *CustomModule) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) + m.g.GoCtx(ctx, m.listen) } func (m *CustomModule) listen(ctx context.Context) { - defer m.wg.Done() - for { select { case <-ctx.Done(): return - case msg := <-m.input.Listen(): + case msg := <-m.Input: b, _ := json.Marshal(msg) log.Info().Str("msg", string(b)).Msg("arrived from grpc module") } @@ -65,8 +42,9 @@ func (m *CustomModule) listen(ctx context.Context) { // Close - func (m *CustomModule) Close() error { - m.wg.Wait() - return m.input.Close() + m.g.Wait() + close(m.Input) + return nil } // Name - diff --git a/examples/grpc/main.go b/examples/grpc/main.go index 5b06cd3..a4d2c99 100644 --- a/examples/grpc/main.go +++ b/examples/grpc/main.go @@ -55,7 +55,11 @@ func main() { // creating custom module which receives notification from client and log it to console. module := NewCustomModule() - if err := modules.Connect(client, module, "time", "input"); err != nil { + if err := modules.Register(server, client, module); err != nil { + log.Panic().Err(err).Msg("register modules") + } + + if err := modules.Connect(client.Name(), module.Name(), "Output", "Input"); err != nil { log.Panic().Err(err).Msg("module connection error") return } diff --git a/examples/grpc/server.go b/examples/grpc/server.go index b3440f7..994f480 100644 --- a/examples/grpc/server.go +++ b/examples/grpc/server.go @@ -2,9 +2,9 @@ package main import ( "context" - "sync" "time" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/examples/grpc/pb" "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb" @@ -17,7 +17,7 @@ type Server struct { subscriptions *grpc.Subscriptions[time.Time, *pb.Response] - wg *sync.WaitGroup + g workerpool.Group } // NewServer - @@ -30,7 +30,7 @@ func NewServer(cfg *grpc.ServerConfig) (*Server, error) { return &Server{ Server: server, subscriptions: grpc.NewSubscriptions[time.Time, *pb.Response](), - wg: new(sync.WaitGroup), + g: workerpool.NewGroup(), }, nil } @@ -40,13 +40,10 @@ func (server *Server) Start(ctx context.Context) { server.Server.Start(ctx) - server.wg.Add(1) - go server.listen(ctx) + server.g.GoCtx(ctx, server.listen) } func (server *Server) listen(ctx context.Context) { - defer server.wg.Done() - ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() diff --git a/examples/zipper/custom.go b/examples/zipper/custom.go index fb607b9..0bb636b 100644 --- a/examples/zipper/custom.go +++ b/examples/zipper/custom.go @@ -2,59 +2,38 @@ package main import ( "context" - "sync" "time" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" + "github.com/dipdup-net/indexer-sdk/pkg/modules/zipper" ) // CustomModule - type CustomModule struct { + Output *modules.Output[zipper.Zippable[int]] + additional int startValue int name string - output *modules.Output - wg *sync.WaitGroup + g workerpool.Group } // NewCustomModule - func NewCustomModule(startValue, additional int, name string) *CustomModule { return &CustomModule{ + Output: modules.NewOutput[zipper.Zippable[int]](), + additional: additional, startValue: startValue, name: name, - output: modules.NewOutput("output"), - wg: new(sync.WaitGroup), - } -} - -// Input - -func (m *CustomModule) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (m *CustomModule) Output(name string) (*modules.Output, error) { - if name != "output" { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return m.output, nil -} - -// AttachTo - -func (m *CustomModule) AttachTo(name string, input *modules.Input) error { - output, err := m.Output(name) - if err != nil { - return err + g: workerpool.NewGroup(), } - output.Attach(input) - return nil } // Close - func (m *CustomModule) Close() error { - m.wg.Wait() + m.g.Wait() return nil } @@ -65,13 +44,10 @@ func (m *CustomModule) Name() string { // Start - func (m *CustomModule) Start(ctx context.Context) { - m.wg.Add(1) - go m.generate(ctx) + m.g.GoCtx(ctx, m.generate) } func (m *CustomModule) generate(ctx context.Context) { - defer m.wg.Done() - ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -82,7 +58,7 @@ func (m *CustomModule) generate(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - m.output.Push(data) + m.Output.Push(data) data.key += m.additional } } diff --git a/examples/zipper/main.go b/examples/zipper/main.go index 0622105..e62e05d 100644 --- a/examples/zipper/main.go +++ b/examples/zipper/main.go @@ -6,48 +6,38 @@ import ( "time" "github.com/dipdup-net/indexer-sdk/pkg/modules" + "github.com/dipdup-net/indexer-sdk/pkg/modules/printer" "github.com/dipdup-net/indexer-sdk/pkg/modules/zipper" ) func main() { zip := zipper.NewModule[int]() - first := NewCustomModule(10, -1, "first") second := NewCustomModule(0, 1, "second") + printerModule := NewPrinter() - if err := modules.Connect(first, zip, "output", zipper.FirstInputName); err != nil { + if err := modules.Register(zip, first, second, printerModule); err != nil { log.Panic(err) } - if err := modules.Connect(second, zip, "output", zipper.SecondInputName); err != nil { + + if err := modules.Connect("first", zip.Name(), "Output", zipper.FirstInputName); err != nil { log.Panic(err) } - - fakeInput := modules.NewInput("fake") - if err := zip.AttachTo(zipper.OutputName, fakeInput); err != nil { + if err := modules.Connect("second", zip.Name(), "Output", zipper.SecondInputName); err != nil { + log.Panic(err) + } + if err := modules.Connect(zip.Name(), printerModule.Name(), zipper.OutputName, printer.InputName); err != nil { log.Panic(err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() + printerModule.Start(ctx) zip.Start(ctx) second.Start(ctx) first.Start(ctx) - go func() { - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-fakeInput.Listen(): - if !ok { - return - } - log.Println(msg) - } - } - }() - <-ctx.Done() if err := first.Close(); err != nil { @@ -59,7 +49,7 @@ func main() { if err := zip.Close(); err != nil { log.Panic(err) } - if err := fakeInput.Close(); err != nil { + if err := printerModule.Close(); err != nil { log.Panic(err) } } diff --git a/examples/zipper/printer.go b/examples/zipper/printer.go new file mode 100644 index 0000000..b27fa47 --- /dev/null +++ b/examples/zipper/printer.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "fmt" + + "github.com/dipdup-io/workerpool" + "github.com/dipdup-net/indexer-sdk/pkg/modules/zipper" + "github.com/rs/zerolog/log" +) + +// predefined constants +const ( + ModuleName = "printer" + InputName = "Input" +) + +// Printer - the structure which is responsible for print received messages +type Printer struct { + Input chan *zipper.Result[int] + + g workerpool.Group +} + +// NewPrinter - constructor of printer structure +func NewPrinter() *Printer { + return &Printer{ + Input: make(chan *zipper.Result[int], 16), + g: workerpool.NewGroup(), + } +} + +// Name - +func (printer *Printer) Name() string { + return ModuleName +} + +// Close - gracefully stops module +func (printer *Printer) Close() error { + printer.g.Wait() + + close(printer.Input) + return nil +} + +// Start - starts module +func (printer *Printer) Start(ctx context.Context) { + printer.g.GoCtx(ctx, printer.listen) +} + +func (printer *Printer) listen(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-printer.Input: + if !ok { + return + } + log.Info().Str("obj_type", fmt.Sprintf("%T", msg)).Msgf("%##v", msg) + } + } +} diff --git a/go.mod b/go.mod index d4db251..51032f3 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,8 @@ module github.com/dipdup-net/indexer-sdk go 1.20 - require ( + github.com/dipdup-io/workerpool v0.0.4 github.com/dipdup-net/go-lib v0.3.0 github.com/ethereum/go-ethereum v1.12.0 github.com/go-testfixtures/testfixtures/v3 v3.9.0 diff --git a/go.sum b/go.sum index e1cd2b2..ad68b9d 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= +github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= +github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= github.com/dipdup-net/go-lib v0.3.0 h1:56OImCLIHyLL4Da7UI5H2xyTP65i2FUXSO6Gyxll460= github.com/dipdup-net/go-lib v0.3.0/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= diff --git a/pkg/modules/README.md b/pkg/modules/README.md index fb9fbd7..132f7bb 100644 --- a/pkg/modules/README.md +++ b/pkg/modules/README.md @@ -12,9 +12,6 @@ type Module interface { Name() string Start(ctx context.Context) - Input(name string) (*Input, error) - Output(name string) (*Output, error) - AttachTo(outputName string, input *Input) error } ``` @@ -23,34 +20,31 @@ Interface contains following methods: * `Start` - starts asynchronous waiting of messages in inputs and initialize module state. * `Close` - gracefully stops all module activities (inherited from `io.Closer` interface). * `Name` - returns name of module which will be used in workflow construction. - * `Input` - returns input by its name. - * `Output` - returns output by its name. - * `AttachTo` - connects output with name to passed input of another module. + +You have to register all usig modules with function `Register`. It applies modules array as arguments. + +```go +func Register(modules ...Module) error +``` + +Package contains global registry of module to manage connections between them. ## Inputs and outputs -All communication between modules is implemented via inputs/outputs. Input is the structure contains channel with `any` as data. It also has name which will be its identity in module's scope. +All communication between modules is implemented via inputs/outputs. Input is the public field type of channel. All such field will be registered as input. Following exemple contains 2 inputs: `Input1` and `Input2`. ```go -type Input struct { - data chan any - name string +type Module struct { + Input1 chan int + Input2 chan int } ``` -It has following methods: - -* `Close` - closes channel of input -* `Push` - sends message to channel -* `Listen` - waits new message -* `Name` - returns input name Output is the set of inputs which connected to it. When module send message to output it iterates over all connected inputs and pushes message to them. Output also has name which identifies it. ```go type Output struct { - connectedInputs []*Input - name string - + connectedInputs []chan T mx sync.RWMutex } ``` @@ -65,10 +59,10 @@ It has following methods: SDK has helper function `Connect`: ```go -func Connect(outputModule, inputModule Module, outputName, inputName string) error +func Connect(outputModuleName, inputModuleName, outputName, inputName string) error ``` -The function receives `outputModule` and `inputModule`: modules which will be connected. Also it receives input and output names in that modules which will be connected. +The function receives `inputModuleName` and `outputModuleName`: modules which will be connected. Also it receives input and output names in that modules which will be connected. ## Workflow @@ -97,7 +91,3 @@ gRPC module where realized default client and server. Detailed docs can be found ### Cron Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/). - -### Duplicator - -Duplicator module repeats signal from one of inputs to all outputs. Detailed docs can be found [here](/pkg/modules/duplicator/). \ No newline at end of file diff --git a/pkg/modules/cron/README.md b/pkg/modules/cron/README.md index 050aaef..1c77d46 100644 --- a/pkg/modules/cron/README.md +++ b/pkg/modules/cron/README.md @@ -20,6 +20,12 @@ cronModule, err := cron.NewModule(cfg.Cron) if err != nil { log.Panic(err) } + +// register module +if err := modules.Register(cronModule, customModule); err != nil { + log.Panic(err) +} + // start cron module cronModule.Start(ctx) @@ -46,35 +52,32 @@ cron: ## Output -Module sends to its outputs empty struct which notifies all connected modules about scheduled event. Each job of cron module has own output with names pointed in configuration file. So if your module should execute some work on `every_second` scheduled events from example you should connect it: +Module sends to its output struct `Message` which notifies all connected modules about scheduled event. Struct contains job name which initiated event. ```go -// with helper function - -if err := modules.Connect(cronModule, customModule, "every_second", "every_second"); err != nil { - log.Panic(err) +type Message struct { + Job string } +``` -// or directly to module +You can connect module with following code: -if err := cronModule.AttachTo("every_second", customModule.everySecond); err != nil { +```go +// with helper function +if err := modules.Connect(cronModule, customModule, "Output", "Messages"); err != nil { log.Panic(err) } ``` -Example of handling message from cron's outputs: +Example of handling message from cron's output: ```go -for { - select { - case <-ctx.Done(): - return - case <-m.everySecond.Listen(): - log.Info().Msg("arrived from cron module") - case <-m.everyFiveSecond.Listen(): - log.Info().Msg("arrived from cron module") - } -} -``` - -`everySecond` and `everyFiveSecond` are inputs of your modules. \ No newline at end of file + for { + select { + case <-ctx.Done(): + return + case msg := <-m.Messages: + log.Info().Str("job", msg.Job).Msg("arrived from cron module") + } + } +``` \ No newline at end of file diff --git a/pkg/modules/cron/module.go b/pkg/modules/cron/module.go index 64ff80e..a8026e0 100644 --- a/pkg/modules/cron/module.go +++ b/pkg/modules/cron/module.go @@ -4,15 +4,18 @@ import ( "context" "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" "github.com/robfig/cron/v3" ) +type Message struct { + Job string +} + // Module - cron module type Module struct { cron *cron.Cron - outputs map[string]*modules.Output + Output *modules.Output[Message] } // NewModule - creates cron module @@ -24,11 +27,9 @@ func NewModule(cfg *Config) (*Module, error) { )), // cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger)), ), - outputs: make(map[string]*modules.Output), + Output: modules.NewOutput[Message](), } for job, pattern := range cfg.Jobs { - module.outputs[job] = modules.NewOutput(job) - if _, err := module.cron.AddFunc( pattern, newHandler(module, job, pattern), @@ -56,33 +57,10 @@ func (module *Module) Close() error { return nil } -// Output - -func (module *Module) Output(name string) (*modules.Output, error) { - output, ok := module.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return output, nil -} - -// Input - -func (module *Module) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// AttachTo - -func (module *Module) AttachTo(name string, input *modules.Input) error { - output, err := module.Output(name) - if err != nil { - return err - } - - output.Attach(input) - return nil -} - func (module *Module) notify(job, pattern string) { - module.outputs[job].Push(struct{}{}) + module.Output.Push(Message{ + Job: job, + }) } func newHandler(module *Module, job, pattern string) func() { diff --git a/pkg/modules/cron/register_test.go b/pkg/modules/cron/register_test.go new file mode 100644 index 0000000..380f292 --- /dev/null +++ b/pkg/modules/cron/register_test.go @@ -0,0 +1,13 @@ +package cron + +import ( + "testing" + + "github.com/dipdup-net/indexer-sdk/pkg/modules" + "github.com/stretchr/testify/require" +) + +func TestRegisterModule(t *testing.T) { + err := modules.Register(&Module{}) + require.NoError(t, err) +} diff --git a/pkg/modules/duplicator/README.md b/pkg/modules/duplicator/README.md deleted file mode 100644 index ddbe956..0000000 --- a/pkg/modules/duplicator/README.md +++ /dev/null @@ -1,58 +0,0 @@ -# Duplicator - -Duplicator module repeats signal from one of inputs to all outputs - -## Usage - -Usage of duplicator module is described by the [example](/examples/duplicator/). - -To import module in your code write following line: - -```go -import "github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator" -``` - -Duplicator module implements interface `Module`. So you can use it like any other module. For example: - -```go -// create duplicator module with 2 inputs and 2 outputs. Any signal from one of 2 output will be passed to 2 outputs. -duplicatorModule, err := duplicator.NewModule(2, 2) -if err != nil { - log.Panic(err) -} -// start duplicator module -duplicatorModule.Start(ctx) - -// your code is here - -// close duplicator module -if err := duplicatorModule.Close(); err != nil { - log.Panic(err) -} -``` - -## Input - -Module reply to its outputs all signals from inputs. You should connect it with next code: - -```go -// with helper function. You should pass a input number to function GetInputName for receiving input name - -if err := modules.Connect(customModule, dup, "your_output_of_module", duplicator.GetInputName(0)); err != nil { - log.Panic(err) -} - -``` - -## Output - -You should connect it with next code: - -```go -// with helper function. You should pass a output number to function GetOutputName for receiving output name - -if err := modules.Connect(dup, customModule, duplicator.GetOutputName(0), "your_output_of_module"); err != nil { - log.Panic(err) -} - -``` diff --git a/pkg/modules/duplicator/functions.go b/pkg/modules/duplicator/functions.go deleted file mode 100644 index c9a1526..0000000 --- a/pkg/modules/duplicator/functions.go +++ /dev/null @@ -1,13 +0,0 @@ -package duplicator - -import "fmt" - -// GetInputName - -func GetInputName(idx int) string { - return fmt.Sprintf("%s_%d", InputName, idx) -} - -// GetOutputName - -func GetOutputName(idx int) string { - return fmt.Sprintf("%s_%d", OutputName, idx) -} diff --git a/pkg/modules/duplicator/module.go b/pkg/modules/duplicator/module.go deleted file mode 100644 index 3b88d87..0000000 --- a/pkg/modules/duplicator/module.go +++ /dev/null @@ -1,127 +0,0 @@ -package duplicator - -import ( - "context" - "reflect" - "sync" - - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" -) - -// names -const ( - InputName = "input" - OutputName = "output" -) - -// Duplicator - the structure which is responsible for duplicate signal from one of inputs to all outputs -type Duplicator struct { - inputs map[string]*modules.Input - outputs map[string]*modules.Output - - wg *sync.WaitGroup -} - -// NewDuplicator - constructor of Duplicator structure -func NewDuplicator(inputsCount, outputsCount int) *Duplicator { - d := &Duplicator{ - inputs: make(map[string]*modules.Input), - outputs: make(map[string]*modules.Output), - - wg: new(sync.WaitGroup), - } - - for i := 0; i < inputsCount; i++ { - name := GetInputName(i) - d.inputs[name] = modules.NewInput(name) - } - - for i := 0; i < outputsCount; i++ { - name := GetOutputName(i) - d.outputs[name] = modules.NewOutput(name) - } - - return d -} - -// Name - -func (duplicator *Duplicator) Name() string { - return "duplicator" -} - -// Input - returns input by name -func (duplicator *Duplicator) Input(name string) (*modules.Input, error) { - input, ok := duplicator.inputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return input, nil -} - -// Output - returns output by name -func (duplicator *Duplicator) Output(name string) (*modules.Output, error) { - output, ok := duplicator.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return output, nil -} - -// AttachTo - attach input to output with name -func (duplicator *Duplicator) AttachTo(name string, input *modules.Input) error { - output, err := duplicator.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - -// Close - gracefully stops module -func (duplicator *Duplicator) Close() error { - duplicator.wg.Wait() - - for _, i := range duplicator.inputs { - if err := i.Close(); err != nil { - return err - } - } - - return nil -} - -// Start - starts module -func (duplicator *Duplicator) Start(ctx context.Context) { - duplicator.wg.Add(1) - go duplicator.listen(ctx) -} - -func (duplicator *Duplicator) listen(ctx context.Context) { - defer duplicator.wg.Done() - - cases := []reflect.SelectCase{ - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, - } - for _, input := range duplicator.inputs { - cases = append(cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(input.Listen()), - }, - ) - } - - for { - _, value, ok := reflect.Select(cases) - if !ok { - return - } - for _, output := range duplicator.outputs { - output.Push(value.Interface()) - } - } -} diff --git a/pkg/modules/grpc/README.md b/pkg/modules/grpc/README.md index f6571c5..f5b26e7 100644 --- a/pkg/modules/grpc/README.md +++ b/pkg/modules/grpc/README.md @@ -174,32 +174,7 @@ func (client *Client) UnsubscribeFromTime(ctx context.Context, id uint64) error `handleTime` is a hadler which called on receiving new event from server. -Also you need implement `Module` interface. It's described [here](/pkg/modules/). For example: - -```go -// Input - -func (client *Client) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (client *Client) Output(name string) (*modules.Output, error) { - if name != "time" { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return client.output, nil -} - -// AttachTo - -func (client *Client) AttachTo(name string, input *modules.Input) error { - output, err := client.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} -``` +Also you need register your module. ### Inputs ans outputs @@ -267,7 +242,11 @@ func main() { // creating custom module which receives notification from client and log it to console. module := NewCustomModule() - if err := modules.Connect(client, module, "time", "input"); err != nil { + if err := modules.Register(server, client, module); err != nil { + log.Panic().Err(err).Msg("register modules") + } + + if err := modules.Connect(client.Name(), module.Name(), "Output", "Input"); err != nil { log.Panic().Err(err).Msg("module connection error") return } @@ -295,4 +274,5 @@ func main() { log.Panic().Err(err).Msg("closing server error") } } + ``` \ No newline at end of file diff --git a/pkg/modules/grpc/client.go b/pkg/modules/grpc/client.go index 1284547..b0d9366 100644 --- a/pkg/modules/grpc/client.go +++ b/pkg/modules/grpc/client.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -24,7 +25,7 @@ type Client struct { serverAddress string reconnect chan struct{} - wg *sync.WaitGroup + g workerpool.Group } // NewClient - constructor of client structure @@ -32,7 +33,7 @@ func NewClient(server string) *Client { return &Client{ serverAddress: server, reconnect: make(chan struct{}, 1), - wg: new(sync.WaitGroup), + g: workerpool.NewGroup(), } } @@ -86,8 +87,7 @@ func (client *Client) Connect(ctx context.Context, opts ...ConnectOption) error // Start - starts authentication client module func (client *Client) Start(ctx context.Context) { - client.wg.Add(1) - go client.checkConnectionState(ctx) + client.g.GoCtx(ctx, client.checkConnectionState) } // Reconnect - returns channel with reconnection events @@ -97,7 +97,7 @@ func (client *Client) Reconnect() <-chan struct{} { // Close - closes authentication client module func (client *Client) Close() error { - client.wg.Wait() + client.g.Wait() if err := client.conn.Close(); err != nil { return err @@ -107,8 +107,6 @@ func (client *Client) Close() error { } func (client *Client) checkConnectionState(ctx context.Context) { - defer client.wg.Done() - ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() @@ -143,7 +141,7 @@ func (client *Client) Connection() *grpc.ClientConn { // Stream - type Stream[T any] struct { stream grpc.ClientStream - data chan *T + data chan T wg *sync.WaitGroup } @@ -152,7 +150,7 @@ type Stream[T any] struct { func NewStream[T any](stream grpc.ClientStream) *Stream[T] { return &Stream[T]{ stream: stream, - data: make(chan *T, 1024), + data: make(chan T, 1024), wg: new(sync.WaitGroup), } @@ -172,7 +170,7 @@ func (s *Stream[T]) Subscribe(ctx context.Context) (uint64, error) { } // Listen - channel with received messages -func (s *Stream[T]) Listen() <-chan *T { +func (s *Stream[T]) Listen() <-chan T { return s.data } @@ -195,7 +193,7 @@ func (s *Stream[T]) listen(ctx context.Context, id uint64) { log.Err(err).Msg("receiving subscription error") return default: - s.data <- &msg + s.data <- msg } } } diff --git a/pkg/modules/input.go b/pkg/modules/input.go deleted file mode 100644 index 7b32625..0000000 --- a/pkg/modules/input.go +++ /dev/null @@ -1,36 +0,0 @@ -package modules - -// Input - -type Input struct { - data chan any - name string -} - -// NewInput - -func NewInput(name string) *Input { - return &Input{ - data: make(chan any, 1024), - name: name, - } -} - -// Close - -func (input *Input) Close() error { - close(input.data) - return nil -} - -// Push - -func (input *Input) Push(msg any) { - input.data <- msg -} - -// Listen - -func (input *Input) Listen() <-chan any { - return input.data -} - -// Name - -func (input *Input) Name() string { - return input.name -} diff --git a/pkg/modules/module.go b/pkg/modules/module.go index 319b44f..55cacc8 100644 --- a/pkg/modules/module.go +++ b/pkg/modules/module.go @@ -17,10 +17,5 @@ type Module interface { io.Closer Name() string - Start(ctx context.Context) - - Input(name string) (*Input, error) - Output(name string) (*Output, error) - AttachTo(outputName string, input *Input) error } diff --git a/pkg/modules/output.go b/pkg/modules/output.go index 8ae04b8..d4e305c 100644 --- a/pkg/modules/output.go +++ b/pkg/modules/output.go @@ -1,44 +1,44 @@ package modules import ( - "log" + "reflect" "sync" + + "github.com/pkg/errors" ) // Output - -type Output struct { - connectedInputs []*Input - name string +type Output[T any] struct { + connectedInputs []chan T mx sync.RWMutex } // NewOutput - -func NewOutput(name string) *Output { - return &Output{ - connectedInputs: make([]*Input, 0), - name: name, +func NewOutput[T any]() *Output[T] { + return &Output[T]{ + connectedInputs: make([]chan T, 0), } } // ConnectedInputs - -func (output *Output) ConnectedInputs() []*Input { +func (output *Output[T]) ConnectedInputs() []chan T { return output.connectedInputs } // Push - -func (output *Output) Push(msg any) { +func (output *Output[T]) Push(msg T) { output.mx.RLock() { for i := range output.connectedInputs { - output.connectedInputs[i].Push(msg) + output.connectedInputs[i] <- msg } } output.mx.RUnlock() } // Attach - -func (output *Output) Attach(input *Input) { +func (output *Output[T]) Attach(input chan T) { output.mx.Lock() { output.connectedInputs = append(output.connectedInputs, input) @@ -46,16 +46,26 @@ func (output *Output) Attach(input *Input) { output.mx.Unlock() } -// Name - -func (output *Output) Name() string { - return output.name -} - // Connect - -func Connect(outputModule, inputModule Module, outputName, inputName string) error { - input, err := inputModule.Input(inputName) - if err != nil { - log.Panic(err) +func Connect(outputModule, inputModule, outputName, inputName string) error { + outPorts, ok := globalRegistry.modules[outputModule] + if !ok { + return errors.Errorf("unregistered output module: %s", outputModule) + } + inPorts, ok := globalRegistry.modules[inputModule] + if !ok { + return errors.Errorf("unregistered input module: %s", inputModule) } - return outputModule.AttachTo(outputName, input) + out, ok := outPorts.outputs[outputName] + if !ok { + return errors.Errorf("unregistered output %s in module %s", outputName, outputModule) + } + in, ok := inPorts.inputs[inputName] + if !ok { + return errors.Errorf("unregistered input %s in module %s", inputName, inputModule) + } + + attach := out.MethodByName("Attach") + attach.Call([]reflect.Value{in}) + return nil } diff --git a/pkg/modules/printer/module.go b/pkg/modules/printer/module.go index 1c6a3e1..cb04b2f 100644 --- a/pkg/modules/printer/module.go +++ b/pkg/modules/printer/module.go @@ -3,89 +3,56 @@ package printer import ( "context" "fmt" - "sync" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" + "github.com/dipdup-io/workerpool" "github.com/rs/zerolog/log" ) -// input name +// predefined constants const ( - InputName = "input" + ModuleName = "printer" + InputName = "Input" ) -// Printer - the structure which is responsible for print received messages -type Printer struct { - input *modules.Input +// Module - the structure which is responsible for print received messages +type Module struct { + Input chan string - wg *sync.WaitGroup + g workerpool.Group } -// NewPrinter - constructor of printer structure -func NewPrinter() *Printer { - return &Printer{ - input: modules.NewInput(InputName), - - wg: new(sync.WaitGroup), +// NewModule - constructor of printer structure +func NewModule() *Module { + return &Module{ + Input: make(chan string, 16), + g: workerpool.NewGroup(), } } // Name - -func (printer *Printer) Name() string { - return "printer" -} - -// Input - returns input by name -func (printer *Printer) Input(name string) (*modules.Input, error) { - switch name { - case InputName: - return printer.input, nil - default: - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } -} - -// Output - returns output by name -func (printer *Printer) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - attach input to output with name -func (printer *Printer) AttachTo(name string, input *modules.Input) error { - output, err := printer.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil +func (printer *Module) Name() string { + return ModuleName } // Close - gracefully stops module -func (printer *Printer) Close() error { - printer.wg.Wait() - - if err := printer.input.Close(); err != nil { - return err - } +func (printer *Module) Close() error { + printer.g.Wait() + close(printer.Input) return nil } // Start - starts module -func (printer *Printer) Start(ctx context.Context) { - printer.wg.Add(1) - go printer.listen(ctx) +func (printer *Module) Start(ctx context.Context) { + printer.g.GoCtx(ctx, printer.listen) } -func (printer *Printer) listen(ctx context.Context) { - defer printer.wg.Done() - +func (printer *Module) listen(ctx context.Context) { for { select { case <-ctx.Done(): return - case msg, ok := <-printer.input.Listen(): + case msg, ok := <-printer.Input: if !ok { return } diff --git a/pkg/modules/reflect.go b/pkg/modules/reflect.go new file mode 100644 index 0000000..468de5f --- /dev/null +++ b/pkg/modules/reflect.go @@ -0,0 +1,64 @@ +package modules + +import ( + "reflect" + + "github.com/pkg/errors" +) + +type ports map[string]reflect.Value + +type modulePorts struct { + inputs ports + outputs ports +} + +func newModulePorts() modulePorts { + return modulePorts{ + inputs: make(ports), + outputs: make(ports), + } +} + +func getModulePorts(module any) (modulePorts, error) { + result := newModulePorts() + + value := reflect.ValueOf(module) + typ := reflect.TypeOf(module) + + for typ.Kind() == reflect.Ptr { + typ = typ.Elem() + value = value.Elem() + } + + if typ.Kind() != reflect.Struct { + return result, errors.New("module should be struct") + } + + for i := 0; i < typ.NumField(); i++ { + field := value.Field(i) + fieldType := typ.Field(i) + + switch field.Kind() { + case reflect.Chan: + switch fieldType.Type.ChanDir() { + case reflect.RecvDir, reflect.BothDir: + result.inputs[fieldType.Name] = field + } + case reflect.Pointer: + elemType := fieldType.Type.Elem() + if elemType.Kind() != reflect.Struct { + continue + } + if fieldType.Name != "Output" { + continue + } + result.outputs[fieldType.Name] = field + default: + continue + } + + } + + return result, nil +} diff --git a/pkg/modules/reflect_test.go b/pkg/modules/reflect_test.go new file mode 100644 index 0000000..1ba949e --- /dev/null +++ b/pkg/modules/reflect_test.go @@ -0,0 +1,92 @@ +package modules + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" +) + +type testModule struct { + Field1 int + Field2 string + Field3 chan int + + Output *Output[int] +} + +type testDirectionalModule struct { + Field1 int + Field2 string + Field3 chan int + Field4 <-chan bool + + Output *Output[int] + + Struct struct{} + StructPtr *struct{} + + Int *int + + Array []int + Array2 []<-chan int +} + +func Test_getModulePorts(t *testing.T) { + tests := []struct { + name string + module any + want modulePorts + wantErr bool + }{ + { + name: "test 1", + module: &testModule{}, + want: modulePorts{ + inputs: ports{ + "Field3": reflect.Value{}, + }, + outputs: ports{ + "Output": reflect.Value{}, + }, + }, + wantErr: false, + }, { + name: "test 2", + module: &testDirectionalModule{}, + want: modulePorts{ + inputs: ports{ + "Field3": reflect.Value{}, + "Field4": reflect.Value{}, + }, + outputs: ports{ + "Output": reflect.Value{}, + }, + }, + wantErr: false, + }, { + name: "test 3", + module: 1000, + want: newModulePorts(), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getModulePorts(tt.module) + require.Equal(t, (err != nil), tt.wantErr, "getModulePorts") + for key := range tt.want.inputs { + require.Contains(t, got.inputs, key) + } + for key := range got.inputs { + require.Contains(t, tt.want.inputs, key) + } + for key := range tt.want.outputs { + require.Contains(t, got.outputs, key) + } + for key := range got.outputs { + require.Contains(t, tt.want.outputs, key) + } + }) + } +} diff --git a/pkg/modules/registry.go b/pkg/modules/registry.go new file mode 100644 index 0000000..3208443 --- /dev/null +++ b/pkg/modules/registry.go @@ -0,0 +1,33 @@ +package modules + +var ( + globalRegistry *registry +) + +type modules map[string]modulePorts + +type registry struct { + modules modules +} + +func newRegistry() *registry { + return ®istry{ + modules: make(modules), + } +} + +// Register - call this function to be able to connect modules to each other +func Register(modules ...Module) error { + if globalRegistry == nil { + globalRegistry = newRegistry() + } + + for i := range modules { + ports, err := getModulePorts(modules[i]) + if err != nil { + return err + } + globalRegistry.modules[modules[i].Name()] = ports + } + return nil +} diff --git a/pkg/modules/workflow.go b/pkg/modules/workflow.go index 1875078..924a2f4 100644 --- a/pkg/modules/workflow.go +++ b/pkg/modules/workflow.go @@ -51,29 +51,29 @@ func (wf *Workflow) Get(name string) (Module, error) { return module, nil } -// Connect - connect destination nodule input to source module output -func (wf *Workflow) Connect(srcModule, srcOutput, destModule, destInput string) error { - src, ok := wf.modules[srcModule] - if !ok { - return errors.Wrap(ErrUnknownModule, srcModule) - } - output, err := src.Output(srcOutput) - if err != nil { - return err - } +// // Connect - connect destination nodule input to source module output +// func (wf *Workflow) Connect(srcModule, srcOutput, destModule, destInput string) error { +// src, ok := wf.modules[srcModule] +// if !ok { +// return errors.Wrap(ErrUnknownModule, srcModule) +// } +// output, err := src.Output(srcOutput) +// if err != nil { +// return err +// } - dest, ok := wf.modules[destModule] - if !ok { - return errors.Wrap(ErrUnknownModule, destModule) - } - input, err := dest.Input(destInput) - if err != nil { - return err - } +// dest, ok := wf.modules[destModule] +// if !ok { +// return errors.Wrap(ErrUnknownModule, destModule) +// } +// input, err := dest.Input(destInput) +// if err != nil { +// return err +// } - output.Attach(input) - return nil -} +// output.Attach(input) +// return nil +// } // Start - starts workflow func (wf *Workflow) Start(ctx context.Context) { diff --git a/pkg/modules/zipper/README.md b/pkg/modules/zipper/README.md index d28ac09..489e059 100644 --- a/pkg/modules/zipper/README.md +++ b/pkg/modules/zipper/README.md @@ -18,6 +18,11 @@ Zipper module implements interface `Module`. So you can use it like any other mo // create zip module zip := zipper.NewModule[int]() +// register module +if err := modules.Register(zip); err != nil { + log.Panic(err) +} + // start zip module zip.Start(ctx) @@ -33,10 +38,10 @@ Zipper is the generic structure. Type parameter `Key` is `comparable` constraint ```go type Module[Key comparable] struct { - firstInput *modules.Input - secondInput *modules.Input + First chan Zippable[Key] + Second chan Zippable[Key] - output *modules.Output + Output *modules.Output[*Result[Key]] firstStream map[Key]Zippable[Key] secondStream map[Key]Zippable[Key] @@ -44,7 +49,7 @@ type Module[Key comparable] struct { zipFunc ZipFunction[Key] mx *sync.RWMutex - wg *sync.WaitGroup + g workerpool.Group } ``` @@ -109,13 +114,13 @@ In the example zip module created with `Key` type `int`. That's why to use the m Package contains declared constants of inputs name: ```go -FirstInputName = "first" -SecondInputName = "second" +FirstInputName = "First" +SecondInputName = "Second" ``` ## Output -When module zipped data stream it sends `Result` structure to output. +When module zipped data stream it sends `*Result` structure to output. ```go type Result[Type comparable] struct { @@ -130,5 +135,5 @@ It contains key which was used to zip. Also it contains data of first and second Package contains declared constant of output name: ```go -OutputName = "output" +OutputName = "Output" ``` \ No newline at end of file diff --git a/pkg/modules/zipper/data.go b/pkg/modules/zipper/data.go index 0f6e276..2e95788 100644 --- a/pkg/modules/zipper/data.go +++ b/pkg/modules/zipper/data.go @@ -4,12 +4,10 @@ import "github.com/pkg/errors" // predefined names const ( - FirstInputName = "first" - SecondInputName = "second" - - OutputName = "output" - - ModuleName = "zipper" + FirstInputName = "First" + SecondInputName = "Second" + OutputName = "Output" + ModuleName = "zipper" ) // Zippable - interface of data which can be zipped diff --git a/pkg/modules/zipper/module.go b/pkg/modules/zipper/module.go index ade085c..ac39dda 100644 --- a/pkg/modules/zipper/module.go +++ b/pkg/modules/zipper/module.go @@ -4,16 +4,16 @@ import ( "context" "sync" + "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" ) // Module - zip module type Module[Key comparable] struct { - firstInput *modules.Input - secondInput *modules.Input + First chan Zippable[Key] + Second chan Zippable[Key] - output *modules.Output + Output *modules.Output[*Result[Key]] firstStream map[Key]Zippable[Key] secondStream map[Key]Zippable[Key] @@ -21,20 +21,20 @@ type Module[Key comparable] struct { zipFunc ZipFunction[Key] mx *sync.RWMutex - wg *sync.WaitGroup + g workerpool.Group } // NewModule - creates zip module func NewModule[Key comparable]() *Module[Key] { return &Module[Key]{ - firstInput: modules.NewInput(FirstInputName), - secondInput: modules.NewInput(SecondInputName), - output: modules.NewOutput(OutputName), + First: make(chan Zippable[Key], 1024), + Second: make(chan Zippable[Key], 1024), + Output: modules.NewOutput[*Result[Key]](), firstStream: make(map[Key]Zippable[Key]), secondStream: make(map[Key]Zippable[Key]), zipFunc: defaultZip[Key], mx: new(sync.RWMutex), - wg: new(sync.WaitGroup), + g: workerpool.NewGroup(), } } @@ -44,12 +44,12 @@ func NewModuleWithFunc[Key comparable](f ZipFunction[Key]) (*Module[Key], error) return nil, ErrNilZipFunc } return &Module[Key]{ - firstInput: modules.NewInput(FirstInputName), - secondInput: modules.NewInput(SecondInputName), - output: modules.NewOutput(OutputName), - zipFunc: f, - mx: new(sync.RWMutex), - wg: new(sync.WaitGroup), + First: make(chan Zippable[Key], 1024), + Second: make(chan Zippable[Key], 1024), + Output: modules.NewOutput[*Result[Key]](), + zipFunc: f, + mx: new(sync.RWMutex), + g: workerpool.NewGroup(), }, nil } @@ -58,81 +58,36 @@ func (*Module[Key]) Name() string { return ModuleName } -// Input - returns input by name -func (m *Module[Key]) Input(name string) (*modules.Input, error) { - switch name { - case FirstInputName: - return m.firstInput, nil - case SecondInputName: - return m.secondInput, nil - default: - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } -} - -// Output - returns output by name -func (m *Module[Key]) Output(name string) (*modules.Output, error) { - if name != OutputName { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return m.output, nil -} - -// AttachTo - attach input to output with name -func (m *Module[Key]) AttachTo(name string, input *modules.Input) error { - output, err := m.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - // Close - gracefully stops module func (m *Module[Key]) Close() error { - m.wg.Wait() + m.g.Wait() - if err := m.firstInput.Close(); err != nil { - return err - } - if err := m.secondInput.Close(); err != nil { - return err - } + close(m.First) + close(m.Second) return nil } // Start - starts module func (m *Module[Key]) Start(ctx context.Context) { - m.wg.Add(1) - go m.listen(ctx) + m.g.GoCtx(ctx, m.listen) } func (m *Module[Key]) listen(ctx context.Context) { - defer m.wg.Done() - for { select { case <-ctx.Done(): return - case msg, ok := <-m.firstInput.Listen(): + case msg, ok := <-m.First: if !ok { return } - value, ok := msg.(Zippable[Key]) - if !ok { - continue - } - m.zip(value, m.firstStream, m.secondStream) - case msg, ok := <-m.secondInput.Listen(): + m.zip(msg, m.firstStream, m.secondStream) + case msg, ok := <-m.Second: if !ok { return } - value, ok := msg.(Zippable[Key]) - if !ok { - continue - } - m.zip(value, m.secondStream, m.firstStream) + m.zip(msg, m.secondStream, m.firstStream) } } } @@ -142,7 +97,7 @@ func (m *Module[Key]) zip(value Zippable[Key], first, second map[Key]Zippable[Ke first[value.Key()] = value } else { if result := m.zipFunc(value, data); result != nil { - m.output.Push(result) + m.Output.Push(result) delete(second, value.Key()) } } diff --git a/pkg/modules/zipper/register_test.go b/pkg/modules/zipper/register_test.go new file mode 100644 index 0000000..3dc8413 --- /dev/null +++ b/pkg/modules/zipper/register_test.go @@ -0,0 +1,13 @@ +package zipper + +import ( + "testing" + + "github.com/dipdup-net/indexer-sdk/pkg/modules" + "github.com/stretchr/testify/require" +) + +func TestRegisterModule(t *testing.T) { + err := modules.Register(NewModule[int]()) + require.NoError(t, err) +}