Skip to content

Commit

Permalink
Implement read bench with refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Shuo Wu <[email protected]>
  • Loading branch information
shuo-wu committed Dec 4, 2023
1 parent da2fa48 commit 984ee02
Showing 1 changed file with 73 additions and 38 deletions.
111 changes: 73 additions & 38 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,13 @@ func RandStringRunes(n int) string {
}

func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte, int64) (int, error)) (output string, err error) {
lock := sync.Mutex{}
benchTypeInList := strings.Split(benchType, "-")
if len(benchTypeInList) != 3 ||
(benchTypeInList[0] != "seq" && benchTypeInList[0] != "rand") ||
(benchTypeInList[1] != "iops" && benchTypeInList[1] != "bandwidth" && benchTypeInList[1] != "latency") ||
(benchTypeInList[2] != "read" && benchTypeInList[2] != "write") {
return "", fmt.Errorf("invalid bench type %s", benchType)
}

if thread != 1 && strings.Contains(benchType, "-latency-") {
logrus.Warnf("Using single thread for latency related benchmark")
Expand All @@ -343,41 +349,92 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
blockSize = 1 << 20 // 1MB
}

blockBytes := []byte(RandStringRunes(blockSize))
var duration time.Duration

// Prepare data before read
if benchTypeInList[2] == "read" {
// Typically 4-thread write is enough
if _, err := dataIOWithMultipleThread(false, 4, 1<<20, size, writeAt); err != nil {
return "", err
}

if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, readAt); err != nil {
return "", err
}
}

if benchTypeInList[2] == "write" {
if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, writeAt); err != nil {
return "", err
}
}

switch benchTypeInList[1] {
case "iops":
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
case "bandwidth":
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
case "latency":
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
}
return output, nil
}

Check notice on line 385 in pkg/util/util.go

View check run for this annotation

codefactor.io / CodeFactor

pkg/util/util.go#L333-L385

Complex Method
func dataIOWithMultipleThread(isRandomIO bool, thread, blockSize int, size int64, ioAt func([]byte, int64) (int, error)) (duration time.Duration, err error) {
lock := sync.Mutex{}

chunkSize := int(math.Ceil(float64(size) / float64(thread)))
chunkBlocks := int(math.Ceil(float64(chunkSize) / float64(blockSize)))
var sequenceList []int
if isRandomIO {
sequenceList = make([]int, chunkBlocks)
for i := 0; i < chunkBlocks; i++ {
sequenceList[i] = i
}
rand.Shuffle(chunkBlocks, func(i, j int) { sequenceList[i], sequenceList[j] = sequenceList[j], sequenceList[i] })
}

if chunkSize < blockSize {
return 0, fmt.Errorf("the io thread count is too much so that each thread cannot operate a single block")
}

wg := sync.WaitGroup{}
wg.Add(thread)

startTime := time.Now()
defer func() {
duration = time.Since(startTime)
}()

for i := 0; i < thread; i++ {
idx := i
go func() {
defer wg.Done()

// Ignore this randomly generate data if the ioAt is readAt
blockBytes := []byte(RandStringRunes(blockSize))

start := int64(idx) * int64(chunkSize)
end := int64(idx+1) * int64(chunkSize)
offset := start
for cnt := 0; cnt < chunkBlocks; cnt++ {
if strings.HasPrefix(benchType, "seq-") {
offset = start + int64(cnt*blockSize)
if offset+int64(blockSize) > end {
blockBytes = blockBytes[:end-offset]
}
} else if strings.HasPrefix(benchType, "rand-") {
offset = start + int64(rand.Intn(cnt)*blockSize)
if isRandomIO {
offset = start + int64(sequenceList[cnt]*blockSize)
if offset+int64(blockSize) > end {
offset -= int64(blockSize)
}
} else {
lock.Lock()
err = fmt.Errorf("invalid bench type %s", benchType)
lock.Unlock()
return
offset = start + int64(cnt*blockSize)
if offset+int64(blockSize) > end {
blockBytes = blockBytes[:end-offset]
}
}
if _, writeErr := writeAt(blockBytes, offset); writeErr != nil {
if _, ioErr := ioAt(blockBytes, offset); ioErr != nil {
lock.Lock()
err = writeErr
err = ioErr
lock.Unlock()
return
}
Expand All @@ -386,27 +443,5 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
}
wg.Wait()

if err != nil {
return "", err
}

duration := time.Since(startTime)
switch benchType {
case "seq-iops-write":
fallthrough
case "rand-iops-write":
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
case "seq-bandwidth-write":
fallthrough
case "rand-bandwidth-write":
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
case "seq-latency-write":
fallthrough
case "rand-latency-write":
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
}
return output, nil
return
}

0 comments on commit 984ee02

Please sign in to comment.