Cluster Sharding with remember-entity enabled fails to recover after restart #20744

This commit is contained in:
Peter Barron 2016-08-01 09:46:09 +01:00 committed by Johan Andrén
parent 65d9676164
commit 1f9c374bd9
6 changed files with 259 additions and 20 deletions

View file

@ -6,16 +6,19 @@ package akka.cluster.sharding
import java.net.URLEncoder
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.sharding.Shard.{ ShardCommand }
import akka.cluster.sharding.Shard.ShardCommand
import akka.persistence.PersistentActor
import akka.persistence.SnapshotOffer
import akka.actor.Actor
import akka.persistence.RecoveryCompleted
import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
@ -35,6 +38,12 @@ private[akka] object Shard {
*/
final case class RestartEntity(entity: EntityId) extends ShardCommand
/**
* When initialising a shard with remember entities enabled the following message is used
* to restart batches of entity actors at a time.
*/
final case class RestartEntities(entity: Set[EntityId]) extends ShardCommand
/**
* A case class which represents a state change for the Shard
*/
@ -116,7 +125,7 @@ private[akka] class Shard(
import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized }
import ShardCoordinator.Internal.{ HandOff, ShardStopped }
import Shard.{ State, RestartEntity, EntityStopped, EntityStarted }
import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted }
import Shard.{ ShardQuery, GetCurrentShardState, CurrentShardState, GetShardStats, ShardStats }
import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
import akka.cluster.sharding.ShardRegion.ShardRegionCommand
@ -151,7 +160,8 @@ private[akka] class Shard(
}
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
case RestartEntity(id) getEntity(id)
case RestartEntity(id) getEntity(id)
case RestartEntities(ids) ids foreach getEntity
}
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
@ -313,8 +323,19 @@ private[akka] class PersistentShard(
with PersistentActor with ActorLogging {
import ShardRegion.{ EntityId, Msg }
import Shard.{ State, RestartEntity, EntityStopped, EntityStarted }
import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted }
import settings.tuningParameters._
import akka.pattern.pipe
val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy =
entityRecoveryStrategy match {
case "all" EntityRecoveryStrategy.allStrategy()
case "constant" EntityRecoveryStrategy.constantStrategy(
context.system,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities
)
}
override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"
@ -344,7 +365,7 @@ private[akka] class PersistentShard(
case EntityStopped(id) state = state.copy(state.entities - id)
case SnapshotOffer(_, snapshot: State) state = snapshot
case RecoveryCompleted
state.entities foreach getEntity
restartRememberedEntities()
super.initialized()
log.debug("Shard recovery completed {}", shardId)
}
@ -388,4 +409,45 @@ private[akka] class PersistentShard(
}
}
private def restartRememberedEntities(): Unit = {
rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery
import context.dispatcher
scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self)
}
}
}
object EntityRecoveryStrategy {
def allStrategy(): EntityRecoveryStrategy = new AllAtOnceEntityRecoveryStrategy()
def constantStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int): EntityRecoveryStrategy =
new ConstantRateEntityRecoveryStrategy(actorSystem, frequency, numberOfEntities)
}
trait EntityRecoveryStrategy {
import ShardRegion.EntityId
import scala.concurrent.Future
def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]]
}
final class AllAtOnceEntityRecoveryStrategy extends EntityRecoveryStrategy {
import ShardRegion.EntityId
override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] =
if (entities.isEmpty) Set.empty else Set(Future.successful(entities))
}
final class ConstantRateEntityRecoveryStrategy(actorSystem: ActorSystem, frequency: FiniteDuration, numberOfEntities: Int) extends EntityRecoveryStrategy {
import ShardRegion.EntityId
import akka.pattern.after
import actorSystem.dispatcher
override def recoverEntities(entities: Set[EntityId]): Set[Future[Set[EntityId]]] =
entities.grouped(numberOfEntities).foldLeft((frequency, Set[Future[Set[EntityId]]]())) {
case ((interval, scheduledEntityIds), entityIds)
(interval + frequency, scheduledEntityIds + scheduleEntities(interval, entityIds))
}._2
private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]) =
after(interval, actorSystem.scheduler)(Future.successful[Set[EntityId]](entityIds))
}