Skip to content

Commit

Permalink
CreateTrigger and DropTrigger (#79)
Browse files Browse the repository at this point in the history
This PR adds `trigger` functionality for the cases where built-in
capabilities aren't sufficient. Triggers allow you to add a callback
function that will get called whether a value is `inserted`, `updated`
or `deleted`. It functions similarly to the bitmap index.


```go
players.CreateTrigger("on_balance", "balance", func(r Reader) {
	switch {
	case r.IsDelete():
		updates = append(updates, fmt.Sprintf("delete %d", r.Index()))
	case r.IsUpsert():
		updates = append(updates, fmt.Sprintf("upsert %d=%v", r.Index(), r.Float()))
	}
})

// Perform a few deletions and insertions
for i := 0; i < 3; i++ {
	players.DeleteAt(uint32(i))
	players.Insert(func(r Row) error {
		r.SetFloat64("balance", 50.0)
		return nil
	})
}

// Must keep track of all operations
assert.Len(t, updates, 6)
assert.Equal(t, []string{
	"delete 0", 
	"upsert 500=50", 
	"delete 1", 
	"upsert 501=50", 
	"delete 2",
	"upsert 502=50",
}, updates)
```
  • Loading branch information
kelindar authored Dec 3, 2022
1 parent 501708a commit 6cf7de3
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 2 deletions.
42 changes: 42 additions & 0 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,48 @@ func (c *Collection) DropColumn(columnName string) {
c.cols.DeleteColumn(columnName)
}

// CreateTrigger creates an trigger column with a specified name which depends on a given
// column. The trigger function will be applied on the values of the column whenever
// a new row is added, updated or deleted.
func (c *Collection) CreateTrigger(triggerName, columnName string, fn func(r Reader)) error {
if fn == nil || columnName == "" || triggerName == "" {
return fmt.Errorf("column: create trigger must specify name, column and function")
}

// Prior to creating an index, we should have a column
column, ok := c.cols.Load(columnName)
if !ok {
return fmt.Errorf("column: unable to create trigger, column '%v' does not exist", columnName)
}

// Create and add the trigger column
trigger := newTrigger(triggerName, columnName, fn)
c.lock.Lock()
c.cols.Store(triggerName, trigger)
c.cols.Store(columnName, column, trigger)
c.lock.Unlock()
return nil
}

// DropTrigger removes the trigger column with the specified name. If the trigger with this
// name does not exist, this operation is a no-op.
func (c *Collection) DropTrigger(triggerName string) error {
column, exists := c.cols.Load(triggerName)
if !exists {
return fmt.Errorf("column: unable to drop index, index '%v' does not exist", triggerName)
}

if _, ok := column.Column.(computed); !ok {
return fmt.Errorf("column: unable to drop index, '%v' is not a trigger", triggerName)
}

// Figure out the associated column and delete the index from that
columnName := column.Column.(computed).Column()
c.cols.DeleteIndex(columnName, triggerName)
c.cols.DeleteColumn(triggerName)
return nil
}

// CreateIndex creates an index column with a specified name which depends on a given
// column. The index function will be applied on the values of the column whenever
// a new row is added or updated.
Expand Down
57 changes: 57 additions & 0 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,63 @@ func TestReplica(t *testing.T) {
})
}

// --------------------------- Create/Drop Trigger ----------------------------

func TestTriggerCreate(t *testing.T) {
updates := make([]string, 0, 128)
players := loadPlayers(500)
players.CreateTrigger("on_balance", "balance", func(r Reader) {
switch {
case r.IsDelete():
updates = append(updates, fmt.Sprintf("delete %d", r.Index()))
case r.IsUpsert():
updates = append(updates, fmt.Sprintf("upsert %d=%v", r.Index(), r.Float()))
}
})

// Perform a few deletions and insertions
for i := 0; i < 3; i++ {
players.DeleteAt(uint32(i))
players.Insert(func(r Row) error {
r.SetFloat64("balance", 50.0)
return nil
})
}

// Must keep track of all operations
assert.Len(t, updates, 6)
assert.Equal(t, []string{"delete 0", "upsert 500=50", "delete 1", "upsert 501=50", "delete 2", "upsert 502=50"}, updates)
assert.NoError(t, players.DropTrigger("on_balance"))

// Must not drop if doesn't exist or not a trigger
assert.Error(t, players.DropTrigger("on_balance"))
assert.Error(t, players.DropTrigger("balance"))

// After dropping, should not trigger anymore
players.DeleteAt(100)
assert.Len(t, updates, 6)
}

