diff --git a/ego.go b/ego.go index 393d426..c68d8f6 100644 --- a/ego.go +++ b/ego.go @@ -11,7 +11,7 @@ import ( "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/log" - "github.com/tochemey/goakt/pkg/telemetry" + "github.com/tochemey/goakt/telemetry" "go.uber.org/atomic" ) @@ -81,8 +81,11 @@ func (x *Ego) Stop(ctx context.Context) error { return x.actorSystem.Stop(ctx) } -// SendCommand sends command to a given entity ref -func (x *Ego) SendCommand(ctx context.Context, command entity.Command, entityID string) (*egopb.CommandReply, error) { +// SendCommand sends command to a given entity ref. This will return: +// 1. the resulting state after the command has been handled and the emitted event persisted +// 2. nil when there is no resulting state or no event persisted +// 3. an error in case of error +func (x *Ego) SendCommand(ctx context.Context, command entity.Command, entityID string) (entity.State, error) { // first check whether we are running in cluster mode or not if x.enableCluster.Load() { // grab the remote address of the actor @@ -104,8 +107,13 @@ func (x *Ego) SendCommand(ctx context.Context, command entity.Command, entityID } // let us unmarshall the reply commandReply := new(egopb.CommandReply) - err = reply.UnmarshalTo(commandReply) - return commandReply, err + // parse the reply and return the error when there is one + if err = reply.UnmarshalTo(commandReply); err != nil { + return nil, err + } + + // parse the command reply and return the appropriate responses + return parseCommandReply(commandReply) } // locate the given actor pid, err := x.actorSystem.LocalActor(ctx, entityID) @@ -125,8 +133,33 @@ func (x *Ego) SendCommand(ctx context.Context, command entity.Command, entityID return nil, err } - // TODO use ok, comma idiom to cast - return reply.(*egopb.CommandReply), nil + // cast the reply to a command reply because that is the expected return type + commandReply, ok := reply.(*egopb.CommandReply) + // when casting is successful + if ok { + // parse the command reply and return the appropriate responses + return parseCommandReply(commandReply) + } + // casting failed + return nil, errors.New("failed to parse command reply") +} + +// parseCommandReply parses the command reply +func parseCommandReply(reply *egopb.CommandReply) (entity.State, error) { + var ( + state entity.State + err error + ) + // parse the command reply + switch r := reply.GetReply().(type) { + case *egopb.CommandReply_StateReply: + state, err = r.StateReply.GetState().UnmarshalNew() + case *egopb.CommandReply_NoReply: + // nothing to be done here + case *egopb.CommandReply_ErrorReply: + err = errors.New(r.ErrorReply.GetMessage()) + } + return state, err } // NewEntity creates an entity and return the entity reference diff --git a/ego_test.go b/ego_test.go index d0aeb09..9951a8f 100644 --- a/ego_test.go +++ b/ego_test.go @@ -11,12 +11,11 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tochemey/ego/egopb" "github.com/tochemey/ego/entity" "github.com/tochemey/ego/eventstore/memory" samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1" "github.com/tochemey/goakt/discovery" - mockdisco "github.com/tochemey/goakt/goaktmocks/discovery" + mockdisco "github.com/tochemey/goakt/testkit/discovery" "github.com/travisjeffery/go-dynaport" "google.golang.org/protobuf/proto" ) @@ -87,11 +86,8 @@ func TestEgo(t *testing.T) { // send the command to the actor. Please don't ignore the error in production grid code reply, err := e.SendCommand(ctx, command, entityID) require.NoError(t, err) - require.IsType(t, new(egopb.CommandReply_StateReply), reply.GetReply()) - assert.EqualValues(t, 1, reply.GetStateReply().GetSequenceNumber()) - resultingState := new(samplepb.Account) - assert.NoError(t, reply.GetStateReply().GetState().UnmarshalTo(resultingState)) + resultingState := reply.(*samplepb.Account) assert.EqualValues(t, 500.00, resultingState.GetAccountBalance()) assert.Equal(t, entityID, resultingState.GetAccountId()) @@ -102,11 +98,8 @@ func TestEgo(t *testing.T) { } reply, err = e.SendCommand(ctx, command, entityID) require.NoError(t, err) - require.IsType(t, new(egopb.CommandReply_StateReply), reply.GetReply()) - assert.EqualValues(t, 2, reply.GetStateReply().GetSequenceNumber()) - newState := new(samplepb.Account) - assert.NoError(t, reply.GetStateReply().GetState().UnmarshalTo(newState)) + newState := reply.(*samplepb.Account) assert.EqualValues(t, 750.00, newState.GetAccountBalance()) assert.Equal(t, entityID, newState.GetAccountId()) @@ -138,11 +131,8 @@ func TestEgo(t *testing.T) { // send the command to the actor. Please don't ignore the error in production grid code reply, err := e.SendCommand(ctx, command, entityID) require.NoError(t, err) - require.IsType(t, new(egopb.CommandReply_StateReply), reply.GetReply()) - assert.EqualValues(t, 1, reply.GetStateReply().GetSequenceNumber()) - resultingState := new(samplepb.Account) - assert.NoError(t, reply.GetStateReply().GetState().UnmarshalTo(resultingState)) + resultingState := reply.(*samplepb.Account) assert.EqualValues(t, 500.00, resultingState.GetAccountBalance()) assert.Equal(t, entityID, resultingState.GetAccountId()) @@ -153,11 +143,8 @@ func TestEgo(t *testing.T) { } reply, err = e.SendCommand(ctx, command, entityID) require.NoError(t, err) - require.IsType(t, new(egopb.CommandReply_StateReply), reply.GetReply()) - assert.EqualValues(t, 2, reply.GetStateReply().GetSequenceNumber()) - newState := new(samplepb.Account) - assert.NoError(t, reply.GetStateReply().GetState().UnmarshalTo(newState)) + newState := reply.(*samplepb.Account) assert.EqualValues(t, 750.00, newState.GetAccountBalance()) assert.Equal(t, entityID, newState.GetAccountId()) diff --git a/entity/behavior.go b/entity/behavior.go index b563bdd..e79c9b0 100644 --- a/entity/behavior.go +++ b/entity/behavior.go @@ -26,7 +26,8 @@ type Behavior[T State] interface { // ID defines the id that will be used in the event journal. // This helps track the entity in the events store. ID() string - // InitialState returns the event sourced actor initial state + // InitialState returns the event sourced actor initial state. + // This is set as the initial state when there are no snapshots found the entity InitialState() T // HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command, // which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can @@ -120,8 +121,8 @@ func (entity *Entity[T]) Receive(ctx actors.ReceiveContext) { // PostStop prepares the actor to gracefully shutdown func (entity *Entity[T]) PostStop(ctx context.Context) error { // add a span context - //ctx, span := telemetry.SpanContext(ctx, "PostStop") - //defer span.End() + ctx, span := telemetry.SpanContext(ctx, "PostStop") + defer span.End() // acquire the lock entity.mu.Lock() @@ -139,8 +140,8 @@ func (entity *Entity[T]) PostStop(ctx context.Context) error { // this is vital when the entity actor is restarting. func (entity *Entity[T]) recoverFromSnapshot(ctx context.Context) error { // add a span context - //ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot") - //defer span.End() + ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot") + defer span.End() // check whether there is a snapshot to recover from event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID()) @@ -278,6 +279,7 @@ func (entity *Entity[T]) processCommandAndReply(ctx actors.ReceiveContext, comma return } + // create the command reply to send reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ diff --git a/entity/behavior_test.go b/entity/behavior_test.go index c5944f5..522f5a9 100644 --- a/entity/behavior_test.go +++ b/entity/behavior_test.go @@ -327,10 +327,8 @@ func TestAccountAggregate(t *testing.T) { AccountId: persistenceID, AccountBalance: 750.00, } - assert.True(t, proto.Equal(expected, resultingState)) - // shutdown the persistent actor - assert.NoError(t, actorSystem.Kill(ctx, behavior.ID())) + assert.True(t, proto.Equal(expected, resultingState)) // wait a while time.Sleep(time.Second) diff --git a/example/main.go b/example/main.go index 359815c..bf38b9e 100644 --- a/example/main.go +++ b/example/main.go @@ -8,12 +8,10 @@ import ( "os/signal" "syscall" - "github.com/tochemey/ego/entity" - "github.com/tochemey/ego/eventstore/memory" - "github.com/google/uuid" "github.com/tochemey/ego" - "github.com/tochemey/ego/egopb" + "github.com/tochemey/ego/entity" + "github.com/tochemey/ego/eventstore/memory" samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1" "google.golang.org/protobuf/proto" ) @@ -43,11 +41,8 @@ func main() { } // send the command to the actor. Please don't ignore the error in production grid code reply, _ := e.SendCommand(ctx, command, entityID) - state := reply.GetReply().(*egopb.CommandReply_StateReply) - log.Printf("resulting sequence number: %d", state.StateReply.GetSequenceNumber()) - account := new(samplepb.Account) - _ = state.StateReply.GetState().UnmarshalTo(account) + account := reply.(*samplepb.Account) log.Printf("current balance: %v", account.GetAccountBalance()) @@ -57,12 +52,7 @@ func main() { Balance: 250, } reply, _ = e.SendCommand(ctx, command, entityID) - state = reply.GetReply().(*egopb.CommandReply_StateReply) - log.Printf("resulting sequence number: %d", state.StateReply.GetSequenceNumber()) - - account = new(samplepb.Account) - _ = state.StateReply.GetState().UnmarshalTo(account) - + account = reply.(*samplepb.Account) log.Printf("current balance: %v", account.GetAccountBalance()) // capture ctrl+c diff --git a/go.mod b/go.mod index 5dbeb63..b79c0d3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/hashicorp/go-memdb v1.3.4 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 - github.com/tochemey/goakt v0.5.1 + github.com/tochemey/goakt v0.8.0 github.com/tochemey/gopack v0.0.0-20230901205014-06584bf05eb5 github.com/travisjeffery/go-dynaport v1.0.0 go.opentelemetry.io/otel v1.17.0 @@ -92,16 +92,16 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.25.0 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.14.0 // indirect + golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/term v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/term v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.12.0 // indirect + golang.org/x/tools v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect - google.golang.org/grpc v1.57.0 // indirect + google.golang.org/grpc v1.58.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 10800fb..20b982e 100644 --- a/go.sum +++ b/go.sum @@ -384,10 +384,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow= github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y= -github.com/tochemey/goakt v0.5.1 h1:8H5IYObTxkK72JM00TeFNnrEGWaABWytL1372BINKUQ= -github.com/tochemey/goakt v0.5.1/go.mod h1:dUfzisO9UYJ2iVaCrl1/s7E1ftH612bo5Fm1bQDpX3I= -github.com/tochemey/gopack v0.0.0-20230831231518-a5a26f3fd887 h1:IZuzt2IURB5V0pTQ7knYBI9+EqJ4FIGo5YxTevnYuxw= -github.com/tochemey/gopack v0.0.0-20230831231518-a5a26f3fd887/go.mod h1:gWpWJXsIEBlompjd0pvOc61kHJp0/dUiCXK/56tYX9I= +github.com/tochemey/goakt v0.8.0 h1:s5DtH4LXmFnTOLuoofMH0W273hlLfiaFSIQudOIw5m8= +github.com/tochemey/goakt v0.8.0/go.mod h1:nbd+GPq7hTxIQeHSIvrN8dyae6AkIDjydK0ZwShIVss= github.com/tochemey/gopack v0.0.0-20230901205014-06584bf05eb5 h1:ZpUGyBPXHr7KPqJzLYqQdyTARnOVpUGy4LZx7Jgcle0= github.com/tochemey/gopack v0.0.0-20230901205014-06584bf05eb5/go.mod h1:O8C7dX8QMSDPbIjYXma+pe5pk9tg3vo6OxpDU0cFCh8= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= @@ -436,7 +434,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -460,8 +458,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -506,18 +504,18 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= -golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -533,8 +531,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= -golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -546,8 +544,8 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.58.0 h1:32JY8YpPMSR45K+c3o6b8VL73V+rR8k+DeMIr4vRH8o= +google.golang.org/grpc v1.58.0/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/offsetstore/iface.go b/offsetstore/iface.go deleted file mode 100644 index 0719ac2..0000000 --- a/offsetstore/iface.go +++ /dev/null @@ -1,23 +0,0 @@ -package offsetstore - -import ( - "context" - - "github.com/tochemey/ego/egopb" -) - -// OffsetStore defines the contract needed to persist offsets -type OffsetStore interface { - // Connect connects to the offset store - Connect(ctx context.Context) error - // Disconnect disconnects the offset store - Disconnect(ctx context.Context) error - // WriteOffset writes the current offset of the event consumed for a given projection ID - // Note: persistence id and the projection name make a record in the journal store unique. Failure to ensure that - // can lead to some un-wanted behaviors and data inconsistency - WriteOffset(ctx context.Context, offset *egopb.Offset) error - // GetCurrentOffset returns the current offset of a given projection ID - GetCurrentOffset(ctx context.Context, projectionID *ProjectionID) (current *egopb.Offset, err error) - // Ping verifies a connection to the database is still alive, establishing a connection if necessary. - Ping(ctx context.Context) error -} diff --git a/offsetstore/memory/memory.go b/offsetstore/memory/memory.go deleted file mode 100644 index 23053c1..0000000 --- a/offsetstore/memory/memory.go +++ /dev/null @@ -1,208 +0,0 @@ -package memory - -import ( - "context" - "fmt" - - "github.com/google/uuid" - "github.com/hashicorp/go-memdb" - "github.com/pkg/errors" - "github.com/tochemey/ego/egopb" - "github.com/tochemey/ego/internal/telemetry" - "github.com/tochemey/ego/offsetstore" - "go.uber.org/atomic" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/reflect/protoregistry" - "google.golang.org/protobuf/types/known/anypb" -) - -// OffsetStore implements the offset store interface -// NOTE: NOT RECOMMENDED FOR PRODUCTION CODE because all records are in memory and there is no durability. -// This is recommended for tests or PoC -type OffsetStore struct { - // specifies the underlying database - db *memdb.MemDB - // this is only useful for tests - KeepRecordsAfterDisconnect bool - // hold the connection state to avoid multiple connection of the same instance - connected *atomic.Bool -} - -var _ offsetstore.OffsetStore = &OffsetStore{} - -// NewOffsetStore creates an instance of OffsetStore -func NewOffsetStore() *OffsetStore { - return &OffsetStore{ - KeepRecordsAfterDisconnect: false, - connected: atomic.NewBool(false), - } -} - -// Connect connects to the offset store -func (s *OffsetStore) Connect(ctx context.Context) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Connect") - defer span.End() - - // check whether this instance of the journal is connected or not - if s.connected.Load() { - return nil - } - - // create an instance of the database - db, err := memdb.NewMemDB(offsetSchema) - // handle the eventual error - if err != nil { - return err - } - // set the journal store underlying database - s.db = db - - // set the connection status - s.connected.Store(true) - - return nil -} - -// Disconnect disconnects the offset store -func (s *OffsetStore) Disconnect(ctx context.Context) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Disconnect") - defer span.End() - - // check whether this instance of the journal is connected or not - if !s.connected.Load() { - return nil - } - - // clear all records - if !s.KeepRecordsAfterDisconnect { - // spawn a db transaction for read-only - txn := s.db.Txn(true) - - // free memory resource - if _, err := txn.DeleteAll(offsetTableName, offsetPK); err != nil { - txn.Abort() - return errors.Wrap(err, "failed to free memory resource") - } - txn.Commit() - } - // set the connection status - s.connected.Store(false) - - return nil -} - -// Ping verifies a connection to the database is still alive, establishing a connection if necessary. -func (s *OffsetStore) Ping(ctx context.Context) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Ping") - defer span.End() - - // check whether we are connected or not - if !s.connected.Load() { - return s.Connect(ctx) - } - - return nil -} - -// WriteOffset writes an offset to the offset store -func (s *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "OffsetStore.WriteOffset") - defer span.End() - - // check whether this instance of the journal is connected or not - if !s.connected.Load() { - return errors.New("offset store is not connected") - } - - // spawn a db transaction - txn := s.db.Txn(true) - // create an offset row - record := &offsetRow{ - Ordering: uuid.NewString(), - ProjectionName: offset.GetProjectionName(), - PersistenceID: offset.GetPersistenceId(), - CurrentOffset: offset.GetCurrentOffset(), - LastUpdated: offset.GetTimestamp(), - } - - // persist the record - if err := txn.Insert(offsetTableName, record); err != nil { - // abort the transaction - txn.Abort() - // return the error - return errors.Wrap(err, "failed to persist offset record on to the offset store") - } - // commit the transaction - txn.Commit() - - return nil -} - -// GetCurrentOffset return the offset of a projection -func (s *OffsetStore) GetCurrentOffset(ctx context.Context, projectionID *offsetstore.ProjectionID) (current *egopb.Offset, err error) { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "OffsetStore.GetCurrentOffset") - defer span.End() - - // check whether this instance of the journal is connected or not - if !s.connected.Load() { - return nil, errors.New("offset store is not connected") - } - - // spawn a db transaction for read-only - txn := s.db.Txn(false) - defer txn.Abort() - // let us fetch the last record - raw, err := txn.Last(offsetTableName, rowIndex, projectionID.ProjectionName(), projectionID.PersistenceID()) - if err != nil { - // if the error is not found then return nil - if err == memdb.ErrNotFound { - return nil, nil - } - return nil, errors.Wrapf(err, "failed to get the current offset for persistenceId=%s given projection=%s", - projectionID.PersistenceID(), projectionID.ProjectionName()) - } - - // no record found - if raw == nil { - return nil, nil - } - - // cast the record - if offsetRow, ok := raw.(*offsetRow); ok { - current = &egopb.Offset{ - PersistenceId: offsetRow.PersistenceID, - ProjectionName: offsetRow.ProjectionName, - CurrentOffset: offsetRow.CurrentOffset, - Timestamp: offsetRow.LastUpdated, - } - return - } - - return nil, fmt.Errorf("failed to get the current offset for persistenceId=%s given projection=%s", - projectionID.PersistenceID(), projectionID.ProjectionName()) -} - -// toProto converts a byte array given its manifest into a valid proto message -func toProto(manifest string, bytea []byte) (*anypb.Any, error) { - mt, err := protoregistry.GlobalTypes.FindMessageByName(protoreflect.FullName(manifest)) - if err != nil { - return nil, err - } - - pm := mt.New().Interface() - err = proto.Unmarshal(bytea, pm) - if err != nil { - return nil, err - } - - if cast, ok := pm.(*anypb.Any); ok { - return cast, nil - } - return nil, fmt.Errorf("failed to unpack message=%s", manifest) -} diff --git a/offsetstore/memory/memory_test.go b/offsetstore/memory/memory_test.go deleted file mode 100644 index ba117d5..0000000 --- a/offsetstore/memory/memory_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package memory - -import ( - "context" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tochemey/ego/egopb" - "github.com/tochemey/ego/offsetstore" - "google.golang.org/protobuf/proto" -) - -func TestOffsetStore(t *testing.T) { - t.Run("testNew", func(t *testing.T) { - store := NewOffsetStore() - assert.NotNil(t, store) - var p interface{} = store - _, ok := p.(offsetstore.OffsetStore) - assert.True(t, ok) - }) - t.Run("testConnect", func(t *testing.T) { - ctx := context.TODO() - store := NewOffsetStore() - assert.NotNil(t, store) - err := store.Connect(ctx) - assert.NoError(t, err) - }) - t.Run("testWriteOffset", func(t *testing.T) { - ctx := context.TODO() - - store := NewOffsetStore() - assert.NotNil(t, store) - require.NoError(t, store.Connect(ctx)) - - persistenceID := uuid.NewString() - projectionName := "DB_WRITER" - timestamp := time.Now().UnixMilli() - - offset := &egopb.Offset{ - PersistenceId: persistenceID, - ProjectionName: projectionName, - CurrentOffset: 15, - Timestamp: timestamp, - } - - require.NoError(t, store.WriteOffset(ctx, offset)) - - err := store.Disconnect(ctx) - assert.NoError(t, err) - }) - - t.Run("testGetCurrentOffset: happy path", func(t *testing.T) { - ctx := context.TODO() - - store := NewOffsetStore() - assert.NotNil(t, store) - require.NoError(t, store.Connect(ctx)) - - persistenceID := uuid.NewString() - projectionName := "DB_WRITER" - timestamp := time.Now().UnixMilli() - - offset := &egopb.Offset{ - PersistenceId: persistenceID, - ProjectionName: projectionName, - CurrentOffset: 15, - Timestamp: timestamp, - } - - require.NoError(t, store.WriteOffset(ctx, offset)) - - offset = &egopb.Offset{ - PersistenceId: persistenceID, - ProjectionName: projectionName, - CurrentOffset: 24, - Timestamp: timestamp, - } - - require.NoError(t, store.WriteOffset(ctx, offset)) - - actual, err := store.GetCurrentOffset(ctx, offsetstore.NewProjectionID(projectionName, persistenceID)) - assert.NoError(t, err) - assert.NotNil(t, actual) - assert.True(t, proto.Equal(offset, actual)) - - assert.NoError(t, store.Disconnect(ctx)) - }) - t.Run("testGetCurrentOffset: not found", func(t *testing.T) { - ctx := context.TODO() - - store := NewOffsetStore() - assert.NotNil(t, store) - require.NoError(t, store.Connect(ctx)) - - persistenceID := uuid.NewString() - projectionName := "DB_WRITER" - - actual, err := store.GetCurrentOffset(ctx, offsetstore.NewProjectionID(projectionName, persistenceID)) - assert.NoError(t, err) - assert.Nil(t, actual) - - assert.NoError(t, store.Disconnect(ctx)) - }) -} diff --git a/offsetstore/memory/schemas.go b/offsetstore/memory/schemas.go deleted file mode 100644 index f4d9eed..0000000 --- a/offsetstore/memory/schemas.go +++ /dev/null @@ -1,91 +0,0 @@ -package memory - -import "github.com/hashicorp/go-memdb" - -// offsetRow represent the offset entry in the offset store -type offsetRow struct { - // Ordering basically used as PK - Ordering string - // ProjectionName is the projection name - ProjectionName string - // PersistenceID is the persistence ID - PersistenceID string - // CurrentOffset is the current offset - CurrentOffset uint64 - // Specifies the last update time - LastUpdated int64 -} - -const ( - offsetTableName = "offsets" - offsetPK = "id" - currentOffsetIndex = "currentOffset" - projectionNameIndex = "projectionName" - rowIndex = "rowIndex" - persistenceIDIndex = "persistenceId" -) - -var ( - // offsetSchema defines the offset schema - offsetSchema = &memdb.DBSchema{ - Tables: map[string]*memdb.TableSchema{ - offsetTableName: { - Name: offsetTableName, - Indexes: map[string]*memdb.IndexSchema{ - offsetPK: { - Name: offsetPK, - AllowMissing: false, - Unique: true, - Indexer: &memdb.StringFieldIndex{ - Field: "Ordering", - Lowercase: false, - }, - }, - persistenceIDIndex: { - Name: persistenceIDIndex, - AllowMissing: false, - Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "PersistenceID", - Lowercase: false, - }, - }, - currentOffsetIndex: { - Name: currentOffsetIndex, - AllowMissing: false, - Unique: false, - Indexer: &memdb.UintFieldIndex{ - Field: "CurrentOffset", - }, - }, - projectionNameIndex: { - Name: projectionNameIndex, - AllowMissing: false, - Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "ProjectionName", - }, - }, - rowIndex: { - Name: rowIndex, - AllowMissing: false, - Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "ProjectionName", - Lowercase: false, - }, - &memdb.StringFieldIndex{ - Field: "PersistenceID", - Lowercase: false, - }, - }, - AllowMissing: false, - }, - }, - }, - }, - }, - } -) diff --git a/offsetstore/projectionid.go b/offsetstore/projectionid.go deleted file mode 100644 index bed342c..0000000 --- a/offsetstore/projectionid.go +++ /dev/null @@ -1,29 +0,0 @@ -package offsetstore - -// ProjectionID is composed by the projection name and a given persistence ID -type ProjectionID struct { - // the projection projectionName. This must be unique within an actor system - // the same name is used in all projection IDs for a given a projection - projectionName string - // specifies the actual persistence ID - persistenceID string -} - -// NewProjectionID creates an instance of the ProjectionID given the name and a persistence ID -func NewProjectionID(name string, persistenceID string) *ProjectionID { - return &ProjectionID{ - projectionName: name, - persistenceID: persistenceID, - } -} - -// ProjectionName returns the projection name of a ProjectionID -// The projection name is shared across multiple instances of ProjectionID with different persistence ID -func (x ProjectionID) ProjectionName() string { - return x.projectionName -} - -// PersistenceID returns the persistence ID of a ProjectionID -func (x ProjectionID) PersistenceID() string { - return x.persistenceID -} diff --git a/option.go b/option.go index 2cd044b..6761558 100644 --- a/option.go +++ b/option.go @@ -3,7 +3,7 @@ package ego import ( "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/log" - "github.com/tochemey/goakt/pkg/telemetry" + "github.com/tochemey/goakt/telemetry" "go.uber.org/atomic" ) diff --git a/option_test.go b/option_test.go index 8e8a6ef..aca206d 100644 --- a/option_test.go +++ b/option_test.go @@ -3,13 +3,12 @@ package ego import ( "testing" - "go.uber.org/atomic" - "github.com/stretchr/testify/assert" "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/discovery/kubernetes" "github.com/tochemey/goakt/log" - "github.com/tochemey/goakt/pkg/telemetry" + "github.com/tochemey/goakt/telemetry" + "go.uber.org/atomic" ) func TestOptions(t *testing.T) { diff --git a/projection/projection.go b/projection/projection.go deleted file mode 100644 index 0bba01f..0000000 --- a/projection/projection.go +++ /dev/null @@ -1,361 +0,0 @@ -package projection - -import ( - "context" - "fmt" - "math" - "time" - - "github.com/flowchartsman/retry" - "github.com/pkg/errors" - "github.com/tochemey/ego/egopb" - "github.com/tochemey/ego/eventstore" - "github.com/tochemey/ego/internal/telemetry" - "github.com/tochemey/ego/offsetstore" - "github.com/tochemey/goakt/log" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/timestamppb" -) - -// Handler is used to handle event and state consumed from the event store -type Handler func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, offset uint64) error - -// Projection defines the projection -type Projection struct { - // Name specifies the projection Name - name string - // Logger specifies the logger - logger log.Logger - // Handler specifies the projection handler - handler Handler - // JournalStore specifies the journal store for reading events - eventsStore eventstore.EventsStore - // OffsetStore specifies the offset store to commit offsets - offsetsStore offsetstore.OffsetStore - // Specifies the recovery setting - recovery *Recovery - // stop signal - stopSignal chan struct{} - isStarted *atomic.Bool -} - -// New create an instance of Projection given the name of the projection, the handler and the offsets store -func New(name string, handler Handler, - eventsStore eventstore.EventsStore, - offsetStore offsetstore.OffsetStore, - recovery *Recovery, - logger log.Logger) *Projection { - return &Projection{ - handler: handler, - offsetsStore: offsetStore, - name: name, - eventsStore: eventsStore, - logger: logger, - recovery: recovery, - stopSignal: make(chan struct{}, 1), - isStarted: atomic.NewBool(false), - } -} - -// Start starts the projection -func (p *Projection) Start(ctx context.Context) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "PreStart") - defer span.End() - - if p.isStarted.Load() { - return nil - } - - // connect to the offset store - if p.offsetsStore == nil { - return errors.New("offsets store is not defined") - } - - // call the connect method of the journal store - if err := p.offsetsStore.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to the offsets store: %v", err) - } - - // connect to the events store - if p.eventsStore == nil { - return errors.New("events store is not defined") - } - - // call the connect method of the journal store - if err := p.eventsStore.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to the events store: %v", err) - } - - // we will ping the stores 5 times to see whether there have started successfully or not. - // The operation will be done in an exponential backoff mechanism with an initial delay of a second and a maximum delay of a second. - // Once the retries have completed and still not connected we fail the start process of the projection. - const ( - maxRetries = 5 - initialDelay = time.Second - maxDelay = time.Second - ) - // create a new instance of retrier that will try a maximum of five times, with - // an initial delay of 100 ms and a maximum delay of 1 second - retrier := retry.NewRetrier(maxRetries, initialDelay, maxDelay) - err := retrier.RunContext(ctx, func(ctx context.Context) error { - // we ping both stores in parallel - g, ctx := errgroup.WithContext(ctx) - // ping the journal store - g.Go(func() error { - return p.eventsStore.Ping(ctx) - }) - // ping the offset store - g.Go(func() error { - return p.offsetsStore.Ping(ctx) - }) - // return the result of operations - return g.Wait() - }) - - if err != nil { - return errors.Wrap(err, "failed to start the projection") - } - - // set the started status - p.isStarted.Store(true) - - // start processing - go p.processingLoop(ctx) - - return nil -} - -// Stop stops the projection -func (p *Projection) Stop(ctx context.Context) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "PostStop") - defer span.End() - - // check whether it is stopped or not - if !p.isStarted.Load() { - return nil - } - - // send the stop - close(p.stopSignal) - - // disconnect the events store - if err := p.eventsStore.Disconnect(ctx); err != nil { - return fmt.Errorf("failed to disconnect the events store: %v", err) - } - - // disconnect the offset store - if err := p.offsetsStore.Disconnect(ctx); err != nil { - return fmt.Errorf("failed to disconnect the offsets store: %v", err) - } - - // set the started status to false - p.isStarted.Store(false) - return nil -} - -// processingLoop is a loop that continuously runs to process events persisted onto the journal store until the projection is stopped -func (p *Projection) processingLoop(ctx context.Context) { - for { - select { - case <-p.stopSignal: - return - default: - g, ctx := errgroup.WithContext(ctx) - idsChan := make(chan string, 1) - - // let us push the id into the channel - g.Go(func() error { - // close the ids channel - defer close(idsChan) - - // pagination setting - pageSize := uint64(100) - pageToken := "" - // let us fetch all the persistence persistenceIDs - persistenceIDs, nextPageToken, err := p.eventsStore.PersistenceIDs(ctx, pageSize, pageToken) - // handle the error - if err != nil { - // log the error - p.logger.Error(errors.Wrap(err, "failed to fetch the list of persistence IDs")) - // here we stop the projection - err := p.Stop(ctx) - // handle the error - if err != nil { - // log the error - p.logger.Error(err) - return err - } - return err - } - - // finish fetching the list of persistence ids - if len(persistenceIDs) == 0 && nextPageToken == "" { - return nil - } - - // let us push the persistence id into the channel - for _, persistenceID := range persistenceIDs { - select { - case idsChan <- persistenceID: - case <-ctx.Done(): - return ctx.Err() - } - } - - // set the next page token - pageToken = nextPageToken - return nil - }) - - // Start a fixed number of goroutines process the persistenceIDs. - for i := 0; i < 20; i++ { - g.Go(func() error { - for persistenceID := range idsChan { - select { - case <-ctx.Done(): - return ctx.Err() - default: - return p.doProcess(ctx, persistenceID) - } - } - return nil - }) - } - - // wait for all the processing to be done - if err := g.Wait(); err != nil { - // log the error - p.logger.Error(err) - switch p.recovery.RecoveryStrategy() { - case egopb.ProjectionRecoveryStrategy_FAIL, - egopb.ProjectionRecoveryStrategy_RETRY_AND_FAIL: - // here we stop the projection - err := p.Stop(ctx) - // handle the error - if err != nil { - // log the error - p.logger.Error(err) - return - } - return - default: - // pass - } - } - } - } -} - -// doProcess processes all events of a given persistent entity and hand them over to the handler -func (p *Projection) doProcess(ctx context.Context, persistenceID string) error { - // add a span context - ctx, span := telemetry.SpanContext(ctx, "HandlePersistenceID") - defer span.End() - - if !p.isStarted.Load() { - return nil - } - - // get the latest offset persisted for the persistence id - offset, err := p.offsetsStore.GetCurrentOffset(ctx, offsetstore.NewProjectionID(p.name, persistenceID)) - if err != nil { - return err - } - - // define the fromSequenceNumber - fromSequenceNumber := offset.GetCurrentOffset() + 1 - - // fetch events - events, err := p.eventsStore.ReplayEvents(ctx, persistenceID, fromSequenceNumber, math.MaxUint64, math.MaxUint64) - if err != nil { - return err - } - - // grab the length of events - length := len(events) - - // there is nothing to process - if length == 0 { - return nil - } - - // define a variable that hold the number of events successfully processed - // iterate the events - for i := 0; i < length; i++ { - // grab the envelope - envelope := events[i] - // grab the data to pass to the projection handler - state := envelope.GetResultingState() - event := envelope.GetEvent() - seqNr := envelope.GetSequenceNumber() - - // send the request to the handler based upon the recovery strategy in place - switch p.recovery.RecoveryStrategy() { - case egopb.ProjectionRecoveryStrategy_FAIL: - // send the data to the handler. In case of error we log the error and fail the projection - if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil { - p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr)) - return err - } - - case egopb.ProjectionRecoveryStrategy_RETRY_AND_FAIL: - // grab the max retries amd delay - retries := p.recovery.Retries() - delay := p.recovery.RetryDelay() - // create a new exponential backoff that will try a maximum of retries times, with - // an initial delay of 100 ms and a maximum delay - backoff := retry.NewRetrier(int(retries), 100*time.Millisecond, delay) - // pass the data to the projection handler - if err := backoff.Run(func() error { - // handle the projection handler error - if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil { - p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr)) - return err - } - return nil - }); err != nil { - // because we fail we return the error without committing the offset - return err - } - - case egopb.ProjectionRecoveryStrategy_RETRY_AND_SKIP: - // grab the max retries amd delay - retries := p.recovery.Retries() - delay := p.recovery.RetryDelay() - // create a new exponential backoff that will try a maximum of retries times, with - // an initial delay of 100 ms and a maximum delay - backoff := retry.NewRetrier(int(retries), 100*time.Millisecond, delay) - // pass the data to the projection handler - if err := backoff.Run(func() error { - return p.handler(ctx, persistenceID, event, state, seqNr) - }); err != nil { - // here we just log the error, but we skip the event and commit the offset - p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr)) - } - - case egopb.ProjectionRecoveryStrategy_SKIP: - // send the data to the handler. In case of error we just log the error and skip the event by committing the offset - if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil { - p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr)) - } - } - // the envelope has been successfully processed - // here we commit the offset to the offset store and continue the next event - offset = &egopb.Offset{ - PersistenceId: persistenceID, - ProjectionName: p.name, - CurrentOffset: seqNr, - Timestamp: timestamppb.Now().AsTime().UnixMilli(), - } - // write the given offset and return any possible error - if err := p.offsetsStore.WriteOffset(ctx, offset); err != nil { - return errors.Wrapf(err, "failed to persist offset for persistence id=%s", persistenceID) - } - } - - return nil -} diff --git a/projection/projection_test.go b/projection/projection_test.go deleted file mode 100644 index 157b407..0000000 --- a/projection/projection_test.go +++ /dev/null @@ -1,343 +0,0 @@ -package projection - -import ( - "context" - "testing" - "time" - - "github.com/google/uuid" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tochemey/ego/egopb" - memeventstore "github.com/tochemey/ego/eventstore/memory" - "github.com/tochemey/ego/offsetstore" - "github.com/tochemey/ego/offsetstore/memory" - testpb "github.com/tochemey/ego/test/data/pb/v1" - "github.com/tochemey/goakt/log" - "go.uber.org/goleak" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/timestamppb" -) - -func TestProjection(t *testing.T) { - t.Run("with happy path", func(t *testing.T) { - defer goleak.VerifyNone(t) - ctx := context.TODO() - projectionName := "db-writer" - persistenceID := uuid.NewString() - logger := log.DefaultLogger - - // set up the event store - journalStore := memeventstore.NewEventsStore() - assert.NotNil(t, journalStore) - - // set up the offset store - offsetStore := memory.NewOffsetStore() - assert.NotNil(t, offsetStore) - - // set up the projection - // create a handler that return successfully - handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error { - return nil - } - - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) - // start the projection - err := projection.Start(ctx) - require.NoError(t, err) - - // persist some events - state, err := anypb.New(new(testpb.Account)) - assert.NoError(t, err) - event, err := anypb.New(&testpb.AccountCredited{}) - assert.NoError(t, err) - - count := 10 - timestamp := timestamppb.Now() - journals := make([]*egopb.Event, count) - for i := 0; i < count; i++ { - seqNr := i + 1 - journals[i] = &egopb.Event{ - PersistenceId: persistenceID, - SequenceNumber: uint64(seqNr), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: timestamp.AsTime().Unix(), - } - } - - require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) - - // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) - - // let us grab the current offset - actual, err := offsetStore.GetCurrentOffset(ctx, offsetstore.NewProjectionID(projectionName, persistenceID)) - assert.NoError(t, err) - assert.NotNil(t, actual) - assert.EqualValues(t, 10, actual.GetCurrentOffset()) - - // free resources - assert.NoError(t, projection.Stop(ctx)) - }) - t.Run("with failed handler with fail strategy", func(t *testing.T) { - defer goleak.VerifyNone(t) - ctx := context.TODO() - projectionName := "db-writer" - persistenceID := uuid.NewString() - logger := log.DefaultLogger - - // set up the event store - journalStore := memeventstore.NewEventsStore() - assert.NotNil(t, journalStore) - - // set up the offset store - offsetStore := memory.NewOffsetStore() - assert.NotNil(t, offsetStore) - - // set up the projection - // create a handler that return successfully - handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error { - return errors.New("damn") - } - - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) - // start the projection - err := projection.Start(ctx) - require.NoError(t, err) - - // persist some events - state, err := anypb.New(new(testpb.Account)) - assert.NoError(t, err) - event, err := anypb.New(&testpb.AccountCredited{}) - assert.NoError(t, err) - - count := 10 - timestamp := timestamppb.Now() - journals := make([]*egopb.Event, count) - for i := 0; i < count; i++ { - seqNr := i + 1 - journals[i] = &egopb.Event{ - PersistenceId: persistenceID, - SequenceNumber: uint64(seqNr), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: timestamp.AsTime().Unix(), - } - } - - require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) - - // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) - - // here due to the default recovery strategy the projection is stopped - require.False(t, projection.isStarted.Load()) - // free resources - assert.NoError(t, projection.Stop(ctx)) - }) - t.Run("with failed handler and retry_fail strategy", func(t *testing.T) { - defer goleak.VerifyNone(t) - ctx := context.TODO() - projectionName := "db-writer" - persistenceID := uuid.NewString() - logger := log.DefaultLogger - - // set up the event store - journalStore := memeventstore.NewEventsStore() - assert.NotNil(t, journalStore) - - // set up the offset store - offsetStore := memory.NewOffsetStore() - assert.NotNil(t, offsetStore) - - // set up the projection - // create a handler that return successfully - handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error { - return errors.New("damn") - } - - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( - WithRecoveryStrategy(egopb.ProjectionRecoveryStrategy_RETRY_AND_FAIL), - WithRetries(2), - WithRetryDelay(100*time.Millisecond)), logger) - - // start the projection - err := projection.Start(ctx) - require.NoError(t, err) - - // persist some events - state, err := anypb.New(new(testpb.Account)) - assert.NoError(t, err) - event, err := anypb.New(&testpb.AccountCredited{}) - assert.NoError(t, err) - - count := 10 - timestamp := timestamppb.Now() - journals := make([]*egopb.Event, count) - for i := 0; i < count; i++ { - seqNr := i + 1 - journals[i] = &egopb.Event{ - PersistenceId: persistenceID, - SequenceNumber: uint64(seqNr), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: timestamp.AsTime().Unix(), - } - } - - require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) - - // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(1 * time.Second) - - // let us grab the current offset - require.False(t, projection.isStarted.Load()) - - // free resources - assert.NoError(t, projection.Stop(ctx)) - }) - t.Run("with failed handler and skip strategy", func(t *testing.T) { - defer goleak.VerifyNone(t) - ctx := context.TODO() - projectionName := "db-writer" - persistenceID := uuid.NewString() - logger := log.DefaultLogger - - // set up the event store - journalStore := memeventstore.NewEventsStore() - assert.NotNil(t, journalStore) - - // set up the offset store - offsetStore := memory.NewOffsetStore() - assert.NotNil(t, offsetStore) - - // set up the projection - // create a handler that return successfully - handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error { - if (int(sequenceNumber) % 2) == 0 { - return errors.New("failed handler") - } - return nil - } - - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( - WithRecoveryStrategy(egopb.ProjectionRecoveryStrategy_SKIP), - WithRetries(2), - WithRetryDelay(100*time.Millisecond)), logger) - // start the projection - err := projection.Start(ctx) - require.NoError(t, err) - - // persist some events - state, err := anypb.New(new(testpb.Account)) - assert.NoError(t, err) - event, err := anypb.New(&testpb.AccountCredited{}) - assert.NoError(t, err) - - count := 10 - timestamp := timestamppb.Now() - journals := make([]*egopb.Event, count) - for i := 0; i < count; i++ { - seqNr := i + 1 - journals[i] = &egopb.Event{ - PersistenceId: persistenceID, - SequenceNumber: uint64(seqNr), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: timestamp.AsTime().Unix(), - } - } - - require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) - - // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) - - // let us grab the current offset - actual, err := offsetStore.GetCurrentOffset(ctx, offsetstore.NewProjectionID(projectionName, persistenceID)) - assert.NoError(t, err) - assert.NotNil(t, actual) - assert.EqualValues(t, 10, actual.GetCurrentOffset()) - - // free resource - assert.NoError(t, projection.Stop(ctx)) - }) - t.Run("with failed handler and skip retry strategy", func(t *testing.T) { - defer goleak.VerifyNone(t) - ctx := context.TODO() - projectionName := "db-writer" - persistenceID := uuid.NewString() - logger := log.DefaultLogger - - // set up the event store - journalStore := memeventstore.NewEventsStore() - assert.NotNil(t, journalStore) - - // set up the offset store - offsetStore := memory.NewOffsetStore() - assert.NotNil(t, offsetStore) - - // set up the projection - // create a handler that return successfully - handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error { - if (int(sequenceNumber) % 2) == 0 { - return errors.New("failed handler") - } - return nil - } - - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( - WithRecoveryStrategy(egopb.ProjectionRecoveryStrategy_RETRY_AND_SKIP), - WithRetries(2), - WithRetryDelay(100*time.Millisecond)), logger) - // start the projection - err := projection.Start(ctx) - require.NoError(t, err) - - // persist some events - state, err := anypb.New(new(testpb.Account)) - assert.NoError(t, err) - event, err := anypb.New(&testpb.AccountCredited{}) - assert.NoError(t, err) - - count := 10 - timestamp := timestamppb.Now() - journals := make([]*egopb.Event, count) - for i := 0; i < count; i++ { - seqNr := i + 1 - journals[i] = &egopb.Event{ - PersistenceId: persistenceID, - SequenceNumber: uint64(seqNr), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: timestamp.AsTime().Unix(), - } - } - - require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) - - // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) - - // let us grab the current offset - actual, err := offsetStore.GetCurrentOffset(ctx, offsetstore.NewProjectionID(projectionName, persistenceID)) - assert.NoError(t, err) - assert.NotNil(t, actual) - assert.EqualValues(t, 10, actual.GetCurrentOffset()) - - // free resource - assert.NoError(t, projection.Stop(ctx)) - }) -} diff --git a/projection/recovery.go b/projection/recovery.go deleted file mode 100644 index 563e540..0000000 --- a/projection/recovery.go +++ /dev/null @@ -1,89 +0,0 @@ -package projection - -import ( - "time" - - "github.com/tochemey/ego/egopb" -) - -// Recovery specifies the various recovery settings of a projection -// The option helps defines what happens when the projection handler fails to process -// the consumed event for a given persistence ID -type Recovery struct { - // retries specifies the number of times to retry handler function. - // This is only applicable to `RETRY_AND_FAIL` and `RETRY_AND_SKIP` recovery strategies - // The default value is 5 - retries uint64 - // retryDelay specifies the delay between retry attempts - // This is only applicable to `RETRY_AND_FAIL` and `RETRY_AND_SKIP` recovery strategies - // The default value is 1 second - retryDelay time.Duration - // strategy specifies strategy to use to recover from unhandled exceptions without causing the projection to fail - strategy egopb.ProjectionRecoveryStrategy -} - -// NewRecovery creates an instance of Recovery -func NewRecovery(options ...Option) *Recovery { - cfg := &Recovery{ - retries: 5, - retryDelay: time.Second, - strategy: egopb.ProjectionRecoveryStrategy_FAIL, - } - // apply the various options - for _, opt := range options { - opt.Apply(cfg) - } - - return cfg -} - -// Retries returns the number of times to retry handler function. -func (c Recovery) Retries() uint64 { - return c.retries -} - -// RetryDelay returns the delay between retry attempts -func (c Recovery) RetryDelay() time.Duration { - return c.retryDelay -} - -// RecoveryStrategy returns the recovery strategy -func (c Recovery) RecoveryStrategy() egopb.ProjectionRecoveryStrategy { - return c.strategy -} - -// Option is the interface that applies a recoveryuration option. -type Option interface { - // Apply sets the Option value of a recovery. - Apply(recovery *Recovery) -} - -var _ Option = OptionFunc(nil) - -// OptionFunc implements the Option interface. -type OptionFunc func(recovery *Recovery) - -func (f OptionFunc) Apply(c *Recovery) { - f(c) -} - -// WithRetries sets the number of retries -func WithRetries(retries uint64) Option { - return OptionFunc(func(recovery *Recovery) { - recovery.retries = retries - }) -} - -// WithRetryDelay sets the retry delay -func WithRetryDelay(delay time.Duration) Option { - return OptionFunc(func(recovery *Recovery) { - recovery.retryDelay = delay - }) -} - -// WithRecoveryStrategy sets the recovery strategy -func WithRecoveryStrategy(strategy egopb.ProjectionRecoveryStrategy) Option { - return OptionFunc(func(recovery *Recovery) { - recovery.strategy = strategy - }) -} diff --git a/protos/ego/v1/ego.proto b/protos/ego/v1/ego.proto index 6d4ae14..0d86346 100644 --- a/protos/ego/v1/ego.proto +++ b/protos/ego/v1/ego.proto @@ -29,7 +29,7 @@ message Event { message CommandReply { // the actual command reply oneof reply { - // actual state is wrapped with meta data + // actual state is wrapped with metadata StateReply state_reply = 1; // gRPC failure ErrorReply error_reply = 2; diff --git a/readme.md b/readme.md index 19b149f..bd34e52 100644 --- a/readme.md +++ b/readme.md @@ -25,42 +25,38 @@ Under the hood, ego leverages [goakt](https://github.com/Tochemey/goakt) to scal - the event timestamp - [NoReply](protos): this message is returned when the command does not need a reply. - [ErrorReply](protos): is used when a command processing has failed. This message contains the error message. - Once the events are persisted, they are applied to the state producing a new valid state. + Once the events are persisted, they are applied to the state producing a new valid state. - Events handler: The event handlers are used to mutate the state of the Aggregate by applying the events to it. Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store. - - Extensible events store + - Extensible events store - Built-in events store - - Postgres - - Memory + - Postgres + - Memory (only for testing purpose) - [Cluster Mode](https://github.com/Tochemey/goakt#clustering) -- Read Model: To enable the read model just start the projection engine with an event handler and one can build a read model of all events persisted by the write model. - At the moment the projection engine only come bundled with an in-memory offset store. One can easily implement an offset store using the interface provided. - Examples (check the [examples](./example)) -## TODOs -- [ ] Add cursor pagination for events store listing of persistence ids -- [ ] Add some compaction mechanism to the events store -- [ ] Enhance projection using sharding -- [ ] Rewrite the projection implementation - ## Installation + ```bash go get github.com/tochemey/ego ``` - ## Contribution + Contributions are welcome! The project adheres to [Semantic Versioning](https://semver.org) and [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). This repo uses [Earthly](https://earthly.dev/get-earthly). To contribute please: + - Fork the repository - Create a feature branch - Submit a [pull request](https://help.github.com/articles/using-pull-requests) ### Test & Linter + Prior to submitting a [pull request](https://help.github.com/articles/using-pull-requests), please run: + ```bash earthly +test ```