Skip to content

Commit

Permalink
finagle/finagle-memcached: Fix support for testing with external memc…
Browse files Browse the repository at this point in the history
…ached

Problem

External memcached tests could not run because we were checking for an address
in use with an incorrect exception string match.

There were also bugs in the external memcached tests.

Solution

Fix the issues above and add a README for how to run tests with external memcached.

Differential Revision: https://phabricator.twitter.biz/D1178243
  • Loading branch information
jcrossley authored and jenkins committed Oct 22, 2024
1 parent 8e36a30 commit af68b5d
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 136 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ New Features
* finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247``


Bug Fixes
~~~~~~~~~~

* finagle-memcached: Fixed support for running memcached tests with external memcached. Added README with
instructions under finagle/finagle-memcached. ``PHAB_ID=D1120240``


Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand Down
15 changes: 15 additions & 0 deletions finagle-memcached/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
To run the memcached tests against a real memcached server:

1. Ensure you have a Memcached installation. If not, you can install it with:

$ brew install memcached

2. Take note of the path where memcached is now installed:

$ which memcached

3. Run the memcached tests with the jvm flag EXTERNAL_MEMCACHED_PATH=<path>. For example, if you
are using bazel:

$ ./bazel test --test_arg=--jvm_flags="-DEXTERNAL_MEMCACHED_PATH=<path>" \
finagle/finagle-memcached/src/test/scala:scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.liveness.FailureAccrualFactory
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.param.{Stats, Timer}
import com.twitter.finagle.param.Stats
import com.twitter.finagle.param.Timer
import com.twitter.finagle.partitioning.param
import com.twitter.finagle.partitioning.param.EjectFailedHost
import com.twitter.finagle.stats.InMemoryStatsReceiver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,48 +90,50 @@ class MemcachedPartitioningClientTest extends MemcachedTest {
client.close()
}

test("traces fanout requests") {
// we use an eventually block to retry the request if we didn't get partitioned to different shards
eventually {
val tracer = new BufferingTracer()

// the servers created in MemcachedTest have inconsistent addresses, which means sharding
// will be inconsistent across runs. To combat this, we'll start our own servers and rerun the
// tests if we partition to the same shard.
val serverOpts =
for (_ <- 1 to NumServers)
yield TestMemcachedServer.start(
Some(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)))
val servers: Seq[TestMemcachedServer] = serverOpts.flatten

val client = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withTracer(tracer)
.newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName)

awaitResult(client.set("foo", Buf.Utf8("bar")))
awaitResult(client.set("baz", Buf.Utf8("boing")))
awaitResult(
client.gets(Seq("foo", "baz"))
).flatMap {
case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) =>
Map((key, (value1, value2)))
}

client.close()
servers.foreach(_.stop())

val gets: Seq[TraceId] = tracer.iterator.toList collect {
case Record(id, _, Annotation.Rpc("Gets"), _) => id
}

// Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans
assert(gets.length == 2)
// However the FanoutProxy should ensure that the requests are stored in peers, not the same tid.
gets.tail.foreach { get =>
assert(get._parentId == gets.head._parentId)
assert(get.spanId != gets.head.spanId)
if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("traces fanout requests") {
// we use an eventually block to retry the request if we didn't get partitioned to different shards
eventually {
val tracer = new BufferingTracer()

// the servers created in MemcachedTest have inconsistent addresses, which means sharding
// will be inconsistent across runs. To combat this, we'll start our own servers and rerun the
// tests if we partition to the same shard.
val serverOpts =
for (_ <- 1 to NumServers)
yield TestMemcachedServer.start(
Some(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)))
val servers: Seq[TestMemcachedServer] = serverOpts.flatten

val client = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withTracer(tracer)
.newRichClient(Name.bound(servers.map { s => Address(s.address) }: _*), clientName)

awaitResult(client.set("foo", Buf.Utf8("bar")))
awaitResult(client.set("baz", Buf.Utf8("boing")))
awaitResult(
client.gets(Seq("foo", "baz"))
).flatMap {
case (key, (Buf.Utf8(value1), Buf.Utf8(value2))) =>
Map((key, (value1, value2)))
}

client.close()
servers.foreach(_.stop())

val gets: Seq[TraceId] = tracer.iterator.toList collect {
case Record(id, _, Annotation.Rpc("Gets"), _) => id
}

// Moving the MemcachedTracingFilter means that partitioned requests should result in two gets spans
assert(gets.length == 2)
// However the FanoutProxy should ensure that the requests are stored in peers, not the same tid.
gets.tail.foreach { get =>
assert(get._parentId == gets.head._parentId)
assert(get.spanId != gets.head.spanId)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ abstract class MemcachedTest
protected[this] val Timeout: Duration = 15.seconds
protected[this] var servers: Seq[TestMemcachedServer] = Seq.empty
protected[this] var client: Client = _
protected[this] var singleServerClient: Client = _
protected[this] val clientName = "test_client"

protected[this] val redistributesKey: Seq[String]
Expand All @@ -57,6 +58,9 @@ abstract class MemcachedTest
val dest = Name.bound(servers.map { s => Address(s.address) }: _*)
client = createClient(dest, clientName)
}

singleServerClient =
createClient(Name.bound(Seq(Address(servers.head.address)): _*), clientName)
}

after {
Expand Down Expand Up @@ -136,66 +140,73 @@ abstract class MemcachedTest
)
}