func TestTriggerInvalid(t *testing.T) {
players := newEmpty(10)
assert.Error(t, players.CreateTrigger("on_balance", "invalid", func(r Reader) {}))
assert.Error(t, players.CreateTrigger("", "", nil))
}

func TestTriggerImpl(t *testing.T) {
column := newTrigger("test", "target", func(r Reader) {}).Column
v, ok := column.Value(0)

assert.Nil(t, v)
assert.False(t, ok)
assert.False(t, column.Contains(0))
assert.Nil(t, column.Index(0))
assert.NotPanics(t, func() {
column.Grow(100)
column.Snapshot(0, nil)
})
}

// --------------------------- Mocks & Fixtures ----------------------------

// loadPlayers loads a list of players from the fixture
Expand Down
61 changes: 59 additions & 2 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Reader represents a reader cursor for a specific row/column combination.
type Reader interface {
IsUpsert() bool
IsDelete() bool
Index() uint32
String() string
Bytes() []byte
Expand All @@ -25,13 +27,13 @@ type Reader interface {
// this so that we can feed it to the index transparently.
var _ Reader = new(commit.Reader)

// --------------------------- Index ----------------------------

// computed represents a computed column
type computed interface {
Column() string
}

// --------------------------- Index ----------------------------

// columnIndex represents the index implementation
type columnIndex struct {
fill bitmap.Bitmap // The fill list for the column
Expand Down Expand Up @@ -100,3 +102,58 @@ func (c *columnIndex) Index(chunk commit.Chunk) bitmap.Bitmap {
func (c *columnIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
dst.PutBitmap(commit.PutTrue, chunk, c.fill)
}

// --------------------------- Trigger ----------------------------

// columnTrigger represents the trigger implementation
type columnTrigger struct {
name string // The name of the target column
clbk func(Reader) // The trigger callback
}

// newTrigger creates a new trigger column.
func newTrigger(indexName, columnName string, callback func(r Reader)) *column {
return columnFor(indexName, &columnTrigger{
name: columnName,
clbk: callback,
})
}

// Grow grows the size of the column until we have enough to store
func (c *columnTrigger) Grow(idx uint32) {
// Noop
}

// Column returns the target name of the column on which this index should apply.
func (c *columnTrigger) Column() string {
return c.name
}

// Apply applies a set of operations to the column.
func (c *columnTrigger) Apply(chunk commit.Chunk, r *commit.Reader) {
for r.Next() {
if r.Type == commit.Put || r.Type == commit.Delete {
c.clbk(r)
}
}
}

// Value retrieves a value at a specified index.
func (c *columnTrigger) Value(idx uint32) (v any, ok bool) {
return nil, false
}

// Contains checks whether the column has a value at a specified index.
func (c *columnTrigger) Contains(idx uint32) bool {
return false
}

// Index returns the fill list for the column
func (c *columnTrigger) Index(chunk commit.Chunk) bitmap.Bitmap {
return nil
}

// Snapshot writes the entire column into the specified destination buffer
func (c *columnTrigger) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
// Noop
}
10 changes: 10 additions & 0 deletions commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ func (r *Reader) Bool() bool {
return r.Type == PutTrue
}

// IsUpsert returns true if the current operation is an insert or update
func (r *Reader) IsUpsert() bool {
return r.Type == Put
}

// IsDelete returns true if the current operation is a deletion
func (r *Reader) IsDelete() bool {
return r.Type == Delete
}

// --------------------------- Value Swap ----------------------------

// SwapInt16 swaps a uint16 value with a new one.
Expand Down
13 changes: 13 additions & 0 deletions commit/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,16 @@ func TestMergeStrings(t *testing.T) {
"(put) 36",
}, scanned)
}

func TestReaderIsUpsert(t *testing.T) {
buf := NewBuffer(0)
buf.PutFloat32(Put, 0, 10)
buf.PutFloat32(Delete, 0, 0)

r := NewReader()
r.Seek(buf)
assert.True(t, r.Next())
assert.True(t, r.IsUpsert())
assert.True(t, r.Next())
assert.True(t, r.IsDelete())
}

0 comments on commit 6cf7de3

Please sign in to comment.