Skip to content

Commit

Permalink
Merge changes from #5377
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Dec 19, 2023
2 parents 049d58f + f84f882 commit 142f4c1
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 3 deletions.
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func MainnetConfig() Config {
Standalone: false,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 36, // 3h
DisableAtxSync: true,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
13 changes: 11 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/localsql"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
Expand Down Expand Up @@ -1159,10 +1160,18 @@ func (app *App) listenToUpdates(ctx context.Context) {
app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)
app.proposalBuilder.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)

if err := app.fetcher.GetAtxs(ctx, update.Data.ActiveSet); err != nil {
app.eg.Go(func() error {
err := atxsync.Download(
ctx,
10*time.Second,
app.addLogger(SyncLogger, app.log).Zap(),
app.db,
app.fetcher,
update.Data.ActiveSet,
)
app.errCh <- err
return nil
}
})
}
}
}
Expand Down
75 changes: 75 additions & 0 deletions syncer/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package atxsync

import (
"context"
"math/rand"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go
type atxFetcher interface {
GetAtxs(context.Context, []types.ATXID) error
}

func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) {
missing := []types.ATXID{}
for _, atx := range set {
exist, err := atxs.Has(db, atx)
if err != nil {
return nil, err
}
if !exist {
missing = append(missing, atx)
}
}
return missing, nil
}

// Download specified set of atxs from peers in the network.
//
// actual retry interval will be between [retryInterval, 2*retryInterval]
func Download(
ctx context.Context,
retryInterval time.Duration,
logger *zap.Logger,
db *sql.Database,
fetcher atxFetcher,
set []types.ATXID,
) error {
total := len(set)
for {
missing, err := getMissing(db, set)
if err != nil {
return err
}
set = missing
downloaded := total - len(missing)
logger.Info("downloaded atxs",
zap.Int("total", total),
zap.Int("downloaded", downloaded),
zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, atx := range missing {
enc.AppendString(atx.ShortString())
}
return nil
})))
if downloaded == total {
return nil
}
if err := fetcher.GetAtxs(ctx, missing); err != nil {
logger.Debug("failed to fetch atxs", zap.Error(err))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))):
}
}
}
}
122 changes: 122 additions & 0 deletions syncer/atxsync/atxsync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package atxsync

import (
"context"
"errors"
"testing"
"time"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func atx(id types.ATXID) *types.VerifiedActivationTx {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(1)
atx.SetReceived(time.Now())
copy(atx.SmesherID[:], id[:])
vatx, err := atx.Verify(0, 1)
if err != nil {
panic(err)
}
return vatx
}

func id(id ...byte) types.ATXID {
var atxid types.ATXID
copy(atxid[:], id)
return atxid
}

type fetchRequest struct {
request []types.ATXID
result []*types.VerifiedActivationTx
error error
}

func TestDownload(t *testing.T) {
canceled, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
retry time.Duration
existing []*types.VerifiedActivationTx
set []types.ATXID
fetched []fetchRequest
rst error
}{
{
desc: "all existing",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "with multiple requests",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2), id(3)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
{request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}},
},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "continue on error",
ctx: context.Background(),
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
{request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
},
set: []types.ATXID{id(1), id(2)},
},
{
desc: "exit on context",
ctx: canceled,
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
},
set: []types.ATXID{id(1), id(2)},
rst: context.Canceled,
},
} {
t.Run(tc.desc, func(t *testing.T) {
logger := logtest.New(t)
db := sql.InMemory()
ctrl := gomock.NewController(t)
fetcher := mocks.NewMockatxFetcher(ctrl)
for _, atx := range tc.existing {
require.NoError(t, atxs.Add(db, atx))
}
for i := range tc.fetched {
req := tc.fetched[i]
fetcher.EXPECT().
GetAtxs(tc.ctx, req.request).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.ATXID) error {
for _, atx := range req.result {
require.NoError(t, atxs.Add(db, atx))
}
return req.error
})
}
require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set))
})
}
}
78 changes: 78 additions & 0 deletions syncer/atxsync/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
MaxStaleDuration time.Duration
Standalone bool
GossipDuration time.Duration
DisableAtxSync bool `mapstructure:"disable-atx-sync"`
OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"`
}

Expand Down Expand Up @@ -440,7 +441,10 @@ func (s *Syncer) syncAtx(ctx context.Context) error {
return err
}
}

if s.cfg.DisableAtxSync {
s.logger.Debug("atx sync disabled")
return nil
}
// steady state atx syncing
curr := s.ticker.CurrentLayer()
if float64(
Expand Down

0 comments on commit 142f4c1

Please sign in to comment.