if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) {
if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("gets") {
// create a client that connects to only one server so we can predict CAS tokens
awaitResult(client.set("foos", Buf.Utf8("xyz"))) // CAS: 1
awaitResult(client.set("bazs", Buf.Utf8("xyz"))) // CAS: 2
awaitResult(client.set("bazs", Buf.Utf8("zyx"))) // CAS: 3
awaitResult(client.set("bars", Buf.Utf8("xyz"))) // CAS: 4
awaitResult(client.set("bars", Buf.Utf8("zyx"))) // CAS: 5
awaitResult(client.set("bars", Buf.Utf8("yxz"))) // CAS: 6
// use client that connects to only one server so we can predict CAS tokens
awaitResult(singleServerClient.set("foos", Buf.Utf8("xyz")))
awaitResult(singleServerClient.set("bazs", Buf.Utf8("xyz")))
awaitResult(singleServerClient.set("bazs", Buf.Utf8("zyx")))
awaitResult(singleServerClient.set("bars", Buf.Utf8("xyz")))
awaitResult(singleServerClient.set("bars", Buf.Utf8("zyx")))
awaitResult(singleServerClient.set("bars", Buf.Utf8("yxz")))
val result =
awaitResult(
client.gets(Seq("foos", "bazs", "bars", "somethingelse"))
singleServerClient.gets(Seq("foos", "bazs", "bars", "somethingelse"))
).map {
case (key, (Buf.Utf8(value), Buf.Utf8(casUnique))) =>
(key, (value, casUnique))
}
// the "cas unique" values are predictable from a fresh memcached
val expected =
Map(
"foos" -> (("xyz", "1")),
"bazs" -> (("zyx", "3")),
"bars" -> (("yxz", "6"))
"foos" -> (("xyz", "2")),
"bazs" -> (("zyx", "4")),
"bars" -> (("yxz", "7"))
)
assert(result == expected)
}
}

if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) {
if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("getsWithFlag") {
awaitResult(client.set("foos1", Buf.Utf8("xyz")))
awaitResult(client.set("bazs1", Buf.Utf8("xyz")))
awaitResult(client.set("bazs1", Buf.Utf8("zyx")))
val result = awaitResult(client.getsWithFlag(Seq("foos1", "bazs1", "somethingelse")))
.map {
case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) =>
(key, (value, flag, casUnique))
}
// use client that connects to only one server so we can predict CAS tokens
awaitResult(singleServerClient.set("foos1", Buf.Utf8("xyz")))
awaitResult(singleServerClient.set("bazs1", Buf.Utf8("xyz")))
awaitResult(singleServerClient.set("bazs1", Buf.Utf8("zyx")))
val result =
awaitResult(singleServerClient.getsWithFlag(Seq("foos1", "bazs1", "somethingelse")))
.map {
case (key, (Buf.Utf8(value), Buf.Utf8(flag), Buf.Utf8(casUnique))) =>
(key, (value, flag, casUnique))
}

// the "cas unique" values are predictable from a fresh memcached
assert(
result == Map(
"foos1" -> (("xyz", "0", "1")),
"bazs1" -> (("zyx", "0", "2"))
"foos1" -> (("xyz", "0", "2")),
"bazs1" -> (("zyx", "0", "4"))
)
)
}
}

