=cdd #18768 Cache serialization of read and write msg

The WriteAggregator and ReadAggregator typically send
the same message to several replicas and by caching the serialized bytes
we avoid doing the same thing for each message

and add a test for WriteAggregator
This commit is contained in:
Patrik Nordwall 2015-10-18 09:36:32 +02:00
parent 81cba2e580
commit 6b935e0c0b
5 changed files with 399 additions and 4 deletions

View file

@ -41,6 +41,11 @@ akka.cluster.distributed-data {
# unreachable, so it should be configured to worst case in a healthy cluster.
max-pruning-dissemination = 60 s
# Serialized Write and Read messages are cached when they are sent to
# several nodes. If no further activity they are removed from the cache
# after this duration.
serializer-cache-time-to-live = 10s
}
#//#distributed-data

View file

@ -1359,7 +1359,7 @@ private[akka] class WriteAggregator(
def receive = {
case WriteAck
remaining -= sender().path.address
remaining -= senderAddress()
if (remaining.size == doneWhenRemainingSize)
reply(ok = true)
case SendToSecondary
@ -1367,6 +1367,8 @@ private[akka] class WriteAggregator(
case ReceiveTimeout reply(ok = false)
}
def senderAddress(): Address = sender().path.address
def reply(ok: Boolean): Unit = {
if (ok && envelope.data == DeletedData)
replyTo.tell(DeleteSuccess(key), context.parent)

View file

@ -3,6 +3,7 @@
*/
package akka.cluster.ddata.protobuf
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.breakOut
@ -21,12 +22,136 @@ import akka.serialization.BaseSerializer
import akka.util.{ ByteString AkkaByteString }
import akka.protobuf.ByteString
import akka.cluster.ddata.Key.KeyR
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
*/
private[akka] object ReplicatorMessageSerializer {
/**
* A cache that is designed for a small number (<= 32) of
* entries. It is using instance equality.
* Adding new entry overwrites oldest. It is
* thread safe but duplicates of same entry may occur.
*
* `evict` must be called from the outside, i.e. the
* cache will not cleanup itself.
*/
final class SmallCache[A <: AnyRef, B <: AnyRef](size: Int, timeToLive: FiniteDuration, getOrAddFactory: A B) {
require((size & (size - 1)) == 0, "size must be a power of 2")
require(size <= 32, "size must be <= 32")
private val n = new AtomicInteger(0)
private val mask = size - 1
private val elements = Array.ofDim[(A, B)](size)
private val ttlNanos = timeToLive.toNanos
// in theory this should be volatile, but since the cache has low
// guarantees anyway and visibility will be synced by other activity
// so we use non-volatile
private var lastUsed = System.nanoTime()
/**
* Get value from cache or `null` if it doesn't exist.
*/
def get(a: A): B = get(a, n.get)
private def get(a: A, startPos: Int): B = {
// start at the latest added value, most likely that we want that
val end = startPos + elements.length
@tailrec def find(i: Int): B = {
if (end - i == 0) null.asInstanceOf[B]
else {
val x = elements(i & mask)
if ((x eq null) || (x._1 ne a)) find(i + 1)
else x._2
}
}
lastUsed = System.nanoTime()
find(startPos)
}
/**
* Add entry to the cache.
* Overwrites oldest entry.
*/
def add(a: A, b: B): Unit = add((a, b))
def add(x: (A, B)): Unit = {
val i = n.incrementAndGet()
elements(i & mask) = x
lastUsed = System.nanoTime()
}
/**
* Get value from cache or create new value with the
* `getOrAddFactory` that was given in the constructor. The new
* value is added to the cache. Duplicates of same value may be added
* if multiple threads call this concurrently, but decent
* (low cost) effort is made to reduce the chance of duplicates.
*/
def getOrAdd(a: A): B = {
val position = n.get
val c = get(a, position)
if (c ne null)
c
else {
val b2 = getOrAddFactory(a)
if (position == n.get) {
// no change, add the new value
add(a, b2)
b2
} else {
// some other thread added, try one more time
// to reduce duplicates
val c2 = get(a)
if (c2 ne null) c2 // found it
else {
add(a, b2)
b2
}
}
}
}
/**
* Remove all elements if the cache has not been
* used within the `timeToLive`.
*/
def evict(): Unit =
if ((System.nanoTime() - lastUsed) > ttlNanos) {
var i = 0
while (i < elements.length) {
elements(i) = null
i += 1
}
}
override def toString: String =
elements.mkString("[", ",", "]")
}
}
/**
* Protobuf serializer of ReplicatorMessage messages.
*/
class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest with SerializationSupport with BaseSerializer {
import ReplicatorMessageSerializer.SmallCache
private val cacheTimeToLive = system.settings.config.getDuration(
"akka.cluster.distributed-data.serializer-cache-time-to-live", TimeUnit.MILLISECONDS).millis
private val readCache = new SmallCache[Read, Array[Byte]](4, cacheTimeToLive, m readToProto(m).toByteArray)
private val writeCache = new SmallCache[Write, Array[Byte]](4, cacheTimeToLive, m writeToProto(m).toByteArray)
system.scheduler.schedule(cacheTimeToLive, cacheTimeToLive / 2) {
readCache.evict()
writeCache.evict()
}(system.dispatcher)
private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray
val GetManifest = "A"
val GetSuccessManifest = "B"
@ -80,9 +205,9 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: DataEnvelope dataEnvelopeToProto(m).toByteArray
case m: Write writeToProto(m).toByteArray
case WriteAck dm.Empty.getDefaultInstance.toByteArray
case m: Read readToProto(m).toByteArray
case m: Write writeCache.getOrAdd(m)
case WriteAck writeAckBytes
case m: Read readCache.getOrAdd(m)
case m: ReadResult readResultToProto(m).toByteArray
case m: Status statusToProto(m).toByteArray
case m: Get[_] getToProto(m).toByteArray

View file

@ -0,0 +1,133 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.ddata
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import akka.actor.Address
import akka.actor.ActorRef
import akka.cluster.ddata.Replicator.Internal._
import akka.cluster.ddata.Replicator._
import akka.actor.ActorSelection
import akka.cluster.UniqueAddress
object WriteAggregatorSpec {
val key = GSetKey[String]("a")
def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef): Props =
Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo))
class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef)
extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo) {
override def replica(address: Address): ActorSelection =
context.actorSelection(probes(address).path)
override def senderAddress(): Address =
probes.find { case (a, r) r == sender() }.get._1
}
def writeAckAdapterProps(replica: ActorRef): Props =
Props(new WriteAckAdapter(replica))
class WriteAckAdapter(replica: ActorRef) extends Actor {
var replicator: Option[ActorRef] = None
def receive = {
case WriteAck
replicator.foreach(_ ! WriteAck)
case msg
replicator = Some(sender())
replica ! msg
}
}
}
class WriteAggregatorSpec extends AkkaSpec("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port=0
""")
with ImplicitSender {
val nodeA = Address("akka.tcp", "Sys", "a", 2552)
val nodeB = nodeA.copy(host = Some("b"))
val nodeC = nodeA.copy(host = Some("c"))
val nodeD = nodeA.copy(host = Some("d"))
// 4 replicas + the local => 5
val nodes = Set(nodeA, nodeB, nodeC, nodeD)
val data = GSet.empty + "A" + "B"
val timeout = 3.seconds.dilated
val writeTwo = WriteTo(2, timeout)
val writeMajority = WriteMajority(timeout)
def probes(probe: ActorRef): Map[Address, ActorRef] =
nodes.toSeq.map(_ -> system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap
"WriteAggregator" must {
"send to at least N/2+1 replicas when WriteMajority" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
"send to more when no immediate reply" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
probe.expectMsgType[Write]
// no reply
probe.expectMsgType[Write]
// no reply
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
probe.expectMsgType[Write]
// no reply
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
// no reply
probe.expectMsgType[Write]
// no reply
expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
}
}

View file

@ -78,4 +78,134 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem("ReplicatorMes
}
}
"Cache" must {
import ReplicatorMessageSerializer._
"be power of 2" in {
intercept[IllegalArgumentException] {
new SmallCache[String, String](3, 5.seconds, _ null)
}
}
"get added element" in {
val cache = new SmallCache[Read, String](2, 5.seconds, _ null)
val a = Read("a")
cache.add(a, "A")
cache.get(a) should be("A")
val b = Read("b")
cache.add(b, "B")
cache.get(a) should be("A")
cache.get(b) should be("B")
}
"return null for non-existing elements" in {
val cache = new SmallCache[Read, String](4, 5.seconds, _ null)
val a = Read("a")
cache.get(a) should be(null)
cache.add(a, "A")
val b = Read("b")
cache.get(b) should be(null)
}
"hold latest added elements" in {
val cache = new SmallCache[Read, String](4, 5.seconds, _ null)
val a = Read("a")
val b = Read("b")
val c = Read("c")
val d = Read("d")
val e = Read("e")
cache.add(a, "A")
cache.get(a) should be("A")
cache.add(b, "B")
cache.get(a) should be("A")
cache.add(c, "C")
cache.get(a) should be("A")
cache.add(d, "D")
cache.get(a) should be("A")
// now it is full and a will be pushed out
cache.add(e, "E")
cache.get(a) should be(null)
cache.get(b) should be("B")
cache.get(c) should be("C")
cache.get(d) should be("D")
cache.get(e) should be("E")
cache.add(a, "A")
cache.get(a) should be("A")
cache.get(b) should be(null)
cache.get(c) should be("C")
cache.get(d) should be("D")
cache.get(e) should be("E")
}
"handle Int wrap around" ignore { // ignored because it takes 20 seconds (but it works)
val cache = new SmallCache[Read, String](2, 5.seconds, _ null)
val a = Read("a")
val x = a -> "A"
var n = 0
while (n <= Int.MaxValue - 3) {
cache.add(x)
n += 1
}
cache.get(a) should be("A")
val b = Read("b")
val c = Read("c")
cache.add(b, "B")
cache.get(a) should be("A")
cache.get(b) should be("B")
cache.add(c, "C")
cache.get(a) should be(null)
cache.get(b) should be("B")
cache.get(c) should be("C")
cache.add(a, "A")
cache.get(a) should be("A")
cache.get(b) should be(null)
cache.get(c) should be("C")
}
"suppory getOrAdd" in {
var n = 0
def createValue(a: Read): AnyRef = {
n += 1
new AnyRef {
override val toString = "v" + n
}
}
val cache = new SmallCache[Read, AnyRef](4, 5.seconds, a createValue(a))
val a = Read("a")
val v1 = cache.getOrAdd(a)
v1.toString should be("v1")
cache.getOrAdd(a) should be theSameInstanceAs v1
}
"evict cache after time-to-live" in {
val cache = new SmallCache[Read, AnyRef](4, 10.millis, _ null)
val b = Read("b")
val c = Read("c")
cache.add(b, "B")
cache.add(c, "C")
Thread.sleep(30)
cache.evict()
cache.get(b) should be(null)
cache.get(c) should be(null)
}
"not evict cache before time-to-live" in {
val cache = new SmallCache[Read, AnyRef](4, 5.seconds, _ null)
val b = Read("b")
val c = Read("c")
cache.add(b, "B")
cache.add(c, "C")
cache.evict()
cache.get(b) should be("B")
cache.get(c) should be("C")
}
}
}