Skip to content

Commit

Permalink
Avoid processing same ATX in parallel (#5379)
Browse files Browse the repository at this point in the history
## Motivation
There is no mechanism preventing the same ATX from being processed multiple times in parallel. An ATX might be gossiped again while it is already being processed. Related issue: #4426

## Changes
The ATX handler now registers the ATX ID when it starts processing an ATX. Subsequent calls to `handleAtx()` for the same ATX find that it is already being processed and register themselves for the result, which is sent over a channel once processing has finished.

## Test Plan
Added a unit test in which an ATX is gossiped 10 times in parallel. The expectation is that it is processed only once.
  • Loading branch information
poszu committed Dec 20, 2023
1 parent 0e55663 commit fce85bf
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 2 deletions.
42 changes: 40 additions & 2 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type Handler struct {
mu sync.Mutex
fetcher system.Fetcher
poetCfg PoetConfig

// inProgress map gathers ATXs that are currently being processed.
// It's used to avoid processing the same ATX twice.
inProgress map[types.ATXID][]chan error
inProgressMu sync.Mutex
}

// NewHandler returns a data handler for ATX.
Expand Down Expand Up @@ -79,6 +84,8 @@ func NewHandler(
beacon: beacon,
tortoise: tortoise,
poetCfg: poetCfg,

inProgress: make(map[types.ATXID][]chan error),
}
}

Expand Down Expand Up @@ -458,12 +465,43 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
if err := codec.Decode(msg, &atx); err != nil {
return fmt.Errorf("%w: %w", errMalformedData, err)
}

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
}

// Check if processing is already in progress
h.inProgressMu.Lock()
if sub, ok := h.inProgress[atx.ID()]; ok {
ch := make(chan error, 1)
h.inProgress[atx.ID()] = append(sub, ch)
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Debug("atx is already being processed. waiting for result", atx.ID())
select {
case err := <-ch:
h.log.WithContext(ctx).With().Debug("atx processed in other task", atx.ID(), log.Err(err))
return err
case <-ctx.Done():
return ctx.Err()

Check warning on line 485 in activation/handler.go

View check run for this annotation

Codecov / codecov/patch

activation/handler.go#L484-L485

Added lines #L484 - L485 were not covered by tests
}
}

h.inProgress[atx.ID()] = []chan error{}
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Info("handling incoming atx", atx.ID(), log.Int("size", len(msg)))

err := h.processAtx(ctx, expHash, peer, atx)
h.inProgressMu.Lock()
defer h.inProgressMu.Unlock()
for _, ch := range h.inProgress[atx.ID()] {
ch <- err
close(ch)
}
delete(h.inProgress, atx.ID())
return err
}

func (h *Handler) processAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, atx types.ActivationTx) error {
if !h.edVerifier.Verify(signing.ATX, atx.SmesherID, atx.SignedBytes(), atx.Signature) {
return fmt.Errorf("failed to verify atx signature: %w", errMalformedData)
}
Expand Down Expand Up @@ -495,7 +533,7 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("cannot process atx %v: %w", atx.ShortString(), err)
}
events.ReportNewActivation(vAtx)
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx), log.Int("size", len(msg)))
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx))
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -906,6 +907,74 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), "", secondData))
}

// TestHandler_HandleParallelGossipAtx gossips one and the same encoded ATX from
// several goroutines at once and requires every call to return without error.
// Each mock expectation is registered exactly once; the intent is that the ATX
// goes through the processing pipeline a single time despite the duplicates.
func TestHandler_HandleParallelGossipAtx(t *testing.T) {
	// Number of concurrent gossip deliveries of the same ATX.
	const parallelGossips = 10

	goldenATXID := types.ATXID{2, 3, 4}
	handler := newTestHandler(t, goldenATXID)

	signer, err := signing.NewEdSigner()
	require.NoError(t, err)
	smesherID := signer.NodeID()
	nonce := types.VRFPostIndex(12345)
	proof := newNIPostWithChallenge(t, types.HexToHash32("0x3333"), []byte{0xba, 0xbe})

	// Build an initial ATX (no previous ATX) positioned on the golden ATX.
	challenge := types.NIPostChallenge{
		PublishEpoch:   1,
		PrevATXID:      types.EmptyATXID,
		PositioningATX: goldenATXID,
		CommitmentATX:  &goldenATXID,
		InitialPost:    proof.Post,
	}
	atx := &types.ActivationTx{
		InnerActivationTx: types.InnerActivationTx{
			NIPostChallenge: challenge,
			Coinbase:        types.Address{2, 3, 4},
			NumUnits:        2,
			NIPost:          proof,
			NodeID:          &smesherID,
			VRFNonce:        &nonce,
		},
		SmesherID: smesherID,
	}
	atx.Signature = signer.Sign(signing.ATX, atx.SignedBytes())
	atx.SetEffectiveNumUnits(atx.NumUnits)
	atx.SetReceived(time.Now())
	_, err = atx.Verify(0, 2)
	require.NoError(t, err)

	encoded, err := codec.Encode(atx)
	require.NoError(t, err)

	// The Post stub sleeps before returning so that the remaining goroutines
	// have time to call the handler while the first call is still in flight.
	slowPost := func(
		_ context.Context,
		_ types.NodeID,
		_ types.ATXID,
		_ *types.Post,
		_ *types.PostMetadata,
		_ uint32,
	) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	}

	// Clock and fetcher expectations.
	handler.mclock.EXPECT().CurrentLayer().Return(atx.PublishEpoch.FirstLayer())
	handler.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
	handler.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())

	// Validator expectations.
	handler.mValidator.EXPECT().VRFNonce(smesherID, goldenATXID, &nonce, gomock.Any(), atx.NumUnits)
	handler.mValidator.EXPECT().
		Post(gomock.Any(), atx.SmesherID, goldenATXID, atx.InitialPost, gomock.Any(), atx.NumUnits).
		DoAndReturn(slowPost)
	handler.mValidator.EXPECT().InitialNIPostChallenge(&atx.NIPostChallenge, gomock.Any(), goldenATXID)
	handler.mValidator.EXPECT().PositioningAtx(goldenATXID, gomock.Any(), goldenATXID, atx.PublishEpoch)
	handler.mValidator.EXPECT().NIPost(gomock.Any(), smesherID, goldenATXID, atx.NIPost, gomock.Any(), atx.NumUnits)

	// Consumers notified about the ATX.
	handler.mbeacon.EXPECT().OnAtx(gomock.Any())
	handler.mtortoise.EXPECT().OnAtx(gomock.Any())

	ctx := context.Background()
	var group errgroup.Group
	for worker := 0; worker < parallelGossips; worker++ {
		group.Go(func() error {
			return handler.HandleGossipAtx(ctx, "", encoded)
		})
	}

	require.NoError(t, group.Wait())
}

func TestHandler_HandleSyncedAtx(t *testing.T) {
// Arrange
goldenATXID := types.ATXID{2, 3, 4}
Expand Down

0 comments on commit fce85bf

Please sign in to comment.