Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds BaseModule #7

Merged
merged 18 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .run/examples_cron.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="examples/cron" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="indexer-sdk" />
<working_directory value="$PROJECT_DIR$/examples/cron" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<kind value="DIRECTORY" />
<package value="github.com/dipdup-net/indexer-sdk/examples/cron" />
<directory value="$PROJECT_DIR$/examples/cron" />
<filePath value="$PROJECT_DIR$/examples/cron/main.go" />
<method v="2" />
</configuration>
</component>
21 changes: 21 additions & 0 deletions .run/examples_grpc.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="examples/grpc" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="indexer-sdk" />
<working_directory value="$PROJECT_DIR$/examples/grpc" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<kind value="DIRECTORY" />
<package value="github.com/dipdup-net/indexer-sdk/examples/grpc" />
<directory value="$PROJECT_DIR$/examples/grpc" />
<filePath value="$PROJECT_DIR$/examples/grpc/main.go" />
<method v="2" />
</configuration>
</component>
21 changes: 21 additions & 0 deletions .run/examples_zipper.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="examples/zipper" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="indexer-sdk" />
<working_directory value="$PROJECT_DIR$/examples/zipper" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<kind value="DIRECTORY" />
<package value="github.com/dipdup-net/indexer-sdk/examples/zipper/examples/zipper" />
<directory value="$PROJECT_DIR$/examples/zipper" />
<filePath value="$PROJECT_DIR$/examples/zipper/main.go" />
<method v="2" />
</configuration>
</component>
20 changes: 18 additions & 2 deletions examples/cron/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/rs/zerolog/log"
)

var _ modules.Module = (*CustomModule)(nil)

// CustomModule -
type CustomModule struct {
everySecond *modules.Input
Expand Down Expand Up @@ -50,8 +52,22 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
func (m *CustomModule) AttachTo(outputM modules.Module, outputName, inputName string) error {
output, err := outputM.Output(outputName)
if err != nil {
return nil
}

switch inputName {
case "every_second":
output.Attach(m.everySecond)
return nil
case "every_five_second":
output.Attach(m.everyFiveSecond)
return nil
default:
return errors.Wrap(modules.ErrUnknownInput, inputName)
}
}

func (m *CustomModule) listen(ctx context.Context) {
Expand Down
68 changes: 0 additions & 68 deletions examples/duplicator/main.go

This file was deleted.

14 changes: 11 additions & 3 deletions examples/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Client struct {
wg *sync.WaitGroup
}

var _ modules.Module = (*Client)(nil)

// NewClient -
func NewClient(server string) *Client {
return &Client{
Expand Down Expand Up @@ -113,12 +115,18 @@ func (client *Client) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
output, err := client.Output(name)
func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := client.Input(inputName)
if err != nil {
return err
}
output.Attach(input)

outputChannel.Attach(input)
return nil
}

Expand Down
15 changes: 13 additions & 2 deletions examples/grpc/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,19 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}

func (m *CustomModule) listen(ctx context.Context) {
Expand Down
12 changes: 9 additions & 3 deletions examples/zipper/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ func (m *CustomModule) Output(name string) (*modules.Output, error) {
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
output, err := m.Output(name)
func (m *CustomModule) AttachTo(outputModule modules.Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}
output.Attach(input)

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions examples/zipper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ func main() {
first := NewCustomModule(10, -1, "first")
second := NewCustomModule(0, 1, "second")

if err := modules.Connect(first, zip, "output", zipper.FirstInputName); err != nil {
if err := modules.Connect(first, zip, zipper.OutputName, zipper.FirstInputName); err != nil {
log.Panic(err)
}
if err := modules.Connect(second, zip, "output", zipper.SecondInputName); err != nil {
if err := modules.Connect(second, zip, zipper.OutputName, zipper.SecondInputName); err != nil {
log.Panic(err)
}

fakeInput := modules.NewInput("fake")
if err := zip.AttachTo(zipper.OutputName, fakeInput); err != nil {
zipOutput, err := zip.Output(zipper.OutputName)
if err != nil {
log.Panic(err)
}
zipOutput.Attach(fakeInput)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down
4 changes: 0 additions & 4 deletions pkg/modules/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,3 @@ gRPC module where realized default client and server. Detailed docs can be found
### Cron

Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/).

### Duplicator

Duplicator module repeats signal from one of inputs to all outputs. Detailed docs can be found [here](/pkg/modules/duplicator/).
81 changes: 81 additions & 0 deletions pkg/modules/baseModule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package modules

import (
"context"
"github.com/dipdup-io/workerpool"
"github.com/dipdup-net/indexer-sdk/pkg/sync"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var _ Module = (*BaseModule)(nil)

type BaseModule struct {
name string
inputs *sync.Map[string, *Input]
outputs *sync.Map[string, *Output]
Log zerolog.Logger
G workerpool.Group
}

func New(name string) BaseModule {
m := BaseModule{
name: name,
inputs: sync.NewMap[string, *Input](),
outputs: sync.NewMap[string, *Output](),
Log: log.With().Str("module", name).Logger(),
G: workerpool.NewGroup(),
}

return m
}

func (m *BaseModule) Name() string {
return m.name
}

func (*BaseModule) Start(_ context.Context) {}

func (*BaseModule) Close() error {
return nil
}

func (m *BaseModule) Input(name string) (*Input, error) {
input, ok := m.inputs.Get(name)
if !ok {
return nil, errors.Wrap(ErrUnknownInput, name)
}
return input, nil
}

func (m *BaseModule) CreateInput(name string) {
m.inputs.Set(name, NewInput(name))
}

func (m *BaseModule) Output(name string) (*Output, error) {
output, ok := m.outputs.Get(name)
if !ok {
return nil, errors.Wrap(ErrUnknownOutput, name)
}
return output, nil
}

func (m *BaseModule) CreateOutput(name string) {
m.outputs.Set(name, NewOutput(name))
}

func (m *BaseModule) AttachTo(outputModule Module, outputName, inputName string) error {
outputChannel, err := outputModule.Output(outputName)
if err != nil {
return err
}

input, err := m.Input(inputName)
if err != nil {
return err
}

outputChannel.Attach(input)
return nil
}
Loading
Loading