Skip to content

Commit

Permalink
finagle/finagle-memcached: Buf framer for Memcached
Browse files Browse the repository at this point in the history
Problem

As part of the move to Netty 4, we want framing in terms of Buf.

Solution

Introduce Buf framers in Memcached.

RB_ID=848795
  • Loading branch information
jcrossley authored and jenkins committed Jul 7, 2016
1 parent 89ba5ee commit 06c69dc
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.twitter.finagle.netty3.codec

import com.twitter.finagle.Failure
import com.twitter.finagle.codec.FrameDecoder
import com.twitter.io.Buf
import org.jboss.netty.channel._

/**
* Frames Bufs into protocol frames
*/
private[finagle] class FrameDecoderHandler(
  framer: FrameDecoder[Buf]
) extends SimpleChannelUpstreamHandler {

  /**
   * Runs every inbound `Buf` through `framer` and fires one upstream message event per
   * resulting frame, in the order the framer returned them. A message of any other type
   * is not forwarded; an exception event carrying a `Failure` is fired instead.
   *
   * @param ctx the handler context used to fire events further upstream
   * @param e the inbound message event
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit =
    e.getMessage match {
      case buf: Buf =>
        val frames: IndexedSeq[Buf] = framer(buf)
        // Index-based loop: the framer may return zero, one or many frames per Buf.
        var i = 0
        while (i < frames.length) {
          Channels.fireMessageReceived(ctx, frames(i))
          i += 1
        }
      case msg =>
        // The two literals are concatenated, so the first must end with a space to keep
        // the sentences apart in the resulting message.
        Channels.fireExceptionCaught(ctx, Failure(
          "unexpected type when framing Buf. " +
          s"Expected Buf, got ${msg.getClass.getSimpleName}."))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.twitter.finagle.memcached.protocol.text

import com.twitter.finagle.codec.FrameDecoder
import com.twitter.finagle.memcached.util.ParserUtils
import com.twitter.io.Buf
import scala.collection.mutable.ArrayBuffer

private[memcached] object Framer {

  // Byte that separates the tokens of a text frame.
  private val TokenDelimiter: Byte = ' '.toByte

  // Shared result for calls that produce no complete frame.
  private val EmptySeq: IndexedSeq[Buf] = IndexedSeq.empty[Buf]

  // What the framer expects to read next.
  private sealed trait State

  // Waiting for a `\r\n`-terminated text frame.
  private case object AwaitingTextFrame extends State

  // Waiting for a data frame of `bytesNeeded` bytes.
  private case class AwaitingDataFrame(bytesNeeded: Int) extends State
}

/**
* Frames Bufs into Memcached frames. Memcached frames are one of two types:
* text frames and data frames. Text frames are delimited by `\r\n`. Certain text
* frames (for example, a client-side response starting with the token `VALUE`) are
* followed by a data frame. The length of the data frame is given by the decimal string
* representation of one of the tokens in the text frame; which token is decided by
* `dataLength` (the 4th token in `ClientFramer`, the 5th token in `ServerFramer`).
* The data frame also ends with `\r\n`.
*
* For more information, see https://github.com/memcached/memcached/blob/master/doc/protocol.txt.
*
* To simplify the decoding logic, we have decoupled framing and decoding; however, because of the
* complex framing logic, we must partially decode messages during framing to frame correctly.
*
* @note Class contains mutable state. Not thread-safe.
*/
private[memcached] trait Framer extends FrameDecoder[Buf] {
  import Framer._

  // Bytes received so far that have not yet been handed out as part of a frame.
  private[this] var accum: Buf = Buf.Empty

  // The kind of frame the next call to `extractFrame` looks for.
  private[this] var state: State = AwaitingTextFrame

  // Scratch array made available to implementations of `dataLength`.
  protected val byteArrayForBuf2Int: Array[Byte] = ParserUtils.newByteArrayForBuf2Int()

  /**
   * Return the number of bytes before the first `\r\n` (newline), or -1 if no newline is found.
   */
  private[this] def bytesBeforeLineEnd(bytes: Array[Byte]): Int = {
    var i = 0
    // Stop one short of the end: a `\r` in the last position has no `\n` after it yet.
    while (i < bytes.length - 1) {
      if (bytes(i) == '\r' && bytes(i + 1) == '\n') {
        return i
      }
      i += 1
    }
    -1
  }

  /**
   * Using the current accumulation of Bufs, read the next frame. If no frame can be read,
   * return null.
   *
   * On success the returned frame and its trailing `\r\n` are removed from the accumulator
   * and `state` is updated to the kind of frame expected next.
   */
  private def extractFrame(): Buf =
    state match {
      case AwaitingTextFrame =>
        val frameLength = bytesBeforeLineEnd(Buf.ByteArray.Owned.extract(accum))
        if (frameLength < 0) {
          null
        } else {

          // We have received a text frame. Extract the frame.
          val frameBuf: Buf = accum.slice(0, frameLength)

          // Remove the extracted frame from the accumulator, stripping the newline (2 chars)
          accum = accum.slice(frameLength + 2, accum.length)

          val tokens = ParserUtils.split(Buf.ByteArray.Owned.extract(frameBuf), TokenDelimiter)

          val bytesNeeded = dataLength(tokens)

          // If `dataLength` reports a length, we expect a data frame to follow,
          // of length `bytesNeeded`.
          if (bytesNeeded != -1) state = AwaitingDataFrame(bytesNeeded)
          frameBuf
        }
      case AwaitingDataFrame(bytesNeeded) =>

        // A data frame ends with `\r\n`, so we must wait for `bytesNeeded + 2` bytes.
        // The check is written as `accum.length - 2 >= bytesNeeded` because `bytesNeeded + 2`
        // overflows Int when the declared length is within 2 of Int.MaxValue, which would let
        // the check pass with far too few bytes accumulated.
        if (accum.length - 2 >= bytesNeeded) {

          // Extract the data frame
          val frameBuf: Buf = accum.slice(0, bytesNeeded)

          // Remove the extracted frame from the accumulator, stripping the newline (2 chars).
          // `bytesNeeded + 2` cannot overflow here: it is at most `accum.length`.
          accum = accum.slice(bytesNeeded + 2, accum.length)
          state = AwaitingTextFrame
          frameBuf
        } else {
          null
        }
    }

  /**
   * Frame a Buf and any accumulated partial frames into as many Memcached frames as possible.
   *
   * @param buf the newly received bytes, appended to whatever is still accumulated
   * @return the complete frames now available, in order; empty if none could be extracted
   */
  def apply(buf: Buf): IndexedSeq[Buf] = {
    accum = accum.concat(buf)
    var frame = extractFrame()

    if (frame != null) {
      // The average Gizmoduck memcached pipeline has 0-1 requests pending, and the average server
      // response is split into 2 memcached protocol frames, so we chose 2 as the initial array
      // size.
      val frames = new ArrayBuffer[Buf](2)
      do {
        frames += frame
        frame = extractFrame()
      } while (frame != null)

      frames.toIndexedSeq
    } else {
      EmptySeq
    }
  }

  /**
   * Given a sequence of Buf tokens that comprise a Memcached frame,
   * return the length of data expected in the next frame, or -1
   * if the length cannot be extracted.
   */
  def dataLength(tokens: IndexedSeq[Buf]): Int
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.twitter.finagle.memcached.protocol.text.client

import com.twitter.finagle.memcached.protocol.text.Framer
import com.twitter.finagle.memcached.util.ParserUtils._
import com.twitter.io.Buf

private object ClientFramer {
  // Leading token of a response line that announces a data frame.
  val Value: Buf = Buf.Utf8("VALUE")
}

private[finagle] class ClientFramer extends Framer {
  import ClientFramer._

  /**
   * Length of the data frame announced by a response line, or -1 if none is announced
   * or the length cannot be read.
   *
   * A length is only looked for when the first token equals `VALUE` and there are at least
   * four tokens; the data length is the 4th token, interpreted as an Int.
   *
   * @param tokens the tokens of a text frame
   * @return the parsed data length, or -1
   */
  def dataLength(tokens: IndexedSeq[Buf]): Int =
    if (tokens.length >= 4 && tokens.head == Value) {
      val dataLengthAsBuf = tokens(3)
      // Writing a token longer than the scratch array would fail inside `Buf.write` instead
      // of yielding -1. Such a token cannot be a valid Int string anyway, so reject it first.
      if (dataLengthAsBuf.length > byteArrayForBuf2Int.length) -1
      else {
        dataLengthAsBuf.write(byteArrayForBuf2Int, 0)
        byteArrayStringToInt(byteArrayForBuf2Int, dataLengthAsBuf.length)
      }
    } else -1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.twitter.finagle.memcached.protocol.text.server

import com.twitter.finagle.memcached.protocol.text.Framer
import com.twitter.finagle.memcached.util.ParserUtils
import com.twitter.io.Buf

private[finagle] class ServerFramer(storageCommands: Set[Buf]) extends Framer {

  /**
   * Length of the data frame announced by a command line, or -1 if none is announced
   * or the length cannot be read.
   *
   * A length is only looked for when the first token is one of `storageCommands` and there
   * are at least five tokens; the data length is the 5th token, interpreted as an Int.
   *
   * @param tokens the tokens of a text frame
   * @return the parsed data length, or -1
   */
  def dataLength(tokens: IndexedSeq[Buf]): Int =
    if (tokens.length >= 5 && storageCommands.contains(tokens.head)) {
      val dataLengthAsBuf = tokens(4)
      // Writing a token longer than the scratch array would fail inside `Buf.write` instead
      // of yielding -1. Such a token cannot be a valid Int string anyway, so reject it first.
      if (dataLengthAsBuf.length > byteArrayForBuf2Int.length) -1
      else {
        dataLengthAsBuf.write(byteArrayForBuf2Int, 0)
        ParserUtils.byteArrayStringToInt(byteArrayForBuf2Int, dataLengthAsBuf.length)
      }
    } else -1
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.twitter.finagle.memcached.util
import com.twitter.io.Buf
import java.util.regex.Pattern
import org.jboss.netty.buffer.ChannelBuffer
import scala.collection.mutable.ArrayBuffer

object ParserUtils {

Expand All @@ -13,6 +14,9 @@ object ParserUtils {

val DigitsPattern = Pattern.compile(DIGITS)

// Used by byteArrayStringToInt and newByteArrayForBuf2Int. The maximum length, in chars, of
// the decimal string form of a non-negative Int (i.e. the length of `Int.MaxValue.toString`).
private[this] val MaxLengthOfIntString = Int.MaxValue.toString.length

/**
* Returns true if every readable byte in the ChannelBuffer is a digit,
* false otherwise.
Expand Down Expand Up @@ -50,4 +54,49 @@ object ParserUtils {
true
}

/**
 * Splits `bytes` on `delimiter` into Bufs that reference `bytes` rather than copying it.
 *
 * A delimiter at index 0 does not produce a token and neither does a trailing delimiter.
 * A delimiter at any other index always closes a token, so consecutive delimiters
 * produce empty tokens.
 */
private[memcached] def split(bytes: Array[Byte], delimiter: Byte): IndexedSeq[Buf] = {
  val tokens = new ArrayBuffer[Buf](6)
  val end = bytes.length
  var tokenStart = 0
  var cursor = 0
  while (cursor < end) {
    if (bytes(cursor) == delimiter) {
      // Only a delimiter in the very first position is skipped without emitting a token.
      if (cursor != 0) {
        tokens += Buf.ByteArray.Owned(bytes, tokenStart, cursor)
      }
      tokenStart = cursor + 1
    }
    cursor += 1
  }
  // Emit whatever follows the last delimiter, unless there is nothing left.
  if (tokenStart != end) {
    tokens += Buf.ByteArray.Owned(bytes, tokenStart, end)
  }
  tokens.toIndexedSeq
}

// Allocates a fresh array with room for exactly `MaxLengthOfIntString` bytes.
private[memcached] def newByteArrayForBuf2Int(): Array[Byte] =
  Array.ofDim[Byte](MaxLengthOfIntString)

/**
 * Converts the first `length` bytes of a Byte Array, representing a non-negative integer
 * in chars, to a base 10 Int.
 *
 * Returns -1 if any of the characters are not digits, if the length is invalid (negative,
 * longer than the longest Int string, or longer than `bytes`), or if the digits denote a
 * value larger than `Int.MaxValue`.
 */
private[memcached] def byteArrayStringToInt(bytes: Array[Byte], length: Int): Int = {
  if (length < 0 || length > MaxLengthOfIntString || length > bytes.length) -1
  else {
    // Accumulate in a Long. A string of maximal length can still exceed Int.MaxValue
    // (e.g. "9999999999"), and Int arithmetic would silently wrap around. The length check
    // above bounds the number of digits, so the Long itself cannot overflow.
    var num = 0L
    var i = 0
    while (i < length) {
      val b = bytes(i)
      if (b < '0' || b > '9')
        return -1
      num = num * 10 + (b - '0')
      i += 1
    }
    if (num > Int.MaxValue) -1 else num.toInt
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class MemcachedTest extends FunSuite with BeforeAndAfter {
assert(Await.result(client.get("foo")).get == Buf.Utf8("bar"))
}

test("set with data containing newline") {
  // The stored value itself contains the protocol's line terminator several times.
  val payload = Buf.Utf8("bar\r\nbaz\r\nqux\r\n")
  Await.result(client.set("foo", payload), 2.seconds)
  val fetched = Await.result(client.get("foo"), 2.seconds)
  assert(fetched.get == payload)
}

test("get") {
Await.result(client.set("foo", Buf.Utf8("bar")))
Await.result(client.set("baz", Buf.Utf8("boing")))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.twitter.finagle.memcached.unit.protocol.text

import com.twitter.finagle.memcached.protocol.text.client.ClientFramer
import com.twitter.io.Buf
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class FramerTest extends FunSuite {

  // Shorthand for a UTF-8 encoded Buf.
  private[this] def utf8(text: String): Buf = Buf.Utf8(text)

  // The expected frames, given as their UTF-8 text.
  private[this] def frames(texts: String*): Seq[Buf] = texts.map(utf8)

  test("return empty frame sequence on partial frame") {
    val framer = new ClientFramer
    val result = framer(utf8("STO"))
    assert(result == Seq.empty)
  }

  test("frame response without data") {
    val framer = new ClientFramer
    val result = framer(utf8("STORED\r\n"))
    assert(result == frames("STORED"))
  }

  test("accumulate partial response frame") {
    val framer = new ClientFramer
    // Feed the response in pieces; only the final piece completes the frame.
    Seq("ST", "OR", "ED\r").foreach { chunk => framer(utf8(chunk)) }
    val result = framer(utf8("\n"))
    assert(result == frames("STORED"))
  }

  test("accumulate response frame after returning frame") {
    val framer = new ClientFramer
    framer(utf8("ST"))
    val first = framer(utf8("ORED\r\nNOT_ST"))
    assert(first == frames("STORED"))
    val second = framer(utf8("ORED\r\n"))
    assert(second == frames("NOT_STORED"))
  }

  test("Frame multiple frames") {
    val framer = new ClientFramer
    val result = framer(utf8("STORED\r\nNOT_STORED\r\n"))
    assert(result == frames("STORED", "NOT_STORED"))
  }

  test("Frame data frame") {
    val framer = new ClientFramer
    val header = framer(utf8("VALUE foo 0 10\r\n"))
    assert(header == frames("VALUE foo 0 10"))
    val data = framer(utf8("abcdefghij\r\n"))
    assert(data == frames("abcdefghij"))
  }

  test("accumulate partial data frames") {
    val framer = new ClientFramer
    val header = framer(utf8("VALUE foo 0 10\r\nabc"))
    assert(header == frames("VALUE foo 0 10"))
    Seq("def", "ghi").foreach { chunk => framer(utf8(chunk)) }
    val data = framer(utf8("j\r\n"))
    assert(data == frames("abcdefghij"))
  }

  test("accumulate response after framing data frame") {
    val framer = new ClientFramer
    val first = framer(utf8("VALUE foo 0 3\r\nabc\r\nSTO"))
    assert(first == frames("VALUE foo 0 3", "abc"))
    val second = framer(utf8("RED\r\n"))
    assert(second == frames("STORED"))
  }

  test("Don't frame data frame until newlines are received") {
    val framer = new ClientFramer
    framer(utf8("VALUE foo 0 3\r\n"))
    val beforeNewline = framer(utf8("abc"))
    assert(beforeNewline == Seq.empty)
    val afterNewline = framer(utf8("\r\n"))
    assert(afterNewline == frames("abc"))
  }

  test("Ignore newlines in the middle of data frames") {
    val framer = new ClientFramer
    framer(utf8("VALUE foo 0 10\r\n"))
    // The 10 declared bytes include both embedded `\r\n` pairs; the last pair terminates the frame.
    val data = framer(utf8("abc\r\ndef\r\n\r\n"))
    assert(data == frames("abc\r\ndef\r\n"))
  }
}
Loading

0 comments on commit 06c69dc

Please sign in to comment.