diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index c89d0036a5..6839d8e509 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -112,6 +112,17 @@ akka.cluster.sharding {
   # The "role" of the singleton configuration is not used. The singleton role will
   # be the same as "akka.cluster.sharding.role".
   coordinator-singleton = ${akka.cluster.singleton}
+
+  # Settings for the Distributed Data replicator. Used when state-store-mode=ddata.
+  # Same layout as akka.cluster.distributed-data.
+  # The "role" of the distributed-data configuration is not used. The distributed-data
+  # role will be the same as "akka.cluster.sharding.role".
+  # Note that there is one Replicator per role and it's not possible
+  # to have different distributed-data settings for different sharding entity types.
+  distributed-data = ${akka.cluster.distributed-data}
+  distributed-data {
+    durable.keys = ["shard-*"]
+  }
 
   # The id of the dispatcher to use for ClusterSharding actors.
   # If not specified default dispatcher is used.
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index a6d596cc48..ced39c457a 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -25,6 +25,10 @@ import akka.pattern.BackoffSupervisor
 import akka.util.ByteString
 import akka.pattern.ask
 import akka.dispatch.Dispatchers
+import akka.cluster.ddata.ReplicatorSettings
+import akka.cluster.ddata.Replicator
+import scala.util.control.NonFatal
+import akka.actor.Status
 
 /**
  * This extension provides sharding functionality of actors in a cluster.
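As a quick illustration of the new akka.cluster.sharding.distributed-data section above: it has the same layout as akka.cluster.distributed-data, so an application can override it in application.conf the same way the sharding test specs in this patch do. A minimal sketch, with an illustrative (not default) directory and map size:

  import com.typesafe.config.ConfigFactory

  object ShardingDDataConfigSketch extends App {
    // Hypothetical application overrides; same layout as akka.cluster.distributed-data.
    val overrides = ConfigFactory.parseString("""
      akka.cluster.sharding {
        state-store-mode = ddata
        remember-entities = on
        distributed-data.durable.lmdb {
          dir = "target/sharding-ddata"   # hypothetical path
          map-size = 10 MiB
        }
      }
    """)
    // Anything not overridden here (e.g. durable.keys = ["shard-*"]) keeps the
    // reference.conf default once merged with the full Akka configuration.
    println(overrides.getString("akka.cluster.sharding.state-store-mode")) // ddata
  }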
@@ -413,7 +417,11 @@ private[akka] class ClusterShardingGuardian extends Actor { val cluster = Cluster(context.system) val sharding = ClusterSharding(context.system) - lazy val replicator = DistributedData(context.system).replicator + + private lazy val replicatorSettings = + ReplicatorSettings(context.system.settings.config.getConfig( + "akka.cluster.sharding.distributed-data")) + private var replicatorByRole = Map.empty[Option[String], ActorRef] private def coordinatorSingletonManagerName(encName: String): String = encName + "Coordinator" @@ -421,65 +429,102 @@ private[akka] class ClusterShardingGuardian extends Actor { private def coordinatorPath(encName: String): String = (self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress + private def replicator(settings: ClusterShardingSettings): ActorRef = { + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { + // one Replicator per role + replicatorByRole.get(settings.role) match { + case Some(ref) ⇒ ref + case None ⇒ + val name = settings.role match { + case Some(r) ⇒ URLEncoder.encode(r, ByteString.UTF_8) + "Replicator" + case None ⇒ "replicator" + } + val ref = context.actorOf(Replicator.props(replicatorSettings.withRole(settings.role)), name) + replicatorByRole = replicatorByRole.updated(settings.role, ref) + ref + } + } else + context.system.deadLetters + } + def receive = { case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) ⇒ - import settings.role - import settings.tuningParameters.coordinatorFailureBackoff + try { + import settings.role + import settings.tuningParameters.coordinatorFailureBackoff + + val rep = replicator(settings) + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) + val cName = coordinatorSingletonManagerName(encName) + val cPath = coordinatorPath(encName) + val shardRegion = context.child(encName).getOrElse { + if (context.child(cName).isEmpty) { + val coordinatorProps = + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + ShardCoordinator.props(typeName, settings, allocationStrategy) + else { + ShardCoordinator.props(typeName, settings, allocationStrategy, rep) + } + val singletonProps = BackoffSupervisor.props( + childProps = coordinatorProps, + childName = "coordinator", + minBackoff = coordinatorFailureBackoff, + maxBackoff = coordinatorFailureBackoff * 5, + randomFactor = 0.2).withDeploy(Deploy.local) + val singletonSettings = settings.coordinatorSingletonSettings + .withSingletonName("singleton").withRole(role) + context.actorOf( + ClusterSingletonManager.props( + singletonProps, + terminationMessage = PoisonPill, + singletonSettings).withDispatcher(context.props.dispatcher), + name = cName) + } - val encName = URLEncoder.encode(typeName, ByteString.UTF_8) - val cName = coordinatorSingletonManagerName(encName) - val cPath = coordinatorPath(encName) - val shardRegion = context.child(encName).getOrElse { - if (context.child(cName).isEmpty) { - val coordinatorProps = - if (settings.stateStoreMode == "persistence") - ShardCoordinator.props(typeName, settings, allocationStrategy) - else - ShardCoordinator.props(typeName, settings, allocationStrategy, replicator) - val singletonProps = BackoffSupervisor.props( - childProps = coordinatorProps, - childName = "coordinator", - minBackoff = coordinatorFailureBackoff, - maxBackoff = coordinatorFailureBackoff * 5, - randomFactor = 0.2).withDeploy(Deploy.local) - val singletonSettings = 
settings.coordinatorSingletonSettings - .withSingletonName("singleton").withRole(role) context.actorOf( - ClusterSingletonManager.props( - singletonProps, - terminationMessage = PoisonPill, - singletonSettings).withDispatcher(context.props.dispatcher), - name = cName) + ShardRegion.props( + typeName = typeName, + entityProps = entityProps, + settings = settings, + coordinatorPath = cPath, + extractEntityId = extractEntityId, + extractShardId = extractShardId, + handOffStopMessage = handOffStopMessage, + replicator = rep).withDispatcher(context.props.dispatcher), + name = encName) } - - context.actorOf( - ShardRegion.props( - typeName = typeName, - entityProps = entityProps, - settings = settings, - coordinatorPath = cPath, - extractEntityId = extractEntityId, - extractShardId = extractShardId, - handOffStopMessage = handOffStopMessage).withDispatcher(context.props.dispatcher), - name = encName) + sender() ! Started(shardRegion) + } catch { + case NonFatal(e) ⇒ + // don't restart + // could be invalid ReplicatorSettings, or InvalidActorNameException + // if it has already been started + sender() ! Status.Failure(e) } - sender() ! Started(shardRegion) case StartProxy(typeName, settings, extractEntityId, extractShardId) ⇒ - val encName = URLEncoder.encode(typeName, ByteString.UTF_8) - val cName = coordinatorSingletonManagerName(encName) - val cPath = coordinatorPath(encName) - val shardRegion = context.child(encName).getOrElse { - context.actorOf( - ShardRegion.proxyProps( - typeName = typeName, - settings = settings, - coordinatorPath = cPath, - extractEntityId = extractEntityId, - extractShardId = extractShardId).withDispatcher(context.props.dispatcher), - name = encName) + try { + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) + val cName = coordinatorSingletonManagerName(encName) + val cPath = coordinatorPath(encName) + val shardRegion = context.child(encName).getOrElse { + context.actorOf( + ShardRegion.proxyProps( + typeName = typeName, + settings = settings, + coordinatorPath = cPath, + extractEntityId = extractEntityId, + extractShardId = extractShardId, + replicator = context.system.deadLetters).withDispatcher(context.props.dispatcher), + name = encName) + } + sender() ! Started(shardRegion) + } catch { + case NonFatal(e) ⇒ + // don't restart + // could be InvalidActorNameException if it has already been started + sender() ! Status.Failure(e) } - sender() ! Started(shardRegion) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 7f84d32624..5e31ce23a4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -11,6 +11,10 @@ import com.typesafe.config.Config import akka.cluster.singleton.ClusterSingletonManagerSettings object ClusterShardingSettings { + + val StateStoreModePersistence = "persistence" + val StateStoreModeDData = "ddata" + /** * Create settings from the default configuration * `akka.cluster.sharding`. 
@@ -155,9 +159,10 @@ final class ClusterShardingSettings( val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { + import ClusterShardingSettings.{ StateStoreModePersistence, StateStoreModeDData } require( - stateStoreMode == "persistence" || stateStoreMode == "ddata", - s"Unknown 'state-store-mode' [$stateStoreMode], valid values are 'persistence' or 'ddata'") + stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, + s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'") def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role)) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 8a145b4acf..911420d2f1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -19,6 +19,12 @@ import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotSuccess import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import akka.cluster.Cluster +import akka.cluster.ddata.ORSet +import akka.cluster.ddata.ORSetKey +import akka.cluster.ddata.Replicator._ +import akka.actor.Stash +import akka.cluster.ddata.DistributedData /** * INTERNAL API @@ -96,8 +102,12 @@ private[akka] object Shard { settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, - handOffStopMessage: Any): Props = { - if (settings.rememberEntities) + handOffStopMessage: Any, + replicator: ActorRef): Props = { + if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { + Props(new DDataShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, + handOffStopMessage, replicator)).withDeploy(Deploy.local) + } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) else @@ -125,8 +135,7 @@ private[akka] class Shard( import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized } import ShardCoordinator.Internal.{ HandOff, ShardStopped } - import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted } - import Shard.{ ShardQuery, GetCurrentShardState, CurrentShardState, GetShardStats, ShardStats } + import Shard._ import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage import akka.cluster.sharding.ShardRegion.ShardRegionCommand import settings.tuningParameters._ @@ -145,7 +154,7 @@ private[akka] class Shard( def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size } - def processChange[A](event: A)(handler: A ⇒ Unit): Unit = + def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit = handler(event) def receive = receiveCommand @@ -304,80 +313,35 @@ private[akka] class Shard( } /** - * INTERNAL API - * - * This actor creates children entity actors on demand that it is told to be - * responsible for. It is used when `rememberEntities` is enabled. 
- * - * @see [[ClusterSharding$ ClusterSharding extension]] + * INTERNAL API: Common things for PersistentShard and DDataShard */ -private[akka] class PersistentShard( - typeName: String, - shardId: ShardRegion.ShardId, - entityProps: Props, - settings: ClusterShardingSettings, - extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, - handOffStopMessage: Any) extends Shard( - typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) - with PersistentActor with ActorLogging { - +private[akka] trait RememberingShard { selfType: Shard ⇒ import ShardRegion.{ EntityId, Msg } - import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted } - import settings.tuningParameters._ + import Shard._ import akka.pattern.pipe - val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = + protected val settings: ClusterShardingSettings + + protected val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = { + import settings.tuningParameters._ entityRecoveryStrategy match { case "all" ⇒ EntityRecoveryStrategy.allStrategy() case "constant" ⇒ EntityRecoveryStrategy.constantStrategy( context.system, entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities - ) - } - - override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" - - override def journalPluginId: String = settings.journalPluginId - - override def snapshotPluginId: String = settings.snapshotPluginId - - // would be initialized after recovery completed - override def initialized(): Unit = {} - - override def receive = receiveCommand - - override def processChange[A](event: A)(handler: A ⇒ Unit): Unit = { - saveSnapshotWhenNeeded() - persist(event)(handler) - } - - def saveSnapshotWhenNeeded(): Unit = { - if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) { - log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) - saveSnapshot(state) + entityRecoveryConstantRateStrategyNumberOfEntities) } } - override def receiveRecover: Receive = { - case EntityStarted(id) ⇒ state = state.copy(state.entities + id) - case EntityStopped(id) ⇒ state = state.copy(state.entities - id) - case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot - case RecoveryCompleted ⇒ - restartRememberedEntities() - super.initialized() - log.debug("Shard recovery completed {}", shardId) + protected def restartRememberedEntities(): Unit = { + rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery ⇒ + import context.dispatcher + scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self) + } } - override def receiveCommand: Receive = ({ - case _: SaveSnapshotSuccess ⇒ - log.debug("PersistentShard snapshot saved successfully") - case SaveSnapshotFailure(_, reason) ⇒ - log.warning("PersistentShard snapshot failure: {}", reason.getMessage) - }: Receive).orElse(super.receiveCommand) - override def entityTerminated(ref: ActorRef): Unit = { + import settings.tuningParameters._ val id = idByRef(ref) if (messageBuffers.getOrElse(id, Vector.empty).nonEmpty) { //Note; because we're not persisting the EntityStopped, we don't need @@ -404,17 +368,225 @@ private[akka] class PersistentShard( case None ⇒ //Note; we only do this if remembering, otherwise the buffer is an overhead messageBuffers = messageBuffers.updated(id, Vector((msg, snd))) - saveSnapshotWhenNeeded() - persist(EntityStarted(id))(sendMsgBuffer) + processChange(EntityStarted(id))(sendMsgBuffer) } 
} - private def restartRememberedEntities(): Unit = { - rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery ⇒ - import context.dispatcher - scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self) +} + +/** + * INTERNAL API + * + * This actor creates children entity actors on demand that it is told to be + * responsible for. It is used when `rememberEntities` is enabled and + * `state-store-mode=persistence`. + * + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +private[akka] class PersistentShard( + typeName: String, + shardId: ShardRegion.ShardId, + entityProps: Props, + override val settings: ClusterShardingSettings, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, + handOffStopMessage: Any) extends Shard( + typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + with RememberingShard with PersistentActor with ActorLogging { + + import ShardRegion.{ EntityId, Msg } + import Shard._ + import settings.tuningParameters._ + + override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" + + override def journalPluginId: String = settings.journalPluginId + + override def snapshotPluginId: String = settings.snapshotPluginId + + // would be initialized after recovery completed + override def initialized(): Unit = {} + + override def receive = receiveCommand + + override def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit = { + saveSnapshotWhenNeeded() + persist(event)(handler) + } + + def saveSnapshotWhenNeeded(): Unit = { + if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) { + log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) + saveSnapshot(state) } } + + override def receiveRecover: Receive = { + case EntityStarted(id) ⇒ state = state.copy(state.entities + id) + case EntityStopped(id) ⇒ state = state.copy(state.entities - id) + case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot + case RecoveryCompleted ⇒ + restartRememberedEntities() + super.initialized() + log.debug("PersistentShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size) + } + + override def receiveCommand: Receive = ({ + case _: SaveSnapshotSuccess ⇒ + log.debug("PersistentShard snapshot saved successfully") + case SaveSnapshotFailure(_, reason) ⇒ + log.warning("PersistentShard snapshot failure: {}", reason.getMessage) + }: Receive).orElse(super.receiveCommand) + +} + +/** + * INTERNAL API + * + * This actor creates children entity actors on demand that it is told to be + * responsible for. It is used when `rememberEntities` is enabled and + * `state-store-mode=ddata`. 
+ * + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +private[akka] class DDataShard( + typeName: String, + shardId: ShardRegion.ShardId, + entityProps: Props, + override val settings: ClusterShardingSettings, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, + handOffStopMessage: Any, + replicator: ActorRef) extends Shard( + typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + with RememberingShard with Stash with ActorLogging { + + import ShardRegion.{ EntityId, Msg } + import Shard._ + import settings.tuningParameters._ + + private val waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout + private val updatingStateTimeout = settings.tuningParameters.updatingStateTimeout + private val maxUpdateAttempts = 3 + + implicit private val node = Cluster(context.system) + + // The default maximum-frame-size is 256 KiB with Artery. + // ORSet with 40000 elements has a size of ~ 200000 bytes. + // By splitting the elements over 5 keys we can safely support 200000 entities per shard. + // This is by intention not configurable because it's important to have the same + // configuration on each node. + private val numberOfKeys = 5 + private val stateKeys: Array[ORSetKey[EntityId]] = + Array.tabulate(numberOfKeys)(i ⇒ ORSetKey[EntityId](s"shard-${typeName}-${shardId}-$i")) + + private def key(entityId: EntityId): ORSetKey[EntityId] = { + val i = (math.abs(entityId.hashCode) % numberOfKeys) + stateKeys(i) + } + + // get initial state from ddata replicator + getState() + + private def getState(): Unit = { + (0 until numberOfKeys).map { i ⇒ + replicator ! Get(stateKeys(i), ReadMajority(waitingForStateTimeout), Some(i)) + } + } + + // would be initialized after recovery completed + override def initialized(): Unit = {} + + override def receive = waitingForState(Set.empty) + + // This state will stash all commands + private def waitingForState(gotKeys: Set[Int]): Receive = { + def receiveOne(i: Int): Unit = { + val newGotKeys = gotKeys + i + if (newGotKeys.size == numberOfKeys) + recoveryCompleted() + else + context.become(waitingForState(newGotKeys)) + } + + { + case g @ GetSuccess(_, Some(i: Int)) ⇒ + val key = stateKeys(i) + state = state.copy(entities = state.entities union (g.get(key).elements)) + receiveOne(i) + + case GetFailure(_, _) ⇒ + log.error( + "The DDataShard was unable to get an initial state within 'waiting-for-state-timeout': {} millis", + waitingForStateTimeout.toMillis) + // parent ShardRegion supervisor will notice that it terminated and will start it again, after backoff + context.stop(self) + + case NotFound(_, Some(i: Int)) ⇒ + receiveOne(i) + + case _ ⇒ + stash() + } + } + + private def recoveryCompleted(): Unit = { + restartRememberedEntities() + super.initialized() + log.debug("DDataShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size) + unstashAll() + context.become(receiveCommand) + } + + override def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit = { + context.become(waitingForUpdate(event, handler), discardOld = false) + sendUpdate(event, retryCount = 1) + } + + private def sendUpdate(evt: StateChange, retryCount: Int) = { + replicator ! 
Update(key(evt.entityId), ORSet.empty[EntityId], WriteMajority(updatingStateTimeout), + Some((evt, retryCount))) { existing ⇒ + evt match { + case EntityStarted(id) ⇒ existing + id + case EntityStopped(id) ⇒ existing - id + } + } + } + + // this state will stash all messages until it receives UpdateSuccess + private def waitingForUpdate[E <: StateChange](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = { + case UpdateSuccess(_, Some((`evt`, _))) ⇒ + log.debug("The DDataShard state was successfully updated with {}", evt) + context.unbecome() + afterUpdateCallback(evt) + unstashAll() + + case UpdateTimeout(_, Some((`evt`, retryCount: Int))) ⇒ + if (retryCount == maxUpdateAttempts) { + // parent ShardRegion supervisor will notice that it terminated and will start it again, after backoff + log.error( + "The DDataShard was unable to update state after {} attempts, within 'updating-state-timeout'={} millis, event={}. " + + "Shard will be restarted after backoff.", + maxUpdateAttempts, updatingStateTimeout.toMillis, evt) + context.stop(self) + } else { + log.warning( + "The DDataShard was unable to update state, attempt {} of {}, within 'updating-state-timeout'={} millis, event={}", + retryCount, maxUpdateAttempts, updatingStateTimeout.toMillis, evt) + sendUpdate(evt, retryCount + 1) + } + + case ModifyFailure(_, error, cause, Some((`evt`, _))) ⇒ + log.error( + cause, + "The DDataShard was unable to update state with error {} and event {}. Shard will be restarted", + error, + evt) + throw cause + + case _ ⇒ stash() + } + } object EntityRecoveryStrategy { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index e8ce32f3e6..f7631a54fe 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -882,7 +882,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, } // this state will stash all messages until it receives UpdateSuccess - def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: DomainEvent ⇒ Unit): Receive = { + def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = { case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒ log.debug("The coordinator state was successfully updated with {}", evt) context.unbecome() @@ -914,7 +914,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, } def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = { - context.become(waitingForUpdate(evt, f.asInstanceOf[DomainEvent ⇒ Unit]), discardOld = false) + context.become(waitingForUpdate(evt, f), discardOld = false) sendUpdate(evt) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index c5b2982f8d..4366830ce9 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -37,9 +37,10 @@ object ShardRegion { coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, - handOffStopMessage: Any): Props = + handOffStopMessage: Any, + replicator: ActorRef): Props = Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, extractEntityId, - 
extractShardId, handOffStopMessage)).withDeploy(Deploy.local) + extractShardId, handOffStopMessage, replicator)).withDeploy(Deploy.local) /** * INTERNAL API @@ -51,9 +52,10 @@ object ShardRegion { settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId): Props = - Props(new ShardRegion(typeName, None, settings, coordinatorPath, extractEntityId, extractShardId, PoisonPill)) - .withDeploy(Deploy.local) + extractShardId: ShardRegion.ExtractShardId, + replicator: ActorRef): Props = + Props(new ShardRegion(typeName, None, settings, coordinatorPath, extractEntityId, extractShardId, + PoisonPill, replicator)).withDeploy(Deploy.local) /** * Marker type of entity identifier (`String`). @@ -332,20 +334,23 @@ object ShardRegion { } /** + * INTERNAL API + * * This actor creates children entity actors on demand for the shards that it is told to be * responsible for. It delegates messages targeted to other shards to the responsible * `ShardRegion` actor on other nodes. * * @see [[ClusterSharding$ ClusterSharding extension]] */ -class ShardRegion( +private[akka] class ShardRegion( typeName: String, entityProps: Option[Props], settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, - handOffStopMessage: Any) extends Actor with ActorLogging { + handOffStopMessage: Any, + replicator: ActorRef) extends Actor with ActorLogging { import ShardCoordinator.Internal._ import ShardRegion._ @@ -762,7 +767,8 @@ class ShardRegion( settings, extractEntityId, extractShardId, - handOffStopMessage).withDispatcher(context.props.dispatcher), + handOffStopMessage, + replicator).withDispatcher(context.props.dispatcher), name)) shardsByRef = shardsByRef.updated(shard, id) shards = shards.updated(id, shard) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 476dbf3c9b..f31f78469b 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -92,12 +92,16 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) timeout = 5s store { native = off - dir = "target/journal-ClusterShardingCustomShardAllocationSpec" + dir = "target/ClusterShardingCustomShardAllocationSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingCustomShardAllocationSpec/sharding-ddata + map-size = 10 MiB + } """)) } @@ -119,21 +123,16 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingC override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val 
storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } def join(from: RoleName, to: RoleName): Unit = { @@ -159,23 +158,27 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingC lazy val allocator = system.actorOf(Props[Allocator], "allocator") + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + s"Cluster sharding ($mode) with custom allocation strategy" must { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") - runOn(first, second) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } + runOn(first, second) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } - enterBarrier("after-1") + enterBarrier("after-1") + } } "use specified region" in within(10.seconds) { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index 13f94ad14c..4440ce6f30 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -21,6 +21,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ +import akka.cluster.MemberStatus object ClusterShardingFailureSpec { case class Get(id: String) @@ -64,16 +65,20 @@ abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiN timeout = 5s store { native = off - dir = "target/journal-ClusterShardingFailureSpec" + dir = "target/ClusterShardingFailureSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingFailureSpec/snapshots" akka.cluster.sharding { coordinator-failure-backoff = 3s shard-failure-backoff = 3s state-store-mode = "$mode" } + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingFailureSpec/sharding-ddata + map-size = 10 MiB + } """)) testTransport(on = true) @@ -99,27 +104,31 @@ abstract class 
ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(controller) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - runOn(controller) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } + val cluster = Cluster(system) + def join(from: RoleName, to: RoleName): Unit = { runOn(from) { - Cluster(system) join node(to).address + cluster join node(to).address startSharding() + + within(remaining) { + awaitAssert { + cluster.state.members.map(_.uniqueAddress) should contain(cluster.selfUniqueAddress) + cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } } enterBarrier(from.name + "-joined") } @@ -135,23 +144,27 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf lazy val region = ClusterSharding(system).shardRegion("Entity") - s"Cluster sharding ($mode) with flaky journal" must { + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(controller) { - system.actorOf(Props[SharedLeveldbStore], "store") + s"Cluster sharding ($mode) with flaky journal/network" must { + + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(controller) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(first, second) { + system.actorSelection(node(controller) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") } - enterBarrier("peristence-started") - - runOn(first, second) { - system.actorSelection(node(controller) / "user" / "store") ! 
Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") } "join cluster" in within(20.seconds) { @@ -173,15 +186,19 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf enterBarrier("after-2") } - "recover after journal failure" in within(20.seconds) { + "recover after journal/network failure" in within(20.seconds) { runOn(controller) { - testConductor.blackhole(controller, first, Direction.Both).await - testConductor.blackhole(controller, second, Direction.Both).await + if (isDdataMode) + testConductor.blackhole(first, second, Direction.Both).await + else { + testConductor.blackhole(controller, first, Direction.Both).await + testConductor.blackhole(controller, second, Direction.Both).await + } } enterBarrier("journal-blackholed") runOn(first) { - // try with a new shard, will not reply until journal is available again + // try with a new shard, will not reply until journal/network is available again region ! Add("40", 4) val probe = TestProbe() region.tell(Get("40"), probe.ref) @@ -191,8 +208,12 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf enterBarrier("first-delayed") runOn(controller) { - testConductor.passThrough(controller, first, Direction.Both).await - testConductor.passThrough(controller, second, Direction.Both).await + if (isDdataMode) + testConductor.passThrough(first, second, Direction.Both).await + else { + testConductor.passThrough(controller, first, Direction.Both).await + testConductor.passThrough(controller, second, Direction.Both).await + } } enterBarrier("journal-ok") @@ -202,13 +223,13 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf val entity21 = lastSender val shard2 = system.actorSelection(entity21.path.parent) - //Test the ShardCoordinator allocating shards during a journal failure + //Test the ShardCoordinator allocating shards after a journal/network failure region ! Add("30", 3) - //Test the Shard starting entities and persisting during a journal failure + //Test the Shard starting entities and persisting after a journal/network failure region ! Add("11", 1) - //Test the Shard passivate works during a journal failure + //Test the Shard passivate works after a journal failure shard2.tell(Passivate(PoisonPill), entity21) region ! 
Add("21", 1) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala index faa80a54ae..1601c052cd 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala @@ -54,6 +54,10 @@ object ClusterShardingGetStateSpecConfig extends MultiNodeConfig { shard-failure-backoff = 3s state-store-mode = "ddata" } + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingGetStateSpec/sharding-ddata + map-size = 10 MiB + } """)) nodeConfig(first, second)(ConfigFactory.parseString( diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 91409e0daf..46f25bcc03 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -56,6 +56,10 @@ object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig { updating-state-timeout = 2s waiting-for-state-timeout = 2s } + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingGetStatsSpec/sharding-ddata + map-size = 10 MiB + } akka.actor.warn-about-java-serializer-usage=false """)) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 006bc62f29..fa7c962190 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -53,12 +53,16 @@ abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) exten timeout = 5s store { native = off - dir = "target/journal-ClusterShardingGracefulShutdownSpec" + dir = "target/ClusterShardingGracefulShutdownSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingGracefulShutdownSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingGracefulShutdownSpec/sharding-ddata + map-size = 10 MiB + } """)) } @@ -80,21 +84,16 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracef override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) 
FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } def join(from: RoleName, to: RoleName): Unit = { @@ -119,23 +118,27 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracef lazy val region = ClusterSharding(system).shardRegion("Entity") + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + s"Cluster sharding ($mode)" must { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") - runOn(first, second) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } + runOn(first, second) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } - enterBarrier("after-1") + enterBarrier("after-1") + } } "start some shards in both regions" in within(30.seconds) { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 01d6a3be5d..5c165ef170 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -71,12 +71,16 @@ abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiN timeout = 5s store { native = off - dir = "target/journal-ClusterShardingLeavingSpec" + dir = "target/ClusterShardingLeavingSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingLeavingSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingLeavingSpec/sharding-ddata + map-size = 10 MiB + } """)) } @@ -102,21 +106,16 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected 
def afterTermination() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } val cluster = Cluster(system) @@ -145,21 +144,25 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf lazy val region = ClusterSharding(system).shardRegion("Entity") + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + s"Cluster sharding ($mode) with leaving member" must { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + + enterBarrier("after-1") } - enterBarrier("peristence-started") - - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - - enterBarrier("after-1") } "join cluster" in within(20.seconds) { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index d3e6302c61..398a664f9d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -49,13 +49,17 @@ abstract class ClusterShardingMinMembersSpecConfig(val mode: String) extends Mul timeout = 5s store { native = off - dir = "target/journal-ClusterShardingMinMembersSpec" + dir = "target/ClusterShardingMinMembersSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingMinMembersSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingMinMembersSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" akka.cluster.sharding.rebalance-interval = 120s #disable rebalance + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ClusterShardingMinMembersSpec/sharding-ddata + map-size = 10 MiB + } akka.cluster.min-nr-of-members = 3 """)) } @@ -80,21 +84,16 @@ abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSp override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - 
runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } def join(from: RoleName, to: RoleName): Unit = { @@ -120,23 +119,27 @@ abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSp lazy val region = ClusterSharding(system).shardRegion("Entity") + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + s"Cluster with min-nr-of-members using sharding ($mode)" must { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") - runOn(first, second, third) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } + runOn(first, second, third) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } - enterBarrier("after-1") + enterBarrier("after-1") + } } "use all nodes" in within(30.seconds) { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 6065e1c4fc..ad4a9f6db3 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -61,45 +61,54 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten timeout = 5s store { native = off - dir = "target/journal-ClusterShardingRememberEntitiesSpec" + dir = "target/ShardingRememberEntitiesSpec/journal" } } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingRememberEntitiesSpec" + akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ShardingRememberEntitiesSpec/sharding-ddata + map-size = 10 MiB + } """)) } -object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig("persistence") -object DDataClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig("ddata") +object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig( + ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig( + ClusterShardingSettings.StateStoreModeDData) -class PersistentClusterShardingRememberEntitiesSpec extends ClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig) +class PersistentClusterShardingRememberEntitiesSpec extends ClusterShardingRememberEntitiesSpec( + 
PersistentClusterShardingRememberEntitiesSpecConfig) class PersistentClusterShardingRememberEntitiesMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesSpec class PersistentClusterShardingRememberEntitiesMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesSpec class PersistentClusterShardingRememberEntitiesMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesSpec +class DDataClusterShardingRememberEntitiesSpec extends ClusterShardingRememberEntitiesSpec( + DDataClusterShardingRememberEntitiesSpecConfig) + +class DDataClusterShardingRememberEntitiesMultiJvmNode1 extends DDataClusterShardingRememberEntitiesSpec +class DDataClusterShardingRememberEntitiesMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec +class DDataClusterShardingRememberEntitiesMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec + abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememberEntitiesSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingRememberEntitiesSpec._ import config._ override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - runOn(first) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } def join(from: RoleName, to: RoleName): Unit = { @@ -122,23 +131,27 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb lazy val region = ClusterSharding(system).shardRegion("Entity") + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + s"Cluster with min-nr-of-members using sharding ($mode)" must { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") - runOn(second, third) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } + runOn(second, third) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } - enterBarrier("after-1") + enterBarrier("after-1") + } } "start remembered entities when coordinator fail over" in within(30.seconds) { @@ -148,6 +161,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb region ! 
1 expectMsgType[Started] } + enterBarrier("second-started") join(third, second) runOn(third) { @@ -164,6 +178,12 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb enterBarrier("all-up") runOn(first) { + if (isDdataMode) { + // Entity 1 in region of first node was started when there was only one node + // and then the remembering state will be replicated to second node by the + // gossip. So we must give that a chance to replicate before shutting down second. + Thread.sleep(5000) + } testConductor.exit(second, 0).await } enterBarrier("crash-second") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 071e2f4826..109edb10ca 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -137,10 +137,10 @@ abstract class ClusterShardingSpecConfig( akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" akka.persistence.journal.leveldb-shared.store { native = off - dir = "target/journal-ClusterShardingSpec" + dir = "target/ClusterShardingSpec/journal" } akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec" + akka.persistence.snapshot-store.local.dir = "target/ClusterShardingSpec/snapshots" akka.cluster.sharding { retry-interval = 1 s handoff-timeout = 10 s @@ -157,6 +157,10 @@ abstract class ClusterShardingSpecConfig( rebalance-threshold = 2 max-simultaneous-rebalance = 1 } + distributed-data.durable.lmdb { + dir = target/ClusterShardingSpec/sharding-ddata + map-size = 10 MiB + } } akka.testconductor.barrier-timeout = 70s """)) @@ -237,21 +241,16 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu override def initialParticipants = roles.size - val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup() { - runOn(controller) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") } override protected def afterTermination() { - runOn(controller) { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) - } + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) } def join(from: RoleName, to: RoleName): Unit = { @@ -262,9 +261,10 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu enterBarrier(from.name + "-joined") } + lazy val replicator = system.actorOf(Replicator.props( + ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator") + def createCoordinator(): Unit = { - val replicator = system.actorOf(Replicator.props( - ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator") def coordinatorProps(typeName: String, rebalanceEnabled: Boolean, rememberEntities: Boolean) = { val 
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
index 071e2f4826..109edb10ca 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
@@ -137,10 +137,10 @@ abstract class ClusterShardingSpecConfig(
     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
     akka.persistence.journal.leveldb-shared.store {
       native = off
-      dir = "target/journal-ClusterShardingSpec"
+      dir = "target/ClusterShardingSpec/journal"
     }
     akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
+    akka.persistence.snapshot-store.local.dir = "target/ClusterShardingSpec/snapshots"
     akka.cluster.sharding {
       retry-interval = 1 s
       handoff-timeout = 10 s
@@ -157,6 +157,10 @@ abstract class ClusterShardingSpecConfig(
         rebalance-threshold = 2
         max-simultaneous-rebalance = 1
       }
+      distributed-data.durable.lmdb {
+        dir = target/ClusterShardingSpec/sharding-ddata
+        map-size = 10 MiB
+      }
     }
     akka.testconductor.barrier-timeout = 70s
     """))
@@ -237,21 +241,16 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu

   override def initialParticipants = roles.size

-  val storageLocations = List(
-    "akka.persistence.journal.leveldb.dir",
-    "akka.persistence.journal.leveldb-shared.store.dir",
-    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
+  val storageLocations = List(new File(system.settings.config.getString(
+    "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)

   override protected def atStartup() {
-    runOn(controller) {
-      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
-    }
+    storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
+    enterBarrier("startup")
   }

   override protected def afterTermination() {
-    runOn(controller) {
-      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
-    }
+    storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
   }

   def join(from: RoleName, to: RoleName): Unit = {
@@ -262,9 +261,10 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
     enterBarrier(from.name + "-joined")
   }

+  lazy val replicator = system.actorOf(Replicator.props(
+    ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
+
   def createCoordinator(): Unit = {
-    val replicator = system.actorOf(Replicator.props(
-      ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
     def coordinatorProps(typeName: String, rebalanceEnabled: Boolean, rememberEntities: Boolean) = {
       val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
@@ -316,7 +316,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
         coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
         extractEntityId = extractEntityId,
         extractShardId = extractShardId,
-        handOffStopMessage = PoisonPill),
+        handOffStopMessage = PoisonPill,
+        replicator),
       name = typeName + "Region")
   }

@@ -329,8 +330,11 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
   lazy val rebalancingPersistentRegion = createRegion("RebalancingRememberCounter", rememberEntities = true)
   lazy val autoMigrateRegion = createRegion("AutoMigrateRememberRegionTest", rememberEntities = true)

+  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
+
   s"Cluster sharding ($mode)" must {

+    // must be done also in ddata mode since Counter is PersistentActor
    "setup shared journal" in {
      // start the Persistence extension
      Persistence(system)
@@ -440,7 +444,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
          settings,
          coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
          extractEntityId = extractEntityId,
-         extractShardId = extractShardId),
+         extractShardId = extractShardId,
+         system.deadLetters),
        name = "regionProxy")

      proxy ! Get(1)
@@ -802,7 +807,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
      system.actorSelection(shard / "1") ! Identify(3)
      expectMsg(ActorIdentity(3, None))

-     //Check counter 13 is alive again 8
+     //Check counter 13 is alive again
      system.actorSelection(shard / "13") ! Identify(4)
      expectMsgType[ActorIdentity](3 seconds).ref should not be (None)
    }
diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index 5061039e87..ad8a7502a1 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -72,6 +72,12 @@ akka.cluster.distributed-data {
       # and its remote port.
       # 2. Otherwise the path is used as is, as a relative or absolute path to
       #    a directory.
+      #
+      # When running in production you may want to configure this to a specific
+      # path (alt 2), since the default directory contains the remote port of the
+      # actor system to make the name unique. If using a dynamically assigned
+      # port (0) it will be different each time and the previously stored data
+      # will not be loaded.
       dir = "ddata"

       # Size in bytes of the memory mapped file.
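The new reference.conf comment above boils down to: do not rely on the default, port-derived directory if the node binds to a dynamically assigned port. A minimal sketch of pinning the durable LMDB directory when the actor system is created; the system name and the path are placeholders, not values from this patch::

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // pin the durable store to a stable, per-node path so previously stored data
    // is loaded after a restart even when akka.remote uses port 0 (hypothetical path)
    val config = ConfigFactory.parseString(
      """
      akka.cluster.distributed-data.durable.lmdb.dir = "/var/lib/myapp/ddata"
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)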
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
index 621dd609ca..fd9d6437c5 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
@@ -115,6 +115,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
     case path ⇒ new File(path)
   }
+  log.info("Using durable data in LMDB directory [{}]", dir.getCanonicalPath)
   dir.mkdirs()
   Env.create()
     .setMapSize(mapSize)
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index 63f0b8165e..e152d2c5b0 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -50,10 +50,11 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
     newBlob should equal(oldBlob)
   }

-  def checkSerialization(obj: AnyRef): Unit = {
+  def checkSerialization(obj: AnyRef): Int = {
     val blob = serializer.toBinary(obj)
     val ref = serializer.fromBinary(blob, serializer.manifest(obj))
     ref should be(obj)
+    blob.length
   }

   def checkSameContent(a: AnyRef, b: AnyRef): Unit = {
@@ -101,6 +102,31 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
       checkSameContent(s3.merge(s4), s4.merge(s3))
     }

+    "serialize large GSet" in {
+      val largeSet = (10000 until 20000).foldLeft(GSet.empty[String]) {
+        case (acc, n) ⇒ acc.add(n.toString)
+      }
+      val numberOfBytes = checkSerialization(largeSet)
+      info(s"size of GSet with ${largeSet.size} elements: $numberOfBytes bytes")
+      numberOfBytes should be <= (80000)
+    }
+
+    "serialize large ORSet" in {
+      val largeSet = (10000 until 20000).foldLeft(ORSet.empty[String]) {
+        case (acc, n) ⇒
+          val address = (n % 3) match {
+            case 0 ⇒ address1
+            case 1 ⇒ address2
+            case 2 ⇒ address3
+          }
+          acc.add(address, n.toString)
+      }
+      val numberOfBytes = checkSerialization(largeSet)
+      // note that ORSet is compressed, and therefore smaller than GSet
+      info(s"size of ORSet with ${largeSet.size} elements: $numberOfBytes bytes")
+      numberOfBytes should be <= (50000)
+    }
+
     "serialize Flag" in {
       checkSerialization(Flag())
       checkSerialization(Flag().switchOn)
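The two new tests above turn ``checkSerialization`` into a size probe by returning ``blob.length``. Outside the spec the same measurement can be made against whatever serializer is bound to a data type; a small sketch, with a made-up helper name::

    import akka.actor.ActorSystem
    import akka.cluster.ddata.GSet
    import akka.serialization.SerializationExtension

    // hypothetical helper: serialized size in bytes of any object with a bound Akka serializer
    def serializedSize(system: ActorSystem, obj: AnyRef): Int = {
      val serializer = SerializationExtension(system).findSerializerFor(obj)
      serializer.toBinary(obj).length
    }

    val system = ActorSystem("SizeCheck")
    val set = (1 to 1000).foldLeft(GSet.empty[String])((acc, n) ⇒ acc.add(n.toString))
    println(s"GSet with ${set.size} elements: ${serializedSize(system, set)} bytes")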
diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst
index 45063fc957..fb7db68922 100644
--- a/akka-docs/rst/java/cluster-sharding.rst
+++ b/akka-docs/rst/java/cluster-sharding.rst
@@ -185,6 +185,8 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
 also add latency. This should be considered when designing the application specific
 shard resolution, e.g. to avoid too fine grained shards.

+.. _cluster_sharding_ddata_java:
+
 Distributed Data Mode
 ---------------------

@@ -197,19 +199,18 @@ This mode can be enabled by setting configuration property::

   akka.cluster.sharding.state-store-mode = ddata

-It is using the Distributed Data extension that must be running on all nodes in the cluster.
-Therefore you should add that extension to the configuration to make sure that it is started
-on all nodes::
-
-  akka.extensions += "akka.cluster.ddata.DistributedData"
+It uses its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
+all nodes for some entity types and another subset for other entity types. Each such replicator has a name
+that contains the node role and therefore the role configuration must be the same on all nodes in the
+cluster, i.e. you can't change the roles when performing a rolling upgrade.
+
+The settings for Distributed Data are configured in the section
+``akka.cluster.sharding.distributed-data``. It's not possible to have different
+``distributed-data`` settings for different sharding entity types.

 You must explicitly add the ``akka-distributed-data-experimental`` dependency
 to your build if you use this mode. It is possible to remove ``akka-persistence``
 dependency from a project if it
-is not used in user code and ``remember-entities`` is ``off``.
-Using it together with ``Remember Entities`` shards will be recreated after rebalancing, however will
-not be recreated after a clean cluster start as the Sharding Coordinator state is empty after a clean cluster
-start when using ddata mode. When ``Remember Entities`` is ``on`` Sharding Region always keeps data usig persistence,
-no matter how ``State Store Mode`` is set.
+is not used in user code.

 .. warning::

@@ -261,6 +262,13 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis
 entity will be automatically restarted after the entity restart backoff specified in the
 configuration.

+When :ref:`cluster_sharding_ddata_java` is used the identifiers of the entities are
+stored in :ref:`ddata_durable_java` of Distributed Data. You may want to change the
+configuration of ``akka.cluster.sharding.distributed-data.durable.lmdb.dir``, since
+the default directory contains the remote port of the actor system. If using a dynamically
+assigned port (0) it will be different each time and the previously stored data will not
+be loaded.
+
 When ``rememberEntities`` is set to false, a ``Shard`` will not automatically restart any entities
 after a rebalance or recovering from a crash. Entities will only be started once the first message
 for that entity has been received in the ``Shard``. Entities will not be restarted if they stop without
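The passivation contract described above (``Passivate`` must be sent to the parent of the entity, otherwise the entity is restarted after the configured backoff) is typically driven by a receive timeout. A minimal sketch; the timeout value and the actor itself are illustrative, not part of this patch::

    import akka.actor.{ Actor, PoisonPill, ReceiveTimeout }
    import akka.cluster.sharding.ShardRegion
    import scala.concurrent.duration._

    class IdleEntity extends Actor {
      // passivate after two minutes without messages (illustrative value)
      context.setReceiveTimeout(120.seconds)

      def receive = {
        case ReceiveTimeout ⇒
          // ask the parent Shard to stop this entity gracefully
          context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
        case _ ⇒ () // handle domain messages here
      }
    }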
diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst
index d1eca4c7f9..320a996616 100644
--- a/akka-docs/rst/java/distributed-data.rst
+++ b/akka-docs/rst/java/distributed-data.rst
@@ -451,7 +451,9 @@ works with any type that has a registered Akka serializer. This is how such an s
 look like for the ``TwoPhaseSet``:

 .. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java#serializer
-
+
+.. _ddata_durable_java:
+
 Durable Storage
 ---------------

@@ -487,6 +489,12 @@ The location of the files for the data is configured with::
   #    a directory.
   akka.cluster.distributed-data.lmdb.dir = "ddata"

+When running in production you may want to configure the directory to a specific
+path (alt 2), since the default directory contains the remote port of the
+actor system to make the name unique. If using a dynamically assigned
+port (0) it will be different each time and the previously stored data
+will not be loaded.
+
 Making the data durable has of course a performance cost. By default, each update is flushed
 to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk
 of losing the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst
index 7c83c57065..14a71c1c0e 100644
--- a/akka-docs/rst/scala/cluster-sharding.rst
+++ b/akka-docs/rst/scala/cluster-sharding.rst
@@ -188,6 +188,8 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
 also add latency. This should be considered when designing the application specific
 shard resolution, e.g. to avoid too fine grained shards.

+.. _cluster_sharding_ddata_scala:
+
 Distributed Data Mode
 ---------------------

@@ -200,19 +202,18 @@ This mode can be enabled by setting configuration property::

   akka.cluster.sharding.state-store-mode = ddata

-It is using the Distributed Data extension that must be running on all nodes in the cluster.
-Therefore you should add that extension to the configuration to make sure that it is started
-on all nodes::
-
-  akka.extensions += "akka.cluster.ddata.DistributedData"
+It uses its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
+all nodes for some entity types and another subset for other entity types. Each such replicator has a name
+that contains the node role and therefore the role configuration must be the same on all nodes in the
+cluster, i.e. you can't change the roles when performing a rolling upgrade.
+
+The settings for Distributed Data are configured in the section
+``akka.cluster.sharding.distributed-data``. It's not possible to have different
+``distributed-data`` settings for different sharding entity types.

 You must explicitly add the ``akka-distributed-data-experimental`` dependency
 to your build if you use this mode. It is possible to remove ``akka-persistence``
 dependency from a project if it
-is not used in user code and ``remember-entities`` is ``off``.
-Using it together with ``Remember Entities`` shards will be recreated after rebalancing, however will
-not be recreated after a clean cluster start as the Sharding Coordinator state is empty after a clean cluster
-start when using ddata mode. When ``Remember Entities`` is ``on`` Sharding Region always keeps data usig persistence,
-no matter how ``State Store Mode`` is set.
+is not used in user code.

 .. warning::

@@ -264,6 +265,13 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis
 entity will be automatically restarted after the entity restart backoff specified in the
 configuration.

+When :ref:`cluster_sharding_ddata_scala` is used the identifiers of the entities are
+stored in :ref:`ddata_durable_scala` of Distributed Data. You may want to change the
+configuration of ``akka.cluster.sharding.distributed-data.durable.lmdb.dir``, since
+the default directory contains the remote port of the actor system. If using a dynamically
+assigned port (0) it will be different each time and the previously stored data will not
+be loaded.
+
 When ``rememberEntities`` is set to false, a ``Shard`` will not automatically restart any entities
 after a rebalance or recovering from a crash. Entities will only be started once the first message
 for that entity has been received in the ``Shard``. Entities will not be restarted if they stop without
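Putting the documented configuration to use: a region started in ddata mode differs from persistence mode only in configuration. The entity actor, type name, message shape and extractors below are placeholders for illustration, not part of this patch::

    import akka.actor.{ Actor, ActorSystem, Props }
    import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
    import com.typesafe.config.ConfigFactory

    // placeholder entity
    class Counter extends Actor {
      var count = 0
      def receive = {
        case "increment" ⇒ count += 1
        case "get"       ⇒ sender() ! count
      }
    }

    val config = ConfigFactory.parseString("""
      akka.cluster.sharding.state-store-mode = ddata
      akka.cluster.sharding.remember-entities = on
      # stable directory so remembered entities survive a full cluster restart
      akka.cluster.sharding.distributed-data.durable.lmdb.dir = "target/sharding-ddata"
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)

    // messages are (entityId, payload) tuples in this sketch
    val extractEntityId: ShardRegion.ExtractEntityId = {
      case (id: Long, payload) ⇒ (id.toString, payload)
    }
    val extractShardId: ShardRegion.ExtractShardId = {
      case (id: Long, _) ⇒ (id % 10).toString
    }

    // the node must have joined the cluster before the region is used
    val counterRegion = ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)

    counterRegion ! (42L, "increment")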
This is how such an s look like for the ``TwoPhaseSet``: .. includecode:: code/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala#serializer - + +.. _ddata_durable_scala: + Durable Storage --------------- @@ -499,6 +501,12 @@ The location of the files for the data is configured with:: # a directory. akka.cluster.distributed-data.durable.lmdb.dir = "ddata" +When running in production you may want to configure the directory to a specific +path (alt 2), since the default directory contains the remote port of the +actor system to make the name unique. If using a dynamically assigned +port (0) it will be different each time and the previously stored data +will not be loaded. + Making the data durable has of course a performance cost. By default, each update is flushed to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk of losing the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during diff --git a/project/MiMa.scala b/project/MiMa.scala index b54049f914..23bf5d8847 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -156,6 +156,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Metric$Builder"), ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Number$Builder"), + // #22154 Sharding remembering entities with ddata, internal actors + FilterAnyProblemStartingWith("akka.cluster.sharding.Shard"), + FilterAnyProblemStartingWith("akka.cluster.sharding.PersistentShard"), + FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"), + FilterAnyProblemStartingWith("akka.cluster.sharding.ShardRegion"), + // #21537 coordinated shutdown ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"),