feat(pruner/light): add light pruning for shwap (#3896)
walldiss authored Nov 1, 2024
1 parent 0733c8d commit 1fbb040
Showing 7 changed files with 240 additions and 209 deletions.
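The dedicated light.Pruner wrapper (the file deleted below) is no longer needed: *light.ShareAvailability now implements pruning itself and is wired into the prune module via fx.As(new(pruner.Pruner)). The pruner.Pruner interface is not shown in this diff; judging from that conversion and the new Prune method, it is presumably close to the following sketch (for orientation only, the real definition lives in the pruner package):

package pruner

import (
	"context"

	"github.com/celestiaorg/celestia-node/header"
)

// Pruner is implemented by components that can delete the data they hold for
// a given header. Sketch only; the actual interface may carry extra doc
// comments or methods.
type Pruner interface {
	Prune(context.Context, *header.ExtendedHeader) error
}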
1 change: 0 additions & 1 deletion nodebuilder/pruner/module.go
@@ -45,7 +45,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return fx.Module("prune",
baseComponents,
prunerService,
fx.Provide(light.NewPruner),
)
}
// We do not trigger DetectPreviousRun for Light nodes, to allow them to disable pruning at will.
10 changes: 6 additions & 4 deletions nodebuilder/share/module.go
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/celestiaorg/celestia-node/nodebuilder/node"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
lightprune "github.com/celestiaorg/celestia-node/pruner/light"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/full"
@@ -222,20 +223,21 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
case node.Light:
return fx.Options(
fx.Provide(fx.Annotate(
func(getter shwap.Getter, ds datastore.Batching) *light.ShareAvailability {
func(getter shwap.Getter, ds datastore.Batching, bs blockstore.Blockstore) *light.ShareAvailability {
return light.NewShareAvailability(
getter,
ds,
bs,
light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
)
},
fx.As(fx.Self()),
fx.As(new(share.Availability)),
fx.As(new(pruner.Pruner)), // TODO(@walldiss): remove conversion after Availability and Pruner interfaces are merged
fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error {
return la.Close(ctx)
}),
)),
fx.Provide(func(avail *light.ShareAvailability) share.Availability {
return avail
}),
)
case node.Bridge, node.Full:
return fx.Options(
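The fx.Annotate block above replaces the separate fx.Provide conversion to share.Availability: one constructor result is now exposed as the concrete *light.ShareAvailability (fx.As(fx.Self())), as share.Availability, and as pruner.Pruner, with an OnStop hook closing that same instance. A minimal, self-contained sketch of the pattern with toy types (Greeter and Service are invented here purely for illustration; assumes a go.uber.org/fx version that has fx.Self and the OnStop annotation, as the module above does):

package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

// Greeter stands in for an interface such as share.Availability or pruner.Pruner.
type Greeter interface {
	Greet() string
}

// Service stands in for light.ShareAvailability.
type Service struct{}

func NewService() *Service { return &Service{} }

func (s *Service) Greet() string { return "hello" }

func (s *Service) Close(context.Context) error { return nil }

func main() {
	app := fx.New(
		fx.Provide(fx.Annotate(
			NewService,
			fx.As(fx.Self()),    // expose the concrete *Service
			fx.As(new(Greeter)), // and the same instance as Greeter
			fx.OnStop(func(ctx context.Context, s *Service) error {
				return s.Close(ctx) // shutdown hook bound to that instance
			}),
		)),
		fx.Invoke(func(g Greeter, s *Service) {
			// Both parameters resolve to the single constructed instance.
			fmt.Println(g.Greet(), g.(*Service) == s)
		}),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
	// Start and Stop exercise the OnStop hook registered above.
	ctx := context.Background()
	_ = app.Start(ctx)
	_ = app.Stop(ctx)
}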
46 changes: 0 additions & 46 deletions pruner/light/pruner.go

This file was deleted.

66 changes: 64 additions & 2 deletions share/availability/light/availability.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
"github.com/ipfs/go-datastore/namespace"
@@ -16,7 +17,9 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
)

var (
@@ -31,6 +34,7 @@ var (
// on the network doing sampling over the same Root to collectively verify its availability.
type ShareAvailability struct {
getter shwap.Getter
bs blockstore.Blockstore
params Parameters

activeHeights *utils.Sessions
@@ -42,6 +46,7 @@ type ShareAvailability struct {
func NewShareAvailability(
getter shwap.Getter,
ds datastore.Batching,
bs blockstore.Blockstore,
opts ...Option,
) *ShareAvailability {
params := *DefaultParameters()
@@ -54,6 +59,7 @@

return &ShareAvailability{
getter: getter,
bs: bs,
params: params,
activeHeights: utils.NewSessions(),
ds: autoDS,
@@ -70,7 +76,7 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
return nil
}

// Prevent multiple sampling sessions for the same header height
// Prevent multiple sampling and pruning sessions for the same header height
release, err := la.activeHeights.StartSession(ctx, header.Height())
if err != nil {
return err
@@ -153,7 +159,7 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
err = la.ds.Put(ctx, key, updatedData)
la.dsLk.Unlock()
if err != nil {
return fmt.Errorf("failed to store sampling result: %w", err)
return fmt.Errorf("store sampling result: %w", err)
}

if errors.Is(ctx.Err(), context.Canceled) {
@@ -169,6 +175,62 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
return nil
}

// Prune deletes samples and all sampling data corresponding to the provided header from the store.
// The operation removes all data that SharesAvailable might have created.
func (la *ShareAvailability) Prune(ctx context.Context, h *header.ExtendedHeader) error {
dah := h.DAH
if share.DataHash(dah.Hash()).IsEmptyEDS() {
return nil
}

// Prevent multiple sampling and pruning sessions for the same header height
release, err := la.activeHeights.StartSession(ctx, h.Height())
if err != nil {
return err
}
defer release()

key := datastoreKeyForRoot(dah)
la.dsLk.RLock()
data, err := la.ds.Get(ctx, key)
la.dsLk.RUnlock()
if errors.Is(err, datastore.ErrNotFound) {
// nothing to prune
return nil
}
if err != nil {
return fmt.Errorf("get sampling result: %w", err)
}

var result SamplingResult
err = json.Unmarshal(data, &result)
if err != nil {
return fmt.Errorf("unmarshal sampling result: %w", err)
}

// delete stored samples
for _, sample := range result.Available {
blk, err := bitswap.NewEmptySampleBlock(h.Height(), sample.Row, sample.Col, len(h.DAH.RowRoots))
if err != nil {
return fmt.Errorf("marshal sample ID: %w", err)
}
err = la.bs.DeleteBlock(ctx, blk.CID())
if err != nil {
if !errors.Is(err, ipld.ErrNodeNotFound) {
return fmt.Errorf("delete sample: %w", err)
}
log.Warnf("can't delete sample: %v, height: %v, missing in blockstore", sample, h.Height())
}
}

// delete the sampling result
err = la.ds.Delete(ctx, key)
if err != nil {
return fmt.Errorf("delete sampling result: %w", err)
}
return nil
}

func datastoreKeyForRoot(root *share.AxisRoots) datastore.Key {
return datastore.NewKey(root.String())
}
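In the node, the prune service decides when to call Prune based on the sampling window; the method itself only needs a header. A rough usage sketch (pruneOldSamples, headersBelow, and availabilityWindow are hypothetical names invented for illustration; the real scheduling lives in the pruner package):

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share/availability/light"
)

// pruneOldSamples walks headers older than the availability window and asks
// ShareAvailability to drop their stored samples and sampling results.
func pruneOldSamples(
	ctx context.Context,
	avail *light.ShareAvailability,
	headersBelow func(context.Context, time.Time) ([]*header.ExtendedHeader, error),
	availabilityWindow time.Duration,
) error {
	cutoff := time.Now().Add(-availabilityWindow)
	headers, err := headersBelow(ctx, cutoff)
	if err != nil {
		return fmt.Errorf("list headers below cutoff: %w", err)
	}
	for _, h := range headers {
		// Prune is a no-op for empty EDSes and for heights without a stored
		// sampling result, so re-pruning an already-pruned height is cheap.
		if err := avail.Prune(ctx, h); err != nil {
			return fmt.Errorf("prune height %d: %w", h.Height(), err)
		}
	}
	return nil
}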