From 5a5468dd4ae0b26b81a8a8de37efea21da106fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 3 Jun 2020 09:02:32 +0200 Subject: [PATCH] DData shard store improvements (#29166) * Handle timeouts better wrt the timeout used in shard * load all entities up front to potentially speed up GetRemembered response --- .../scala/akka/cluster/sharding/Shard.scala | 2 - .../DDataRememberEntitiesShardStore.scala | 54 ++++++++++++++----- .../DDataRememberEntitiesShardStoreSpec.scala | 26 +++++++-- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 382ae5e56e..14022be549 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -615,8 +615,6 @@ private[akka] class Shard( timers.startSingleTimer( RememberEntityTimeoutKey, RememberEntityTimeout(update), - // FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times - // and this could always fail before ddata store completes retrying writes settings.tuningParameters.updatingStateTimeout) context.become(waitingForRememberEntitiesStore(update, startTimeNanos)) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index 2b2bdfbf1a..e04bda991e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -8,6 +8,7 @@ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Props +import akka.actor.Stash import akka.annotation.InternalApi import akka.cluster.Cluster import akka.cluster.ddata.ORSet @@ -77,6 +78,7 @@ private[akka] final class DDataRememberEntitiesShardStore( replicator: ActorRef, majorityMinCap: Int) extends Actor + with Stash with ActorLogging { import DDataRememberEntitiesShardStore._ @@ -86,8 +88,8 @@ private[akka] final class DDataRememberEntitiesShardStore( implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress) private val readMajority = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap) - // Note that the timeout is actually updatingStateTimeout x 3 since we do 3 retries - private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap) + // Note that the timeout is actually updatingStateTimeout / 4 so that we fit 3 retries and a response in the timeout before the shard sees it as a failure + private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout / 4, majorityMinCap) private val maxUpdateAttempts = 3 private val keys = stateKeys(typeName, shardId) @@ -98,30 +100,42 @@ private[akka] final class DDataRememberEntitiesShardStore( settings.tuningParameters.updatingStateTimeout.pretty, majorityMinCap) } - // FIXME potential optimization: start loading entity ids immediately on start instead of waiting for request - // (then throw away after request has been seen) + loadAllEntities() private def key(entityId: EntityId): ORSetKey[EntityId] = { val i = math.abs(entityId.hashCode % numberOfKeys) keys(i) } - override def receive: Receive = idle + override def receive: Receive = { + waitingForAllEntityIds(Set.empty, Set.empty, None) + } def idle: Receive = { - case RememberEntitiesShardStore.GetEntities => onGetEntities() + case RememberEntitiesShardStore.GetEntities => + // not supported, but we may get several if the shard timed out and retried + log.debug("Another get entities request after responding to one, not expected/supported, ignoring") case update: RememberEntitiesShardStore.Update => onUpdate(update) } - def waitingForAllEntityIds(requestor: ActorRef, gotKeys: Set[Int], ids: Set[EntityId]): Receive = { + def waitingForAllEntityIds(gotKeys: Set[Int], ids: Set[EntityId], shardWaiting: Option[ActorRef]): Receive = { def receiveOne(i: Int, idsForKey: Set[EntityId]): Unit = { val newGotKeys = gotKeys + i val newIds = ids.union(idsForKey) if (newGotKeys.size == numberOfKeys) { - requestor ! RememberEntitiesShardStore.RememberedEntities(newIds) - context.become(idle) + shardWaiting match { + case Some(shard) => + log.debug("Shard waiting for remembered entities, sending remembered and going idle") + shard ! RememberEntitiesShardStore.RememberedEntities(newIds) + context.become(idle) + unstashAll() + case None => + // we haven't seen request yet + log.debug("Got remembered entities, waiting for shard to request them") + context.become(waitingForAllEntityIds(newGotKeys, newIds, None)) + } } else { - context.become(waitingForAllEntityIds(requestor, newGotKeys, newIds)) + context.become(waitingForAllEntityIds(newGotKeys, newIds, shardWaiting)) } } @@ -144,11 +158,26 @@ private[akka] final class DDataRememberEntitiesShardStore( context.stop(self) case update: RememberEntitiesShardStore.Update => log.warning("Got an update before load of initial entities completed, dropping update: [{}]", update) + case RememberEntitiesShardStore.GetEntities => + if (gotKeys.size == numberOfKeys) { + // we already got all and was waiting for a request + log.debug("Got request from shard, sending remembered entities") + sender() ! RememberEntitiesShardStore.RememberedEntities(ids) + context.become(idle) + unstashAll() + } else { + // we haven't seen all ids yet + log.debug("Got request from shard, waiting for all remembered entities to arrive") + context.become(waitingForAllEntityIds(gotKeys, ids, Some(sender()))) + } + case _ => + // if we get a write while waiting for the listing, defer it until we saw listing, if not we can get a mismatch + // of remembered with what the shard thinks it just wrote + stash() } } private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = { - // FIXME what about ordering of adds/removes vs sets, I think we can lose one val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped)) // map from set of evts (for same ddata key) to one update that applies each of them val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] = @@ -220,12 +249,11 @@ private[akka] final class DDataRememberEntitiesShardStore( next(allUpdates) } - private def onGetEntities(): Unit = { + private def loadAllEntities(): Unit = { (0 until numberOfKeys).toSet[Int].foreach { i => val key = keys(i) replicator ! Get(key, readMajority, Some(i)) } - context.become(waitingForAllEntityIds(sender(), Set.empty, Set.empty)) } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala index 0b8316854e..1888fd6014 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala @@ -40,12 +40,13 @@ class DDataRememberEntitiesShardStoreSpec } "The DDataRememberEntitiesShardStore" must { + val replicatorSettings = ReplicatorSettings(system) + val replicator = system.actorOf(Replicator.props(replicatorSettings)) + + val shardingSettings = ClusterShardingSettings(system) "store starts and stops and list remembered entity ids" in { - val replicatorSettings = ReplicatorSettings(system) - val replicator = system.actorOf(Replicator.props(replicatorSettings)) - val shardingSettings = ClusterShardingSettings(system) val store = system.actorOf( DDataRememberEntitiesShardStore .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) @@ -65,9 +66,24 @@ class DDataRememberEntitiesShardStoreSpec store ! RememberEntitiesShardStore.Update(Set("2"), Set.empty) expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty)) - store ! RememberEntitiesShardStore.GetEntities - expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) + // the store does not support get after update + val storeIncarnation2 = system.actorOf( + DDataRememberEntitiesShardStore + .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + storeIncarnation2 ! RememberEntitiesShardStore.GetEntities + expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) + } + + "handle a late request" in { + // the store does not support get after update + val storeIncarnation3 = system.actorOf( + DDataRememberEntitiesShardStore + .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + + Thread.sleep(500) + storeIncarnation3 ! RememberEntitiesShardStore.GetEntities + expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) // from previous test } }