From 1f9c374bd96e2e8e5b331746abac44c67f02eb0e Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Mon, 1 Aug 2016 09:46:09 +0100 Subject: [PATCH] Cluster Sharding with remember-entity enabled fails to recover after restart #20744 --- .../src/main/resources/reference.conf | 14 ++++ .../sharding/ClusterShardingSettings.scala | 74 +++++++++++++++---- .../scala/akka/cluster/sharding/Shard.scala | 72 ++++++++++++++++-- .../sharding/ClusterShardingSpec.scala | 37 +++++++++- .../AllAtOnceEntityRecoveryStrategySpec.scala | 37 ++++++++++ ...nstantRateEntityRecoveryStrategySpec.scala | 45 +++++++++++ 6 files changed, 259 insertions(+), 20 deletions(-) create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 8206019d46..c89d0036a5 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -94,6 +94,20 @@ akka.cluster.sharding { # works only for state-store-mode = "ddata" updating-state-timeout = 5 s + # The shard uses this strategy to determines how to recover the underlying entity actors. The strategy is only used + # by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all" + # strategy start all the underlying entity actors at the same time. The constant strategy will start the underlying + # entity actors at a fix rate. The default strategy "all". + entity-recovery-strategy = "all" + + # Default settings for the constant rate entity recovery strategy + entity-recovery-constant-rate-strategy { + # Sets the frequency at which a batch of entity actors is started. + frequency = 100 ms + # Sets the number of entity actors to be restart at a particular interval + number-of-entities = 5 + } + # Settings for the coordinator singleton. Same layout as akka.cluster.singleton. # The "role" of the singleton configuration is not used. The singleton role will # be the same as "akka.cluster.sharding.role". diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index b77fb80eb8..230cfc4a5e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -38,7 +38,10 @@ object ClusterShardingSettings { leastShardAllocationMaxSimultaneousRebalance = config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"), waitingForStateTimeout = config.getDuration("waiting-for-state-timeout", MILLISECONDS).millis, - updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis) + updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis, + entityRecoveryStrategy = config.getString("entity-recovery-strategy"), + entityRecoveryConstantRateStrategyFrequency = config.getDuration("entity-recovery-constant-rate-strategy.frequency", MILLISECONDS).millis, + entityRecoveryConstantRateStrategyNumberOfEntities = config.getInt("entity-recovery-constant-rate-strategy.number-of-entities")) val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) @@ -71,19 +74,62 @@ object ClusterShardingSettings { if (role == "") None else Option(role) class TuningParameters( - val coordinatorFailureBackoff: FiniteDuration, - val retryInterval: FiniteDuration, - val bufferSize: Int, - val handOffTimeout: FiniteDuration, - val shardStartTimeout: FiniteDuration, - val shardFailureBackoff: FiniteDuration, - val entityRestartBackoff: FiniteDuration, - val rebalanceInterval: FiniteDuration, - val snapshotAfter: Int, - val leastShardAllocationRebalanceThreshold: Int, - val leastShardAllocationMaxSimultaneousRebalance: Int, - val waitingForStateTimeout: FiniteDuration, - val updatingStateTimeout: FiniteDuration) + val coordinatorFailureBackoff: FiniteDuration, + val retryInterval: FiniteDuration, + val bufferSize: Int, + val handOffTimeout: FiniteDuration, + val shardStartTimeout: FiniteDuration, + val shardFailureBackoff: FiniteDuration, + val entityRestartBackoff: FiniteDuration, + val rebalanceInterval: FiniteDuration, + val snapshotAfter: Int, + val leastShardAllocationRebalanceThreshold: Int, + val leastShardAllocationMaxSimultaneousRebalance: Int, + val waitingForStateTimeout: FiniteDuration, + val updatingStateTimeout: FiniteDuration, + val entityRecoveryStrategy: String, + val entityRecoveryConstantRateStrategyFrequency: FiniteDuration, + val entityRecoveryConstantRateStrategyNumberOfEntities: Int) { + + require( + entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", + s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'") + + // included for binary compatibility + def this( + coordinatorFailureBackoff: FiniteDuration, + retryInterval: FiniteDuration, + bufferSize: Int, + handOffTimeout: FiniteDuration, + shardStartTimeout: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entityRestartBackoff: FiniteDuration, + rebalanceInterval: FiniteDuration, + snapshotAfter: Int, + leastShardAllocationRebalanceThreshold: Int, + leastShardAllocationMaxSimultaneousRebalance: Int, + waitingForStateTimeout: FiniteDuration, + updatingStateTimeout: FiniteDuration) = { + this( + coordinatorFailureBackoff, + retryInterval, + bufferSize, + handOffTimeout, + shardStartTimeout, + shardFailureBackoff, + entityRestartBackoff, + rebalanceInterval, + snapshotAfter, + leastShardAllocationRebalanceThreshold, + leastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout, + updatingStateTimeout, + "all", + 100 milliseconds, + 5 + ) + } + } } /** diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index e8381da165..a1d42d1034 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -6,16 +6,19 @@ package akka.cluster.sharding import java.net.URLEncoder import akka.actor.ActorLogging import akka.actor.ActorRef +import akka.actor.ActorSystem import akka.actor.Deploy import akka.actor.Props import akka.actor.Terminated -import akka.cluster.sharding.Shard.{ ShardCommand } +import akka.cluster.sharding.Shard.ShardCommand import akka.persistence.PersistentActor import akka.persistence.SnapshotOffer import akka.actor.Actor import akka.persistence.RecoveryCompleted import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotSuccess +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -35,6 +38,12 @@ private[akka] object Shard { */ final case class RestartEntity(entity: EntityId) extends ShardCommand + /** + * When initialising a shard with remember entities enabled the following message is used + * to restart batches of entity actors at a time. + */ + final case class RestartEntities(entity: Set[EntityId]) extends ShardCommand + /** * A case class which represents a state change for the Shard */ @@ -116,7 +125,7 @@ private[akka] class Shard( import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized } import ShardCoordinator.Internal.{ HandOff, ShardStopped } - import Shard.{ State, RestartEntity, EntityStopped, EntityStarted } + import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted } import Shard.{ ShardQuery, GetCurrentShardState, CurrentShardState, GetShardStats, ShardStats } import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage import akka.cluster.sharding.ShardRegion.ShardRegionCommand @@ -151,7 +160,8 @@ private[akka] class Shard( } def receiveShardCommand(msg: ShardCommand): Unit = msg match { - case RestartEntity(id) ⇒ getEntity(id) + case RestartEntity(id) ⇒ getEntity(id) + case RestartEntities(ids) ⇒ ids foreach getEntity } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -313,8 +323,19 @@ private[akka] class PersistentShard( with PersistentActor with ActorLogging { import ShardRegion.{ EntityId, Msg } - import Shard.{ State, RestartEntity, EntityStopped, EntityStarted } + import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted } import settings.tuningParameters._ + import akka.pattern.pipe + + val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = + entityRecoveryStrategy match { + case "all" ⇒ EntityRecoveryStrategy.allStrategy() + case "constant" ⇒ EntityRecoveryStrategy.constantStrategy( + context.system, + entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities + ) + } override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" @@ -344,7 +365,7 @@ private[akka] class PersistentShard( case EntityStopped(id) ⇒ state = state.copy(state.entities - id) case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot case RecoveryCompleted ⇒ - state.entities foreach getEntity + restartRememberedEntities() super.initialized() log.debug("Shard recovery completed {}", shardId) } @@ -388,4 +409,45 @@ private[akka] class PersistentShard( } } + private def restartRememberedEntities(): Unit = { + rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery ⇒ + import context.dispatcher + scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self) + } + } +} + +object EntityRecoveryStrategy { + def allStrategy(): EntityRecoveryStrategy = new AllAtOnceEntityRecoveryStrategy() + + def constantStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int): EntityRecoveryStrategy = + new ConstantRateEntityRecoveryStrategy(actorSystem, frequency, numberOfEntities) +} + +trait EntityRecoveryStrategy { + import ShardRegion.EntityId + import scala.concurrent.Future + + def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] +} + +final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy { + import ShardRegion.EntityId + override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] = + if (entities.isEmpty) Set.empty else Set(Future.successful(entities)) +} + +final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy { + import ShardRegion.EntityId + import akka.pattern.after + import actorSystem.dispatcher + + override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] = + entities.grouped(numberOfEntities).foldLeft((frequency, Set[Future[Set[EntityId]]]())) { + case ((interval, scheduledEntityIds), entityIds) ⇒ + (interval + frequency, scheduledEntityIds + scheduleEntities(interval, entityIds)) + }._2 + + private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]) = + after(interval, actorSystem.scheduler)(Future.successful[Set[EntityId]](entityIds)) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 0788c895cb..93aa967374 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -114,7 +114,11 @@ object ClusterShardingSpec { } -abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingSpecConfig( + val mode: String, + val entityRecoveryStrategy: String = "all") + extends MultiNodeConfig { + val controller = role("controller") val first = role("first") val second = role("second") @@ -144,6 +148,11 @@ abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConf entity-restart-backoff = 1s rebalance-interval = 2 s state-store-mode = "$mode" + entity-recovery-strategy = "$entityRecoveryStrategy" + entity-recovery-constant-rate-strategy { + frequency = 1 ms + number-of-entities = 1 + } least-shard-allocation-strategy { rebalance-threshold = 2 max-simultaneous-rebalance = 1 @@ -177,9 +186,19 @@ object ClusterShardingDocCode { object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence") object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata") +object PersistentClusterShardingWithEntityRecoverySpecConfig extends ClusterShardingSpecConfig( + "persistence", + "all" +) +object DDataClusterShardingWithEntityRecoverySpecConfig extends ClusterShardingSpecConfig( + "ddata", + "constant" +) class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig) class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig) +class PersistentClusterShardingWithEntityRecoverySpec extends ClusterShardingSpec(PersistentClusterShardingWithEntityRecoverySpecConfig) +class DDataClusterShardingWithEntityRecoverySpec extends ClusterShardingSpec(DDataClusterShardingWithEntityRecoverySpecConfig) class PersistentClusterShardingMultiJvmNode1 extends PersistentClusterShardingSpec class PersistentClusterShardingMultiJvmNode2 extends PersistentClusterShardingSpec @@ -197,6 +216,22 @@ class DDataClusterShardingMultiJvmNode5 extends DDataClusterShardingSpec class DDataClusterShardingMultiJvmNode6 extends DDataClusterShardingSpec class DDataClusterShardingMultiJvmNode7 extends DDataClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode1 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode2 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode3 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode4 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode5 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode6 extends PersistentClusterShardingSpec +class PersistentClusterShardingWithEntityRecoveryMultiJvmNode7 extends PersistentClusterShardingSpec + +class DDataClusterShardingWithEntityRecoveryMultiJvmNode1 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode2 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode3 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode4 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode5 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode6 extends DDataClusterShardingSpec +class DDataClusterShardingWithEntityRecoveryMultiJvmNode7 extends DDataClusterShardingSpec + abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingSpec._ import config._ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala new file mode 100644 index 0000000000..b9b7b2b562 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala @@ -0,0 +1,37 @@ +package akka.cluster.sharding + +import akka.cluster.sharding.ShardRegion.EntityId +import akka.testkit.AkkaSpec +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.language.postfixOps + +class AllAtOnceEntityRecoveryStrategySpec extends AkkaSpec { + val strategy = EntityRecoveryStrategy.allStrategy() + + import system.dispatcher + + "AllAtOnceEntityRecoveryStrategy" must { + "recover entities" in { + val entities = Set[EntityId]("1", "2", "3", "4", "5") + val startTime = System.currentTimeMillis() + val resultWithTimes = strategy.recoverEntities(entities).map( + _.map(entityIds ⇒ (entityIds, System.currentTimeMillis() - startTime)) + ) + + val result = Await.result(Future.sequence(resultWithTimes), 4 seconds).toList.sortWith(_._2 < _._2) + result.size should ===(1) + + val scheduledEntities = result.map(_._1) + scheduledEntities.head should ===(entities) + + val times = result.map(_._2) + times.head should ===(0L +- 20L) + } + + "not recover when no entities to recover" in { + val result = strategy.recoverEntities(Set[EntityId]()) + result.size should ===(0) + } + } +} \ No newline at end of file diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala new file mode 100644 index 0000000000..454218ca0f --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala @@ -0,0 +1,45 @@ +package akka.cluster.sharding + +import akka.cluster.sharding.ShardRegion.EntityId +import akka.testkit.AkkaSpec + +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.language.postfixOps + +class ConstantRateEntityRecoveryStrategySpec extends AkkaSpec { + + import system.dispatcher + + val strategy = EntityRecoveryStrategy.constantStrategy(system, 500 millis, 2) + + "ConstantRateEntityRecoveryStrategy" must { + "recover entities" in { + val entities = Set[EntityId]("1", "2", "3", "4", "5") + val startTime = System.currentTimeMillis() + val resultWithTimes = strategy.recoverEntities(entities).map( + _.map(entityIds ⇒ (entityIds, System.currentTimeMillis() - startTime)) + ) + + val result = Await.result(Future.sequence(resultWithTimes), 4 seconds).toList.sortWith(_._2 < _._2) + result.size should ===(3) + + val scheduledEntities = result.map(_._1) + scheduledEntities.head.size should ===(2) + scheduledEntities(1).size should ===(2) + scheduledEntities(2).size should ===(1) + scheduledEntities.foldLeft(Set[EntityId]())(_ ++ _) should ===(entities) + + val times = result.map(_._2) + + times.head should ===(500L +- 30L) + times(1) should ===(1000L +- 30L) + times(2) should ===(1500L +- 30L) + } + + "not recover when no entities to recover" in { + val result = strategy.recoverEntities(Set[EntityId]()) + result.size should ===(0) + } + } +}