DData shard store improvements (#29166)
* Handle timeouts better with respect to the timeout used in the shard
* Load all entities up front to potentially speed up the response to the shard's GetEntities request
Parent: aa50c63965
Commit: 5a5468dd4a
3 changed files with 62 additions and 20 deletions
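The first bullet is about keeping the two timeouts consistent: the shard starts a single RememberEntityTimeout timer of updatingStateTimeout, while the DData store now uses updatingStateTimeout / 4 per WriteMajority attempt so that its 3 retries plus the reply fit inside that timer. A minimal Scala sketch of that budget follows; the concrete 5.seconds is a hypothetical stand-in for settings.tuningParameters.updatingStateTimeout, it is not a value taken from the commit.

import scala.concurrent.duration._

object RememberEntitiesTimeoutBudget {
  // Hypothetical stand-in; the real value comes from settings.tuningParameters.updatingStateTimeout.
  val updatingStateTimeout: FiniteDuration = 5.seconds

  // Per-attempt WriteMajority timeout, mirroring the change in the DData store below.
  val writeMajorityTimeout: FiniteDuration = updatingStateTimeout / 4

  // The store retries each update up to this many times.
  val maxUpdateAttempts: Int = 3

  def main(args: Array[String]): Unit = {
    val worstCaseWrites = writeMajorityTimeout * maxUpdateAttempts.toLong
    // Three attempts consume three quarters of the budget, leaving the last quarter
    // for the reply to reach the shard before its single timer fires.
    println(s"per attempt: $writeMajorityTimeout, worst-case writes: $worstCaseWrites, shard timer: $updatingStateTimeout")
    assert(worstCaseWrites < updatingStateTimeout)
  }
}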
Shard.scala:

@@ -615,8 +615,6 @@ private[akka] class Shard(
     timers.startSingleTimer(
       RememberEntityTimeoutKey,
       RememberEntityTimeout(update),
-      // FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times
-      // and this could always fail before ddata store completes retrying writes
       settings.tuningParameters.updatingStateTimeout)
 
     context.become(waitingForRememberEntitiesStore(update, startTimeNanos))
DDataRememberEntitiesShardStore.scala:

@@ -8,6 +8,7 @@ import akka.actor.Actor
 import akka.actor.ActorLogging
 import akka.actor.ActorRef
 import akka.actor.Props
+import akka.actor.Stash
 import akka.annotation.InternalApi
 import akka.cluster.Cluster
 import akka.cluster.ddata.ORSet

@@ -77,6 +78,7 @@ private[akka] final class DDataRememberEntitiesShardStore(
     replicator: ActorRef,
     majorityMinCap: Int)
     extends Actor
+    with Stash
     with ActorLogging {
 
   import DDataRememberEntitiesShardStore._

@@ -86,8 +88,8 @@ private[akka] final class DDataRememberEntitiesShardStore(
   implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)
 
   private val readMajority = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
-  // Note that the timeout is actually updatingStateTimeout x 3 since we do 3 retries
-  private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
+  // Note that the timeout is actually updatingStateTimeout / 4 so that we fit 3 retries and a response in the timeout before the shard sees it as a failure
+  private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout / 4, majorityMinCap)
   private val maxUpdateAttempts = 3
   private val keys = stateKeys(typeName, shardId)
 
@@ -98,30 +100,42 @@ private[akka] final class DDataRememberEntitiesShardStore(
       settings.tuningParameters.updatingStateTimeout.pretty,
       majorityMinCap)
   }
-  // FIXME potential optimization: start loading entity ids immediately on start instead of waiting for request
-  // (then throw away after request has been seen)
+  loadAllEntities()
 
   private def key(entityId: EntityId): ORSetKey[EntityId] = {
     val i = math.abs(entityId.hashCode % numberOfKeys)
     keys(i)
   }
 
-  override def receive: Receive = idle
+  override def receive: Receive = {
+    waitingForAllEntityIds(Set.empty, Set.empty, None)
+  }
 
   def idle: Receive = {
-    case RememberEntitiesShardStore.GetEntities => onGetEntities()
+    case RememberEntitiesShardStore.GetEntities =>
+      // not supported, but we may get several if the shard timed out and retried
+      log.debug("Another get entities request after responding to one, not expected/supported, ignoring")
    case update: RememberEntitiesShardStore.Update => onUpdate(update)
   }
 
-  def waitingForAllEntityIds(requestor: ActorRef, gotKeys: Set[Int], ids: Set[EntityId]): Receive = {
+  def waitingForAllEntityIds(gotKeys: Set[Int], ids: Set[EntityId], shardWaiting: Option[ActorRef]): Receive = {
     def receiveOne(i: Int, idsForKey: Set[EntityId]): Unit = {
       val newGotKeys = gotKeys + i
       val newIds = ids.union(idsForKey)
       if (newGotKeys.size == numberOfKeys) {
-        requestor ! RememberEntitiesShardStore.RememberedEntities(newIds)
-        context.become(idle)
+        shardWaiting match {
+          case Some(shard) =>
+            log.debug("Shard waiting for remembered entities, sending remembered and going idle")
+            shard ! RememberEntitiesShardStore.RememberedEntities(newIds)
+            context.become(idle)
+            unstashAll()
+          case None =>
+            // we haven't seen request yet
+            log.debug("Got remembered entities, waiting for shard to request them")
+            context.become(waitingForAllEntityIds(newGotKeys, newIds, None))
+        }
       } else {
-        context.become(waitingForAllEntityIds(requestor, newGotKeys, newIds))
+        context.become(waitingForAllEntityIds(newGotKeys, newIds, shardWaiting))
       }
     }
 
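The hunk above replaces the lazy load that used to start in onGetEntities with an eager one: the reads are issued at startup, each answered key is folded into the accumulated id set, and at most one waiting shard (shardWaiting) is remembered. Below is a minimal, self-contained sketch of this eager-load-then-answer pattern, using classic Akka actors with Stash as the real store does, but with hypothetical LoadedChunk/GetAll/All messages instead of the Replicator and RememberEntitiesShardStore protocols; it is an illustration of the idea, not the commit's implementation.

import akka.actor.{ Actor, ActorRef, Props, Stash }

object EagerLoader {
  // Hypothetical messages standing in for Replicator.Get results and the shard's GetEntities request.
  final case class LoadedChunk(key: Int, ids: Set[String])
  case object GetAll
  final case class All(ids: Set[String])

  def props(numberOfKeys: Int): Props = Props(new EagerLoader(numberOfKeys))
}

class EagerLoader(numberOfKeys: Int) extends Actor with Stash {
  import EagerLoader._

  // In the real store the constructor fires one replicator Get per key (loadAllEntities()).
  // Here we only model the waiting logic that follows.
  def receive: Receive = loading(gotKeys = Set.empty, ids = Set.empty, waiting = None)

  private def loading(gotKeys: Set[Int], ids: Set[String], waiting: Option[ActorRef]): Receive = {
    case LoadedChunk(key, chunk) =>
      val newGotKeys = gotKeys + key
      val newIds = ids ++ chunk
      waiting match {
        case Some(replyTo) if newGotKeys.size == numberOfKeys =>
          replyTo ! All(newIds) // request already seen: answer, release deferred writes, go idle
          unstashAll()
          context.become(idle(newIds))
        case _ =>
          context.become(loading(newGotKeys, newIds, waiting))
      }
    case GetAll if gotKeys.size == numberOfKeys =>
      sender() ! All(ids) // everything already loaded: answer immediately
      unstashAll()
      context.become(idle(ids))
    case GetAll =>
      context.become(loading(gotKeys, ids, waiting = Some(sender())))
    case _ =>
      stash() // defer writes until the initial listing has been seen
  }

  private def idle(ids: Set[String]): Receive = {
    case GetAll => sender() ! All(ids)
  }
}

Keeping the waiting requester as an Option is what lets the reads overlap with shard startup: whichever of "all chunks loaded" and "request received" happens last triggers the reply, and writes that arrive in the meantime are stashed so the requester never sees a listing that is missing what it just wrote.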
@@ -144,11 +158,26 @@ private[akka] final class DDataRememberEntitiesShardStore(
         context.stop(self)
       case update: RememberEntitiesShardStore.Update =>
         log.warning("Got an update before load of initial entities completed, dropping update: [{}]", update)
+      case RememberEntitiesShardStore.GetEntities =>
+        if (gotKeys.size == numberOfKeys) {
+          // we already got all and was waiting for a request
+          log.debug("Got request from shard, sending remembered entities")
+          sender() ! RememberEntitiesShardStore.RememberedEntities(ids)
+          context.become(idle)
+          unstashAll()
+        } else {
+          // we haven't seen all ids yet
+          log.debug("Got request from shard, waiting for all remembered entities to arrive")
+          context.become(waitingForAllEntityIds(gotKeys, ids, Some(sender())))
+        }
+      case _ =>
+        // if we get a write while waiting for the listing, defer it until we saw listing, if not we can get a mismatch
+        // of remembered with what the shard thinks it just wrote
+        stash()
     }
   }
 
   private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = {
-    // FIXME what about ordering of adds/removes vs sets, I think we can lose one
     val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped))
     // map from set of evts (for same ddata key) to one update that applies each of them
     val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] =

@@ -220,12 +249,11 @@ private[akka] final class DDataRememberEntitiesShardStore(
       next(allUpdates)
   }
 
-  private def onGetEntities(): Unit = {
+  private def loadAllEntities(): Unit = {
     (0 until numberOfKeys).toSet[Int].foreach { i =>
       val key = keys(i)
       replicator ! Get(key, readMajority, Some(i))
     }
-    context.become(waitingForAllEntityIds(sender(), Set.empty, Set.empty))
   }
 
 }
DDataRememberEntitiesShardStoreSpec.scala:

@@ -40,12 +40,13 @@ class DDataRememberEntitiesShardStoreSpec
   }
 
   "The DDataRememberEntitiesShardStore" must {
+    val replicatorSettings = ReplicatorSettings(system)
+    val replicator = system.actorOf(Replicator.props(replicatorSettings))
+
+    val shardingSettings = ClusterShardingSettings(system)
+
     "store starts and stops and list remembered entity ids" in {
-      val replicatorSettings = ReplicatorSettings(system)
-      val replicator = system.actorOf(Replicator.props(replicatorSettings))
-
-      val shardingSettings = ClusterShardingSettings(system)
       val store = system.actorOf(
         DDataRememberEntitiesShardStore
           .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))

@@ -65,9 +66,24 @@ class DDataRememberEntitiesShardStoreSpec
       store ! RememberEntitiesShardStore.Update(Set("2"), Set.empty)
       expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty))
 
-      store ! RememberEntitiesShardStore.GetEntities
-      expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5"))
+      // the store does not support get after update
+      val storeIncarnation2 = system.actorOf(
+        DDataRememberEntitiesShardStore
+          .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
 
+      storeIncarnation2 ! RememberEntitiesShardStore.GetEntities
+      expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5"))
+    }
+
+    "handle a late request" in {
+      // the store does not support get after update
+      val storeIncarnation3 = system.actorOf(
+        DDataRememberEntitiesShardStore
+          .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
+
+      Thread.sleep(500)
+      storeIncarnation3 ! RememberEntitiesShardStore.GetEntities
+      expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) // from previous test
     }
 
   }