diff --git a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
index 6819d3ed3d..51541ba6f6 100644
--- a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
+++ b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
@@ -8,10 +8,8 @@
%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n
-
-
-
-
+
+
diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.4.backwards.excludes/remember-entities-refactor.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.6.backwards.excludes/remember-entities-refactor.excludes
similarity index 100%
rename from akka-cluster-sharding/src/main/mima-filters/2.6.4.backwards.excludes/remember-entities-refactor.excludes
rename to akka-cluster-sharding/src/main/mima-filters/2.6.6.backwards.excludes/remember-entities-refactor.excludes
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
index 3921dc7a52..1065d7b39c 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
@@ -23,6 +23,7 @@ object ClusterShardingSettings {
val StateStoreModeDData = "ddata"
/**
+ * Only for testing
* INTERNAL API
*/
@InternalApi
@@ -308,9 +309,8 @@ final class ClusterShardingSettings(
extends NoSerializationVerificationNeeded {
@deprecated(
- "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStoreinstead",
- "2.6.6" // TODO update once merged
- )
+ "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead",
+ "2.6.7")
def this(
role: Option[String],
rememberEntities: Boolean,
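The deprecation message above points users at the factory methods instead of the constructor. As a hedged sketch of what that usage looks like (the system name and the chosen setting are illustrative, not from this patch):

```scala
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings

object SettingsExample extends App {
  // illustrative: derive the settings from the system's config via the
  // factory method rather than calling the deprecated constructor
  val system = ActorSystem("Sharding")
  val settings = ClusterShardingSettings(system).withRememberEntities(true)
}
```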
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala
index dd8e31d4bf..a3eb6b609b 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala
@@ -16,7 +16,7 @@ import akka.persistence.journal.EventSeq
* INTERNAL API
*/
@InternalApi
-final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter {
+private[akka] final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter {
override def manifest(event: Any): String =
""
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 864365328c..b5bb7ea8d4 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -177,9 +177,11 @@ private[akka] object Shard {
}
object RememberingStart {
+ val empty = new RememberingStart(Set.empty)
+
def apply(ackTo: Option[ActorRef]): RememberingStart =
ackTo match {
- case None => RememberingStartNoAck
+ case None => empty
case Some(ackTo) => RememberingStart(Set(ackTo))
}
}
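Moving the shared no-ack instance onto the companion as `empty` replaces the private `RememberingStartNoAck` value removed further down. A minimal standalone sketch of the same reuse pattern, with hypothetical names:

```scala
// Hypothetical type, illustrating the shared-empty-instance pattern:
final case class Pending(ackTo: Set[String])

object Pending {
  // one shared instance for the common "no ack requested" case,
  // avoiding a fresh allocation on every call
  val empty = new Pending(Set.empty)

  def apply(ackTo: Option[String]): Pending =
    ackTo.fold(empty)(a => Pending(Set(a)))
}
```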
@@ -194,16 +196,15 @@ private[akka] object Shard {
case active: Active => active
case r: RememberingStart =>
if (ackTo.isEmpty) {
- if (r.ackTo.isEmpty) RememberingStartNoAck
+ if (r.ackTo.isEmpty) RememberingStart.empty
else newState
} else {
if (r.ackTo.isEmpty) this
- else RememberingStart(ackTo ++ r.ackTo)
+ else RememberingStart(ackTo.union(r.ackTo))
}
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
}
}
- private val RememberingStartNoAck = new RememberingStart(Set.empty)
/**
* When remember entities is enabled an entity is in this state while
@@ -245,8 +246,9 @@ private[akka] object Shard {
final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) {
private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]()
- // needed to look up entity by reg when a Passivating is received
+ // needed to look up entity by ref when a Passivating is received
private val byRef = new util.HashMap[ActorRef, EntityId]()
+  // optimization to avoid scanning all entities to find batched writes
private val remembering = new util.HashSet[EntityId]()
def alreadyRemembered(set: Set[EntityId]): Unit = {
@@ -300,9 +302,11 @@ private[akka] object Shard {
case _ => OptionVal.None
}
- def entityState(id: EntityId): EntityState = {
- OptionVal(entities.get(id)).getOrElse(NoState)
- }
+ def entityState(id: EntityId): EntityState =
+ entities.get(id) match {
+ case null => NoState
+ case state => state
+ }
def entityId(ref: ActorRef): OptionVal[EntityId] = OptionVal(byRef.get(ref))
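The rewritten `entityState` leans on the `java.util.Map.get` contract (it returns `null` for an absent key) and matches on `null` directly, presumably to skip the intermediate `OptionVal` on this lookup path. A minimal standalone sketch of the idiom:

```scala
import java.util.HashMap

val states = new HashMap[String, String]()

// null from java.util.Map.get means "absent"; matching on it directly
// avoids constructing an Option-like wrapper for every lookup
def stateFor(id: String): String =
  states.get(id) match {
    case null  => "NoState" // placeholder default, standing in for NoState
    case state => state
  }
```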
@@ -400,7 +404,7 @@ private[akka] class Shard(
import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
- final val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")
+ private val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")
private val rememberEntitiesStore: Option[ActorRef] =
rememberEntitiesProvider.map { provider =>
@@ -485,7 +489,9 @@ private[akka] class Shard(
tryGetLease(lease.get)
case ll: LeaseLost =>
receiveLeaseLost(ll)
- case _ =>
+ case msg =>
+ if (verboseDebug)
+ log.debug("Got msg of type [{}] from [{}] while waiting for lease, stashing", msg.getClass, sender())
stash()
}
@@ -520,7 +526,10 @@ private[akka] class Shard(
loadingEntityIdsFailed()
case msg =>
if (verboseDebug)
- log.debug("Got msg of type [{}] from [{}] while waiting for remember entitites", msg.getClass, sender())
+ log.debug(
+ "Got msg of type [{}] from [{}] while waiting for remember entities, stashing",
+ msg.getClass,
+ sender())
stash()
}
@@ -578,11 +587,13 @@ private[akka] class Shard(
storingStarts.mkString(", "),
storingStops.mkString(", "))
- storingStarts.foreach { entityId =>
- flightRecorder.rememberEntityAdd(entityId)
- }
- storingStops.foreach { id =>
- flightRecorder.rememberEntityRemove(id)
+ if (flightRecorder != NoOpShardingFlightRecorder) {
+ storingStarts.foreach { entityId =>
+ flightRecorder.rememberEntityAdd(entityId)
+ }
+ storingStops.foreach { id =>
+ flightRecorder.rememberEntityRemove(id)
+ }
}
val startTimeNanos = System.nanoTime()
val update = RememberEntitiesShardStore.Update(started = storingStarts, stopped = storingStops)
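The new guard skips the per-entity loops entirely when the recorder is the shared no-op instance, instead of making one call per entity that does nothing. A minimal sketch of the same equality guard, with hypothetical types:

```scala
// Hypothetical recorder types, sketching the guard in isolation:
trait FlightRecorder { def rememberEntityAdd(id: String): Unit }

object NoOpFlightRecorder extends FlightRecorder {
  override def rememberEntityAdd(id: String): Unit = ()
}

def recordAdds(recorder: FlightRecorder, ids: Set[String]): Unit =
  // comparing against the shared no-op instance avoids iterating the
  // set just to invoke methods that would do nothing
  if (recorder != NoOpFlightRecorder) ids.foreach(recorder.rememberEntityAdd)
```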
@@ -612,7 +623,7 @@ private[akka] class Shard(
onUpdateDone(storedStarts, storedStops)
case RememberEntityTimeout(`update`) =>
- log.error("Remember entity store did not respond, crashing shard")
+ log.error("Remember entity store did not respond, restarting shard")
throw new RuntimeException(
s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index c724cb2a93..eefea65243 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -1422,7 +1422,7 @@ private[akka] class DDataShardCoordinator(
if (shardIds.nonEmpty) {
val newUnallocatedShards = state.unallocatedShards.union(shardIds.diff(state.shards.keySet))
state = state.copy(unallocatedShards = newUnallocatedShards)
- newUnallocatedShards.foreach { self ! GetShardHome(_) }
+ allocateShardHomesForRememberEntities()
}
timers.cancel(RememberEntitiesTimeoutKey)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala
index 293ab548ef..8331fc6527 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala
@@ -6,6 +6,7 @@ package akka.cluster.sharding.internal
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Props
+import akka.annotation.InternalApi
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion.ShardId
import akka.event.Logging
@@ -15,6 +16,7 @@ import akka.event.Logging
*
* Only intended for testing, not an extension point.
*/
+@InternalApi
private[akka] final class CustomStateStoreModeProvider(
typeName: String,
system: ActorSystem,
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index e04bda991e..6766f8d5db 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -178,7 +178,7 @@ private[akka] final class DDataRememberEntitiesShardStore(
}
private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = {
- val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped))
+ val allEvts: Set[Evt] = (update.started.map(Started(_): Evt).union(update.stopped.map(Stopped)))
// map from set of evts (for same ddata key) to one update that applies each of them
val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] =
allEvts.groupBy(evt => key(evt.id)).map {
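The explicit `Started(_): Evt` ascription is what lets the two sets combine: Scala's immutable `Set` is invariant, so a `Set[Started]` and a `Set[Stopped]` only unify once both are widened to `Set[Evt]`. A self-contained sketch (the types mirror the shapes used above but are defined locally for illustration):

```scala
sealed trait Evt { def id: String }
final case class Started(id: String) extends Evt
final case class Stopped(id: String) extends Evt

val started = Set("a", "b")
val stopped = Set("c")

// ascribing Evt in the first map widens Set[Started] to Set[Evt];
// the second map then infers Evt as its element type
val allEvts: Set[Evt] = started.map(Started(_): Evt).union(stopped.map(Stopped(_)))
```

Without the widening, `union` on a `Set[Started]` would only accept another `Set[Started]`.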
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
index 9eb108b5e6..c8bc240a80 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
@@ -80,7 +80,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
override def receiveRecover: Receive = {
case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids))
- case EntitiesStopped(ids) => state = state.copy(state.entities -- ids)
+ case EntitiesStopped(ids) => state = state.copy(state.entities.diff(ids))
case SnapshotOffer(_, snapshot: State) => state = snapshot
case RecoveryCompleted =>
log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size)
@@ -97,7 +97,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
left -= 1
if (left == 0) {
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
- state.copy(state.entities.union(started) -- stopped)
+ state.copy(state.entities.union(started).diff(stopped))
saveSnapshotWhenNeeded()
}
}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala
index f17749fffe..e66c95cde2 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala
@@ -54,6 +54,8 @@ private[akka] final class RememberEntityStarter(
implicit val ec: ExecutionContext = context.dispatcher
import RememberEntityStarter._
+ require(ids.nonEmpty)
+
private var idsLeftToStart = Set.empty[EntityId]
private var waitingForAck = Set.empty[EntityId]
private var entitiesMoved = Set.empty[EntityId]
diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md
index f9c8cce3d7..1cf6671945 100644
--- a/akka-docs/src/main/paradox/typed/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md
@@ -284,7 +284,7 @@ akka.cluster.sharding.state-store-mode = ddata
```
The state of the `ShardCoordinator` is replicated across the cluster but is not stored to disk.
-@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajority`/`ReadMajority` consistency.
+@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajorityPlus`/`ReadMajorityPlus` consistency.
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
Cluster Sharding uses its own Distributed Data `Replicator` per node.
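`WriteMajorityPlus`/`ReadMajorityPlus` add a configurable number of extra replicas on top of a plain majority, tightening consistency for the coordinator state. A hedged sketch of the related settings (the `coordinator-state` keys exist in the 2.6 reference configuration, but treat the values here as illustrative rather than the defaults):

```
akka.cluster.sharding.coordinator-state {
  # extra nodes on top of the majority for writes of coordinator state
  write-majority-plus = 3
  # extra nodes on top of the majority for reads of coordinator state
  read-majority-plus = 5
}
```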
@@ -337,7 +337,7 @@ When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto anothe
node or recovers after a crash, it will recreate all the entities which were previously
running in that `Shard`.
-To permanently stop entities sent a `ClusterSharding.Passivate` to the
+To permanently stop entities send a `ClusterSharding.Passivate` to the
@scala[`ActorRef[ShardCommand]`]@java[`ActorRef`] that was passed in to
the factory method when creating the entity.
Otherwise, the entity will be automatically restarted after the entity restart backoff specified in the configuration.
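As a minimal sketch of what that looks like from inside a typed entity (the `Command` protocol and `Idle` message are hypothetical; `ClusterSharding.Passivate` and `ShardCommand` are the typed API referenced above):

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding

sealed trait Command
case object Idle extends Command // hypothetical idle-timeout message

def entity(shard: ActorRef[ClusterSharding.ShardCommand]): Behavior[Command] =
  Behaviors.receive { (context, message) =>
    message match {
      case Idle =>
        // ask the shard to stop this entity permanently
        shard ! ClusterSharding.Passivate(context.self)
        Behaviors.same
    }
  }
```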
@@ -397,10 +397,10 @@ If using remembered entities there are two migration options:
reads the data written by the old `persistence` mode. Your remembered entities will be remembered after a full cluster restart.
For migrating existing remembered entities an event adapter needs to be configured in the config for the journal you use in your `application.conf`.
-In this example `leveldb` is the used journal:
+In this example `cassandra` is used as the journal:
```
-akka.persistence.journal.leveldb {
+akka.persistence.cassandra.journal {
event-adapters {
coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter"
}
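An `event-adapters` entry alone does not apply the adapter; it also has to be bound to the coordinator's event types via `event-adapter-bindings`. A hedged sketch of the full shape (the bound class name is an assumption based on the Akka sources, so verify it against the migration guide):

```
akka.persistence.cassandra.journal {
  event-adapters {
    coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter"
  }
  # assumption: binding shown for illustration only
  event-adapter-bindings {
    "akka.cluster.sharding.ShardCoordinator$Internal$DomainEvent" = coordinator-migration
  }
}
```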