Skip to content

Commit

Permalink
Fix swap and record index (#76)
Browse files Browse the repository at this point in the history
## Bugfixes
* `Swap...` was not properly swapping when an underlying buffer was
re-allocated due to `append()`
* Index for `Record` types was not working when strings are being merged
due to the fact that it could not see the newly appended values.

## Optimizations
* When swapping strings, if the string is of exactly the same size as
before, it will be swapped in-place (same as for other values).
  • Loading branch information
kelindar authored Dec 1, 2022
1 parent 47f62f7 commit 501708a
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 116 deletions.
3 changes: 2 additions & 1 deletion column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Reader interface {
Index() uint32
String() string
Bytes() []byte
Float() float64
Int() int
Uint() uint
Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *columnIndex) Apply(chunk commit.Chunk, r *commit.Reader) {
// on the actual column.
for r.Next() {
switch r.Type {
case commit.Put, commit.Merge:
case commit.Put:
if c.rule(r) {
c.fill.Set(uint32(r.Offset))
} else {
Expand Down
40 changes: 34 additions & 6 deletions column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,17 +580,30 @@ func TestRecord(t *testing.T) {
col.CreateColumn("ts", ForRecord(func() *time.Time {
return new(time.Time)
}))
col.CreateIndex("recent", "ts", func(r Reader) bool {
var ts time.Time
if err := ts.UnmarshalBinary(r.Bytes()); err == nil {
return ts.After(time.Unix(1667745800, 0))
}
return false
})

// Insert the time, it implements binary marshaler
idx, _ := col.Insert(func(r Row) error {
now := time.Unix(1667745766, 0)
now := time.Unix(1667745700, 0)
r.SetRecord("ts", &now)
return nil
})

// Index should not have any recent
col.Query(func(txn *Txn) error {
assert.Equal(t, 1, txn.Without("recent").Count())
return nil
})

// We should be able to read back the time
col.QueryAt(idx, func(r Row) error {
now := time.Unix(1667745766, 0)
now := time.Unix(1667745900, 0)
r.MergeRecord("ts", &now)
return nil
})
Expand All @@ -602,6 +615,12 @@ func TestRecord(t *testing.T) {
assert.Equal(t, "November", ts.(*time.Time).UTC().Month().String())
return nil
})

// Merge should have updated the index as well
col.Query(func(txn *Txn) error {
assert.Equal(t, 1, txn.With("recent").Count())
return nil
})
}

func TestRecord_Errors(t *testing.T) {
Expand Down Expand Up @@ -686,22 +705,31 @@ func TestNumberMerge(t *testing.T) {
return v
})))

col.CreateIndex("young", "age", func(r Reader) bool {
return r.Int() < 50
})

// Insert the time, it implements binary marshaler
idx, _ := col.Insert(func(r Row) error {
r.SetInt32("age", 10)
r.SetInt32("age", 100)
return nil
})

for i := 0; i < 10; i++ {
for i := 0; i < 7; i++ {
col.QueryAt(idx, func(r Row) error {
r.MergeInt32("age", 1)
r.MergeInt32("age", 10)
return nil
})
}

col.QueryAt(idx, func(r Row) error {
age, _ := r.Int32("age")
assert.Equal(t, int32(0), age)
assert.Equal(t, int32(30), age)
return nil
})

col.Query(func(txn *Txn) error {
assert.Equal(t, 1, txn.With("young").Count())
return nil
})
}
Expand Down
18 changes: 18 additions & 0 deletions commit/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ const (
Skip OpType = 4 // Skips the value
)

// String returns a string representation
func (o OpType) String() string {
switch o {
case Delete:
return "delete"
case Insert:
return "insert"
case Put:
return "put"
case Merge:
return "merge"
case Skip:
return "skip"
default:
return "unknown"
}
}

// --------------------------- Delta log ----------------------------

// Buffer represents a buffer of delta operations.
Expand Down
10 changes: 8 additions & 2 deletions commit/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func run(name string, b *testing.B, count int, fn func(buf *Buffer, r *Reader))
}

func TestSizeof(t *testing.T) {
assert.LessOrEqual(t, int(unsafe.Sizeof(Reader{})), 80)
assert.LessOrEqual(t, int(unsafe.Sizeof(Buffer{})), 80)
assert.Equal(t, 96, int(unsafe.Sizeof(Reader{})))
assert.Equal(t, 80, int(unsafe.Sizeof(Buffer{})))
}

func TestReadWrite(t *testing.T) {
Expand Down Expand Up @@ -320,3 +320,9 @@ func FuzzBufferString(f *testing.F) {
assert.Equal(t, v, r.String())
})
}

func TestOpString(t *testing.T) {
for i := 0; i < 255; i++ {
assert.NotEmpty(t, OpType(i).String())
}
}
6 changes: 3 additions & 3 deletions commit/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ func (c *Commit) WriteTo(dst io.Writer) (int64, error) {
// Write chunk information
offset := uint32(0)
reader.Range(buffer, c.Chunk, func(r *Reader) {
w.WriteUint32(uint32(r.Offset)) // Value
w.WriteUint32(offset) // Offset
_ = w.WriteUint32(uint32(r.Offset)) // Value
_ = w.WriteUint32(offset) // Offset
offset += uint32(len(r.buffer))
})

Expand All @@ -140,7 +140,7 @@ func (c *Commit) WriteTo(dst io.Writer) (int64, error) {

// Write all chunk bytes together
reader.Range(buffer, c.Chunk, func(r *Reader) {
w.Write(r.buffer)
_, _ = w.Write(r.buffer)
})
return nil
}); err != nil {
Expand Down
Loading

0 comments on commit 501708a

Please sign in to comment.