Skip to content

Commit

Permalink
Refactoring: typing input and outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Sep 2, 2023
1 parent d8035a8 commit 1385889
Show file tree
Hide file tree
Showing 33 changed files with 524 additions and 784 deletions.
47 changes: 7 additions & 40 deletions examples/cron/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,22 @@ import (
"context"
"sync"

"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/pkg/errors"
"github.com/dipdup-net/indexer-sdk/pkg/modules/cron"
"github.com/rs/zerolog/log"
)

// CustomModule -
type CustomModule struct {
everySecond *modules.Input
everyFiveSecond *modules.Input
Messages chan cron.Message

wg *sync.WaitGroup
}

// NewCustomModule -
func NewCustomModule() *CustomModule {
return &CustomModule{
everySecond: modules.NewInput("every_second"),
everyFiveSecond: modules.NewInput("every_five_second"),
wg: new(sync.WaitGroup),
Messages: make(chan cron.Message, 16),
wg: new(sync.WaitGroup),
}
}

Expand All @@ -32,39 +29,15 @@ func (m *CustomModule) Start(ctx context.Context) {
go m.listen(ctx)
}

// Input -
func (m *CustomModule) Input(name string) (*modules.Input, error) {
switch name {
case "every_second":
return m.everySecond, nil
case "every_five_second":
return m.everyFiveSecond, nil
default:
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}
}

// Output -
func (m *CustomModule) Output(name string) (*modules.Output, error) {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
}

func (m *CustomModule) listen(ctx context.Context) {
defer m.wg.Done()

for {
select {
case <-ctx.Done():
return
case <-m.everySecond.Listen():
log.Info().Msg("arrived from cron module")
case <-m.everyFiveSecond.Listen():
log.Info().Msg("arrived from cron module")
case msg := <-m.Messages:
log.Info().Str("job", msg.Job).Msg("arrived from cron module")
}
}
}
Expand All @@ -73,13 +46,7 @@ func (m *CustomModule) listen(ctx context.Context) {
func (m *CustomModule) Close() error {
m.wg.Wait()

if err := m.everyFiveSecond.Close(); err != nil {
return err
}
if err := m.everySecond.Close(); err != nil {
return err
}

close(m.Messages)
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions examples/cron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ func main() {
}
customModule := NewCustomModule()

if err := modules.Connect(cronModule, customModule, "every_second", "every_second"); err != nil {
if err := modules.Register(cronModule, customModule); err != nil {
log.Panic(err)
}
if err := modules.Connect(cronModule, customModule, "every_five_second", "every_five_second"); err != nil {

if err := modules.Connect(cronModule.Name(), customModule.Name(), "Output", "Messages"); err != nil {
log.Panic(err)
}

Expand Down
68 changes: 0 additions & 68 deletions examples/duplicator/main.go

This file was deleted.

54 changes: 12 additions & 42 deletions examples/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,40 @@ package main

import (
"context"
"sync"

"github.com/dipdup-io/workerpool"
"github.com/dipdup-net/indexer-sdk/examples/grpc/pb"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

// Client -
type Client struct {
*grpc.Client

output *modules.Output
stream *grpc.Stream[*pb.Response]
Output *modules.Output[string]

stream *grpc.Stream[pb.Response]
client pb.TimeServiceClient
wg *sync.WaitGroup
g workerpool.Group
}

// NewClient -
func NewClient(server string) *Client {
return &Client{
Client: grpc.NewClient(server),
output: modules.NewOutput("time"),
wg: new(sync.WaitGroup),
Output: modules.NewOutput[string](),
g: workerpool.NewGroup(),
}
}

// Start -
func (client *Client) Start(ctx context.Context) {
client.client = pb.NewTimeServiceClient(client.Connection())

client.wg.Add(1)
go client.reconnect(ctx)
client.g.GoCtx(ctx, client.reconnect)
}

// SubscribeOnTime -
Expand All @@ -46,17 +44,13 @@ func (client *Client) SubscribeOnTime(ctx context.Context) (uint64, error) {
if err != nil {
return 0, err
}
client.stream = grpc.NewStream[*pb.Response](stream)

client.wg.Add(1)
go client.handleTime(ctx)
client.stream = grpc.NewStream[pb.Response](stream)

client.g.GoCtx(ctx, client.handleTime)
return client.stream.Subscribe(ctx)
}

func (client *Client) reconnect(ctx context.Context) {
defer client.wg.Done()

for {
select {
case <-ctx.Done():
Expand All @@ -77,14 +71,13 @@ func (client *Client) reconnect(ctx context.Context) {
}

func (client *Client) handleTime(ctx context.Context) {
defer client.wg.Done()

for {
select {
case <-ctx.Done():
return

case msg := <-client.stream.Listen():
client.output.Push(msg)
client.Output.Push(msg.Time)
}
}
}
Expand All @@ -99,32 +92,9 @@ func (client *Client) UnsubscribeFromTime(ctx context.Context, id uint64) error
return nil
}

// Input -
func (client *Client) Input(name string) (*modules.Input, error) {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}

// Output -
func (client *Client) Output(name string) (*modules.Output, error) {
if name != "time" {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}
return client.output, nil
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
output, err := client.Output(name)
if err != nil {
return err
}
output.Attach(input)
return nil
}

// Close -
func (client *Client) Close() error {
client.wg.Wait()
client.g.Wait()

if client.stream != nil {
if err := client.stream.Close(); err != nil {
Expand Down
42 changes: 10 additions & 32 deletions examples/grpc/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,37 @@ package main
import (
"context"
"encoding/json"
"sync"

"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/pkg/errors"
"github.com/dipdup-io/workerpool"
"github.com/rs/zerolog/log"
)

// CustomModule -
type CustomModule struct {
input *modules.Input
Input chan string

wg *sync.WaitGroup
g workerpool.Group
}

// NewCustomModule -
func NewCustomModule() *CustomModule {
return &CustomModule{
input: modules.NewInput("input"),
wg: new(sync.WaitGroup),
Input: make(chan string),
g: workerpool.NewGroup(),
}
}

// Start -
func (m *CustomModule) Start(ctx context.Context) {
m.wg.Add(1)
go m.listen(ctx)
}

// Input -
func (m *CustomModule) Input(name string) (*modules.Input, error) {
if name != "input" {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}
return m.input, nil
}

// Output -
func (m *CustomModule) Output(name string) (*modules.Output, error) {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}

// AttachTo -
func (m *CustomModule) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
m.g.GoCtx(ctx, m.listen)
}

func (m *CustomModule) listen(ctx context.Context) {
defer m.wg.Done()

for {
select {
case <-ctx.Done():
return
case msg := <-m.input.Listen():
case msg := <-m.Input:
b, _ := json.Marshal(msg)
log.Info().Str("msg", string(b)).Msg("arrived from grpc module")
}
Expand All @@ -65,8 +42,9 @@ func (m *CustomModule) listen(ctx context.Context) {

// Close -
func (m *CustomModule) Close() error {
m.wg.Wait()
return m.input.Close()
m.g.Wait()
close(m.Input)
return nil
}

// Name -
Expand Down
6 changes: 5 additions & 1 deletion examples/grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func main() {
// creating custom module which receives notification from client and log it to console.
module := NewCustomModule()

if err := modules.Connect(client, module, "time", "input"); err != nil {
if err := modules.Register(server, client, module); err != nil {
log.Panic().Err(err).Msg("register modules")
}

if err := modules.Connect(client.Name(), module.Name(), "Output", "Input"); err != nil {
log.Panic().Err(err).Msg("module connection error")
return
}
Expand Down
Loading

0 comments on commit 1385889

Please sign in to comment.