diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index 8c6cd55776..e0f59a79ab 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -41,6 +41,11 @@ akka.cluster.distributed-data {
   # unreachable, so it should be configured to worst case in a healthy cluster.
   max-pruning-dissemination = 60 s
 
+  # Serialized Write and Read messages are cached when they are sent to
+  # several nodes. If there is no further activity they are removed from
+  # the cache after this duration.
+  serializer-cache-time-to-live = 10s
+
 }
 #//#distributed-data
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index ad359066f4..15c756406f 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -1359,7 +1359,7 @@ private[akka] class WriteAggregator(
 
   def receive = {
     case WriteAck ⇒
-      remaining -= sender().path.address
+      remaining -= senderAddress()
       if (remaining.size == doneWhenRemainingSize)
         reply(ok = true)
     case SendToSecondary ⇒
@@ -1367,6 +1367,8 @@ private[akka] class WriteAggregator(
     case ReceiveTimeout ⇒ reply(ok = false)
   }
 
+  def senderAddress(): Address = sender().path.address
+
   def reply(ok: Boolean): Unit = {
     if (ok && envelope.data == DeletedData)
       replyTo.tell(DeleteSuccess(key), context.parent)
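Note: extracting `senderAddress()` into its own method, instead of inlining `sender().path.address`, creates a test seam: `WriteAggregatorSpec` further down overrides it to map TestProbe senders back to fake node addresses. A minimal sketch of the pattern, with hypothetical names (`SenderAddressAware`, `ProbeSenderAddress`, `probes`) that are illustrative only, not part of this patch:

    import akka.actor.{ Actor, ActorRef, Address }

    // Production lookup lives in one overridable method.
    trait SenderAddressAware extends Actor {
      def senderAddress(): Address = sender().path.address
    }

    // Hypothetical test mix-in: resolve the address from a probe map instead.
    trait ProbeSenderAddress extends SenderAddressAware {
      def probes: Map[Address, ActorRef]
      override def senderAddress(): Address =
        probes.collectFirst { case (address, ref) if ref == sender() ⇒ address }.get
    }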
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 3f8e9d98fc..4ba296e73d 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -3,6 +3,7 @@
  */
 package akka.cluster.ddata.protobuf
 
+import scala.concurrent.duration._
 import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.breakOut
@@ -21,12 +22,136 @@ import akka.serialization.BaseSerializer
 import akka.util.{ ByteString ⇒ AkkaByteString }
 import akka.protobuf.ByteString
 import akka.cluster.ddata.Key.KeyR
+import java.util.concurrent.atomic.AtomicInteger
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * INTERNAL API
+ */
+private[akka] object ReplicatorMessageSerializer {
+
+  /**
+   * A cache designed for a small number (<= 32) of entries.
+   * It uses instance equality, so a lookup only matches the
+   * same instance. Adding a new entry overwrites the oldest.
+   * It is thread safe, but duplicates of the same entry may occur.
+   *
+   * `evict` must be called from the outside, i.e. the
+   * cache will not clean up itself.
+   */
+  final class SmallCache[A <: AnyRef, B <: AnyRef](size: Int, timeToLive: FiniteDuration, getOrAddFactory: A ⇒ B) {
+    require((size & (size - 1)) == 0, "size must be a power of 2")
+    require(size <= 32, "size must be <= 32")
+
+    private val n = new AtomicInteger(0)
+    private val mask = size - 1
+    private val elements = Array.ofDim[(A, B)](size)
+    private val ttlNanos = timeToLive.toNanos
+
+    // In theory this should be volatile, but the cache has low
+    // guarantees anyway and visibility will be synced by other
+    // activity, so a non-volatile field is used.
+    private var lastUsed = System.nanoTime()
+
+    /**
+     * Get value from cache or `null` if it doesn't exist.
+     */
+    def get(a: A): B = get(a, n.get)
+
+    private def get(a: A, startPos: Int): B = {
+      // start at the latest added value, most likely that we want that
+      val end = startPos + elements.length
+      @tailrec def find(i: Int): B = {
+        if (end - i == 0) null.asInstanceOf[B]
+        else {
+          val x = elements(i & mask)
+          if ((x eq null) || (x._1 ne a)) find(i + 1)
+          else x._2
+        }
+      }
+      lastUsed = System.nanoTime()
+      find(startPos)
+    }
+
+    /**
+     * Add entry to the cache.
+     * Overwrites the oldest entry.
+     */
+    def add(a: A, b: B): Unit = add((a, b))
+
+    def add(x: (A, B)): Unit = {
+      val i = n.incrementAndGet()
+      elements(i & mask) = x
+      lastUsed = System.nanoTime()
+    }
+
+    /**
+     * Get value from cache or create a new value with the
+     * `getOrAddFactory` that was given in the constructor. The new
+     * value is added to the cache. Duplicates of the same value may be
+     * added if multiple threads call this concurrently, but a decent
+     * (low cost) effort is made to reduce the chance of duplicates.
+     */
+    def getOrAdd(a: A): B = {
+      val position = n.get
+      val c = get(a, position)
+      if (c ne null)
+        c
+      else {
+        val b2 = getOrAddFactory(a)
+        if (position == n.get) {
+          // no change, add the new value
+          add(a, b2)
+          b2
+        } else {
+          // some other thread added, try one more time
+          // to reduce duplicates
+          val c2 = get(a)
+          if (c2 ne null) c2 // found it
+          else {
+            add(a, b2)
+            b2
+          }
+        }
+      }
+    }
+
+    /**
+     * Remove all elements if the cache has not been
+     * used within the `timeToLive`.
+     */
+    def evict(): Unit =
+      if ((System.nanoTime() - lastUsed) > ttlNanos) {
+        var i = 0
+        while (i < elements.length) {
+          elements(i) = null
+          i += 1
+        }
+      }
+
+    override def toString: String =
+      elements.mkString("[", ",", "]")
+  }
+}
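To make the contract described in the scaladoc concrete, here is a hedged usage sketch of `SmallCache` (standalone, assuming the class above is in scope; the string key and UTF-8 factory are placeholders, not from this patch):

    import scala.concurrent.duration._

    // size must be a power of 2 and <= 32
    val cache = new SmallCache[String, Array[Byte]](4, 10.seconds, s ⇒ s.getBytes("UTF-8"))

    val key = "hello"
    val bytes1 = cache.getOrAdd(key) // miss: the factory runs and the result is cached
    val bytes2 = cache.getOrAdd(key) // hit: the same Array[Byte] instance comes back
    assert(bytes1 eq bytes2)

    // Lookups use instance equality, so an equal-but-distinct key misses:
    assert(cache.get(new String("hello")) == null)

The small fixed size fits the intended use below: presumably only a handful of distinct Write/Read messages are in flight at any moment, so four slots per cache suffice.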
+
 /**
  * Protobuf serializer of ReplicatorMessage messages.
  */
 class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
   extends SerializerWithStringManifest with SerializationSupport with BaseSerializer {
+  import ReplicatorMessageSerializer.SmallCache
+
+  private val cacheTimeToLive = system.settings.config.getDuration(
+    "akka.cluster.distributed-data.serializer-cache-time-to-live", TimeUnit.MILLISECONDS).millis
+  private val readCache = new SmallCache[Read, Array[Byte]](4, cacheTimeToLive, m ⇒ readToProto(m).toByteArray)
+  private val writeCache = new SmallCache[Write, Array[Byte]](4, cacheTimeToLive, m ⇒ writeToProto(m).toByteArray)
+  system.scheduler.schedule(cacheTimeToLive, cacheTimeToLive / 2) {
+    readCache.evict()
+    writeCache.evict()
+  }(system.dispatcher)
+
+  private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray
 
   val GetManifest = "A"
   val GetSuccessManifest = "B"
@@ -80,9 +205,9 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
 
   def toBinary(obj: AnyRef): Array[Byte] = obj match {
     case m: DataEnvelope ⇒ dataEnvelopeToProto(m).toByteArray
-    case m: Write        ⇒ writeToProto(m).toByteArray
-    case WriteAck        ⇒ dm.Empty.getDefaultInstance.toByteArray
-    case m: Read         ⇒ readToProto(m).toByteArray
+    case m: Write        ⇒ writeCache.getOrAdd(m)
+    case WriteAck        ⇒ writeAckBytes
+    case m: Read         ⇒ readCache.getOrAdd(m)
     case m: ReadResult   ⇒ readResultToProto(m).toByteArray
     case m: Status       ⇒ statusToProto(m).toByteArray
     case m: Get[_]       ⇒ getToProto(m).toByteArray
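For context on why the two caches help: when the replicator writes to several nodes it sends the very same `Write` instance to each destination, and remoting serializes once per destination, so every `toBinary` call after the first can return the cached byte array. A hedged sketch of the effect (`system` and `envelope` are assumed to exist; they are not taken from this patch):

    import akka.serialization.SerializationExtension
    import akka.cluster.ddata.Replicator.Internal.Write

    val serialization = SerializationExtension(system)
    val write = Write("key", envelope) // one instance, sent to N replicas

    val serializer = serialization.findSerializerFor(write)
    val b1 = serializer.toBinary(write) // miss: protobuf serialization runs once
    val b2 = serializer.toBinary(write) // hit: cached byte array is returned
    assert(b1 eq b2) // holds while the entry is neither overwritten nor evicted

`WriteAck` goes one step further: it is a singleton with a constant encoding, so its bytes are computed once into `writeAckBytes` and never looked up at all.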
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
new file mode 100644
index 0000000000..92de652ca6
--- /dev/null
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2009-2015 Typesafe Inc.
+ */
+
+package akka.cluster.ddata
+
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.actor.ActorSystem
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.Matchers
+import org.scalatest.WordSpecLike
+import akka.actor.Address
+import akka.actor.ActorRef
+import akka.cluster.ddata.Replicator.Internal._
+import akka.cluster.ddata.Replicator._
+import akka.actor.ActorSelection
+import akka.cluster.UniqueAddress
+
+object WriteAggregatorSpec {
+
+  val key = GSetKey[String]("a")
+
+  def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
+                           probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef): Props =
+    Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo))
+
+  class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency,
+                            probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef)
+    extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo) {
+
+    override def replica(address: Address): ActorSelection =
+      context.actorSelection(probes(address).path)
+
+    override def senderAddress(): Address =
+      probes.find { case (a, r) ⇒ r == sender() }.get._1
+  }
+
+  def writeAckAdapterProps(replica: ActorRef): Props =
+    Props(new WriteAckAdapter(replica))
+
+  class WriteAckAdapter(replica: ActorRef) extends Actor {
+    var replicator: Option[ActorRef] = None
+
+    def receive = {
+      case WriteAck ⇒
+        replicator.foreach(_ ! WriteAck)
+      case msg ⇒
+        replicator = Some(sender())
+        replica ! msg
+    }
+  }
+}
+
+class WriteAggregatorSpec extends AkkaSpec("""
+      akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+      akka.remote.netty.tcp.port=0
+      """)
+  with ImplicitSender {
+
+  val nodeA = Address("akka.tcp", "Sys", "a", 2552)
+  val nodeB = nodeA.copy(host = Some("b"))
+  val nodeC = nodeA.copy(host = Some("c"))
+  val nodeD = nodeA.copy(host = Some("d"))
+  // 4 replicas + the local => 5
+  val nodes = Set(nodeA, nodeB, nodeC, nodeD)
+
+  val data = GSet.empty + "A" + "B"
+  val timeout = 3.seconds.dilated
+  val writeTwo = WriteTo(2, timeout)
+  val writeMajority = WriteMajority(timeout)
+
+  def probes(probe: ActorRef): Map[Address, ActorRef] =
+    nodes.toSeq.map(_ -> system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap
+
+  "WriteAggregator" must {
+    "send to at least N/2+1 replicas when WriteMajority" in {
+      val probe = TestProbe()
+      val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+        data, writeMajority, probes(probe.ref), nodes, testActor))
+
+      probe.expectMsgType[Write]
+      probe.lastSender ! WriteAck
+      probe.expectMsgType[Write]
+      probe.lastSender ! WriteAck
+      expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
+      watch(aggr)
+      expectTerminated(aggr)
+    }
+
+    "send to more when no immediate reply" in {
+      val probe = TestProbe()
+      val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+        data, writeMajority, probes(probe.ref), nodes, testActor))
+
+      probe.expectMsgType[Write]
+      // no reply
+      probe.expectMsgType[Write]
+      // no reply
+      probe.lastSender ! WriteAck
+      probe.expectMsgType[Write]
+      probe.lastSender ! WriteAck
+      probe.expectMsgType[Write]
+      probe.lastSender ! WriteAck
+      expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
+      watch(aggr)
+      expectTerminated(aggr)
+    }
+
+    "timeout when less than required acks" in {
+      val probe = TestProbe()
+      val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+        data, writeMajority, probes(probe.ref), nodes, testActor))
+
+      probe.expectMsgType[Write]
+      // no reply
+      probe.expectMsgType[Write]
+      probe.lastSender ! WriteAck
+      probe.expectMsgType[Write]
+      // no reply
+      probe.expectMsgType[Write]
+      // no reply
+      expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
+      watch(aggr)
+      expectTerminated(aggr)
+    }
+
+  }
+}
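A note on the arithmetic these tests rely on: with four remote replicas plus the local one, `WriteMajority` needs N/2 + 1 = 3 confirmed writes, and the local write is counted as already done, so exactly two remote `WriteAck`s complete the first test. In sketch form, with an illustrative `majority` helper that is not necessarily the replicator's internal code:

    def majority(numberOfNodes: Int): Int = numberOfNodes / 2 + 1

    val remoteReplicas = 4
    val allNodes = remoteReplicas + 1       // the local replica counts too
    val requiredAcks = majority(allNodes)   // 3
    val remoteAcksNeeded = requiredAcks - 1 // local write already done => 2
    assert(remoteAcksNeeded == 2)

The timeout test mirrors this: a single remote ack leaves the majority unmet, so the aggregator gives up with `UpdateTimeout` when the write timeout expires.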
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index 11cc68126a..bef819ce28 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -78,4 +78,134 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem("ReplicatorMes
     }
   }
+
+  "Cache" must {
+    import ReplicatorMessageSerializer._
+    "be power of 2" in {
+      intercept[IllegalArgumentException] {
+        new SmallCache[String, String](3, 5.seconds, _ ⇒ null)
+      }
+    }
+
+    "get added element" in {
+      val cache = new SmallCache[Read, String](2, 5.seconds, _ ⇒ null)
+      val a = Read("a")
+      cache.add(a, "A")
+      cache.get(a) should be("A")
+      val b = Read("b")
+      cache.add(b, "B")
+      cache.get(a) should be("A")
+      cache.get(b) should be("B")
+    }
+
+    "return null for non-existing elements" in {
+      val cache = new SmallCache[Read, String](4, 5.seconds, _ ⇒ null)
+      val a = Read("a")
+      cache.get(a) should be(null)
+      cache.add(a, "A")
+      val b = Read("b")
+      cache.get(b) should be(null)
+    }
+
+    "hold latest added elements" in {
+      val cache = new SmallCache[Read, String](4, 5.seconds, _ ⇒ null)
+      val a = Read("a")
+      val b = Read("b")
+      val c = Read("c")
+      val d = Read("d")
+      val e = Read("e")
+      cache.add(a, "A")
+      cache.get(a) should be("A")
+      cache.add(b, "B")
+      cache.get(a) should be("A")
+      cache.add(c, "C")
+      cache.get(a) should be("A")
+      cache.add(d, "D")
+      cache.get(a) should be("A")
+      // now it is full and a will be pushed out
+      cache.add(e, "E")
+      cache.get(a) should be(null)
+      cache.get(b) should be("B")
+      cache.get(c) should be("C")
+      cache.get(d) should be("D")
+      cache.get(e) should be("E")
+
+      cache.add(a, "A")
+      cache.get(a) should be("A")
+      cache.get(b) should be(null)
+      cache.get(c) should be("C")
+      cache.get(d) should be("D")
+      cache.get(e) should be("E")
+    }
+
+    "handle Int wrap around" ignore { // ignored because it takes 20 seconds (but it works)
+      val cache = new SmallCache[Read, String](2, 5.seconds, _ ⇒ null)
+      val a = Read("a")
+      val x = a -> "A"
+      var n = 0
+      while (n <= Int.MaxValue - 3) {
+        cache.add(x)
+        n += 1
+      }
+
+      cache.get(a) should be("A")
+
+      val b = Read("b")
+      val c = Read("c")
+      cache.add(b, "B")
+      cache.get(a) should be("A")
+      cache.get(b) should be("B")
+
+      cache.add(c, "C")
+      cache.get(a) should be(null)
+      cache.get(b) should be("B")
+      cache.get(c) should be("C")
+
+      cache.add(a, "A")
+      cache.get(a) should be("A")
+      cache.get(b) should be(null)
+      cache.get(c) should be("C")
+    }
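The ignored test above exercises overflow safety: `add` uses `n.incrementAndGet()`, which wraps from `Int.MaxValue` to `Int.MinValue`, and `get` terminates via `end - i == 0` rather than `i >= end` for the same reason. Because `size` is a power of two, `i & mask` still yields a valid slot index even for negative `i`. A quick demonstration of the masking:

    val mask = 2 - 1 // size 2, as in the test above

    // Int arithmetic wraps around on overflow
    assert(Int.MaxValue + 1 == Int.MinValue)

    // masking the two's-complement bit pattern gives an index in [0, size)
    assert((Int.MaxValue & mask) == 1)
    assert((Int.MinValue & mask) == 0)
    assert(((Int.MinValue + 1) & mask) == 1)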
+
+    "support getOrAdd" in {
+      var n = 0
+      def createValue(a: Read): AnyRef = {
+        n += 1
+        new AnyRef {
+          override val toString = "v" + n
+        }
+      }
+
+      val cache = new SmallCache[Read, AnyRef](4, 5.seconds, a ⇒ createValue(a))
+      val a = Read("a")
+      val v1 = cache.getOrAdd(a)
+      v1.toString should be("v1")
+      cache.getOrAdd(a) should be theSameInstanceAs v1
+    }
+
+    "evict cache after time-to-live" in {
+      val cache = new SmallCache[Read, AnyRef](4, 10.millis, _ ⇒ null)
+      val b = Read("b")
+      val c = Read("c")
+      cache.add(b, "B")
+      cache.add(c, "C")
+
+      Thread.sleep(30)
+      cache.evict()
+      cache.get(b) should be(null)
+      cache.get(c) should be(null)
+    }
+
+    "not evict cache before time-to-live" in {
+      val cache = new SmallCache[Read, AnyRef](4, 5.seconds, _ ⇒ null)
+      val b = Read("b")
+      val c = Read("c")
+      cache.add(b, "B")
+      cache.add(c, "C")
+      cache.evict()
+      cache.get(b) should be("B")
+      cache.get(c) should be("C")
+    }
+
+  }
 }