if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) {
if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("cas") {
awaitResult(client.set("x", Buf.Utf8("y")))
val Some((value, casUnique)) = awaitResult(client.gets("x"))
// use client that connects to only one server so we can predict CAS tokens
awaitResult(singleServerClient.set("x", Buf.Utf8("y"))) // Next CAS: 2
val Some((value, casUnique)) = awaitResult(singleServerClient.gets("x"))
assert(value == Buf.Utf8("y"))
assert(casUnique == Buf.Utf8("1"))
assert(casUnique == Buf.Utf8("2"))

assert(!awaitResult(client.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("2")).map(_.replaced)))
assert(
awaitResult(client.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue
!awaitResult(
singleServerClient.checkAndSet("x", Buf.Utf8("z"), Buf.Utf8("1")).map(_.replaced)))
assert(
awaitResult(
singleServerClient
.checkAndSet("x", Buf.Utf8("z"), casUnique).map(_.replaced)).booleanValue
)
val res = awaitResult(client.get("x"))
val res = awaitResult(singleServerClient.get("x"))
assert(res.isDefined)
assert(res.get == Buf.Utf8("z"))
}
Expand Down Expand Up @@ -224,7 +235,7 @@ abstract class MemcachedTest
assert(awaitResult(client.decr("foo", l)) == Some(0L))
}

if (Option(System.getProperty("USE_EXTERNAL_MEMCACHED")).isDefined) {
if (Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("stats") {
// We can't use a partitioned client to get stats, because we don't hash to a server based on
// a key. Instead, we create a ConnectedClient, which is connected to one server.
Expand Down Expand Up @@ -483,39 +494,49 @@ abstract class MemcachedTest
assertRead(newClient, keys2)
}

test("partial success") {
val keys = writeKeys(client, 1000, 20)
assertRead(client, keys)

val initialResult = awaitResult { client.getResult(keys) }
assert(initialResult.failures.isEmpty)
assert(initialResult.misses.isEmpty)
assert(initialResult.values.size == keys.size)

// now kill one server
servers.head.stop()

// test partial success with getResult()
val getResult = awaitResult { client.getResult(keys) }
// assert the failures are set to the exception received from the failing partition
assert(getResult.failures.nonEmpty)
getResult.failures.foreach {
case (_, e) =>
assert(e.isInstanceOf[Exception])
}
// there should be no misses as all keys are known
assert(getResult.misses.isEmpty)

// assert that the values are what we expect them to be. We are not checking for exact
// number of failures and successes here because we don't know how many keys will fall into
// the failed partition. The accuracy of the responses are tested in other tests anyways.
assert(getResult.values.nonEmpty)
assert(getResult.values.size < keys.size)
getResult.values.foreach {
case (keyStr, valueBuf) =>
val Buf.Utf8(valStr) = valueBuf
assert(valStr == s"$keyStr$ValueSuffix")
}
// This works with our internal memcached because when the server is shutdown, we get an immediate
// "connection refused" when trying to send a request. With external memcached, the connection
// establishment instead hangs. To make this test pass with external memcached, we could add
// `withSession.acquisitionTimeout` to the client, but this makes the test a) slow and b) can
// make the other tests flakey, so don't bother.
if (!Option(System.getProperty("EXTERNAL_MEMCACHED_PATH")).isDefined) {
test("partial success") {
val keys = writeKeys(client, 1000, 20)
assertRead(client, keys)

val initialResult = awaitResult {
client.getResult(keys)
}
assert(initialResult.failures.isEmpty)
assert(initialResult.misses.isEmpty)
assert(initialResult.values.size == keys.size)

// now kill one server
servers.head.stop()

// test partial success with getResult()
val getResult = awaitResult {
client.getResult(keys)
}
// assert the failures are set to the exception received from the failing partition
assert(getResult.failures.nonEmpty)
getResult.failures.foreach {
case (_, e) =>
assert(e.isInstanceOf[Exception])
}
// there should be no misses as all keys are known
assert(getResult.misses.isEmpty)

// assert that the values are what we expect them to be. We are not checking for exact
// number of failures and successes here because we don't know how many keys will fall into
// the failed partition. The accuracy of the responses are tested in other tests anyways.
assert(getResult.values.nonEmpty)
assert(getResult.values.size < keys.size)
getResult.values.foreach {
case (keyStr, valueBuf) =>
val Buf.Utf8(valStr) = valueBuf
assert(valStr == s"$keyStr$ValueSuffix")
}
}
}
}
Loading

0 comments on commit af68b5d

Please sign in to comment.