From d2dd80fe17451848b2407ece13947a0622eec343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:45:00 +0200 Subject: [PATCH 01/15] Some clarifying comments --- .../scala/akka/cluster/sharding/ClusterShardingSettings.scala | 1 + .../src/main/scala/akka/cluster/sharding/Shard.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 3921dc7a52..a9f92b0c1d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -23,6 +23,7 @@ object ClusterShardingSettings { val StateStoreModeDData = "ddata" /** + * Only for testing * INTERNAL API */ @InternalApi diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 864365328c..c7a6066a44 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -247,6 +247,7 @@ private[akka] object Shard { private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]() // needed to look up entity by reg when a Passivating is received private val byRef = new util.HashMap[ActorRef, EntityId]() + // optimization to not have to go through all entities to find batched writes private val remembering = new util.HashSet[EntityId]() def alreadyRemembered(set: Set[EntityId]): Unit = { From 9c7f16a4dbfa29fb873b59be856e295dba91382a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:46:03 +0200 Subject: [PATCH 02/15] Re-use logic for allocating shard home for remembered entities --- .../src/main/scala/akka/cluster/sharding/ShardCoordinator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index c724cb2a93..eefea65243 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1422,7 +1422,7 @@ private[akka] class DDataShardCoordinator( if (shardIds.nonEmpty) { val newUnallocatedShards = state.unallocatedShards.union(shardIds.diff(state.shards.keySet)) state = state.copy(unallocatedShards = newUnallocatedShards) - newUnallocatedShards.foreach { self ! GetShardHome(_) } + allocateShardHomesForRememberEntities() } timers.cancel(RememberEntitiesTimeoutKey) From 33b34da36c8c3a9dacc48bc90f9f143f733729c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:52:46 +0200 Subject: [PATCH 03/15] Union and diff for sets are more performant --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 2 +- .../sharding/internal/DDataRememberEntitiesShardStore.scala | 2 +- .../internal/EventSourcedRememberEntitiesShardStore.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index c7a6066a44..5cf7d0e6ed 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -198,7 +198,7 @@ private[akka] object Shard { else newState } else { if (r.ackTo.isEmpty) this - else RememberingStart(ackTo ++ r.ackTo) + else RememberingStart(ackTo.union(r.ackTo)) } case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index e04bda991e..6766f8d5db 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -178,7 +178,7 @@ private[akka] final class DDataRememberEntitiesShardStore( } private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = { - val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped)) + val allEvts: Set[Evt] = (update.started.map(Started(_): Evt).union(update.stopped.map(Stopped))) // map from set of evts (for same ddata key) to one update that applies each of them val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] = allEvts.groupBy(evt => key(evt.id)).map { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala index 9eb108b5e6..c8bc240a80 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala @@ -80,7 +80,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( override def receiveRecover: Receive = { case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids)) - case EntitiesStopped(ids) => state = state.copy(state.entities -- ids) + case EntitiesStopped(ids) => state = state.copy(state.entities.diff(ids)) case SnapshotOffer(_, snapshot: State) => state = snapshot case RecoveryCompleted => log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size) @@ -97,7 +97,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( left -= 1 if (left == 0) { sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped) - state.copy(state.entities.union(started) -- stopped) + state.copy(state.entities.union(started).diff(stopped)) saveSnapshotWhenNeeded() } } From 55311cf9146ba4d9075422e1134e597a4fa87299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:56:56 +0200 Subject: [PATCH 04/15] RememberingStartNoAck -> RememberingStart.empty --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 5cf7d0e6ed..628cb831bf 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -177,9 +177,11 @@ private[akka] object Shard { } object RememberingStart { + val empty = new RememberingStart(Set.empty) + def apply(ackTo: Option[ActorRef]): RememberingStart = ackTo match { - case None => RememberingStartNoAck + case None => empty case Some(ackTo) => RememberingStart(Set(ackTo)) } } @@ -194,7 +196,7 @@ private[akka] object Shard { case active: Active => active case r: RememberingStart => if (ackTo.isEmpty) { - if (r.ackTo.isEmpty) RememberingStartNoAck + if (r.ackTo.isEmpty) RememberingStart.empty else newState } else { if (r.ackTo.isEmpty) this @@ -203,7 +205,6 @@ private[akka] object Shard { case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") } } - private val RememberingStartNoAck = new RememberingStart(Set.empty) /** * When remember entities is enabled an entity is in this state while From 1e811eebbc4d6e80b35856ca4da77afd056b2cd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:58:33 +0200 Subject: [PATCH 05/15] Avoid allocation on entity lookup --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 628cb831bf..b331202d9e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -246,7 +246,7 @@ private[akka] object Shard { final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) { private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]() - // needed to look up entity by reg when a Passivating is received + // needed to look up entity by ref when a Passivating is received private val byRef = new util.HashMap[ActorRef, EntityId]() // optimization to not have to go through all entities to find batched writes private val remembering = new util.HashSet[EntityId]() @@ -302,9 +302,11 @@ private[akka] object Shard { case _ => OptionVal.None } - def entityState(id: EntityId): EntityState = { - OptionVal(entities.get(id)).getOrElse(NoState) - } + def entityState(id: EntityId): EntityState = + entities.get(id) match { + case null => NoState + case state => state + } def entityId(ref: ActorRef): OptionVal[EntityId] = OptionVal(byRef.get(ref)) From a31260e8fca6c91ac31c31b13400223b7eac1c20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:00:43 +0200 Subject: [PATCH 06/15] More private and @InternalApi sprinkling on top --- .../sharding/OldCoordinatorStateMigrationEventAdapter.scala | 2 +- .../src/main/scala/akka/cluster/sharding/Shard.scala | 2 +- .../sharding/internal/CustomStateStoreModeProvider.scala | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala index dd8e31d4bf..a3eb6b609b 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala @@ -16,7 +16,7 @@ import akka.persistence.journal.EventSeq * INTERNAL API */ @InternalApi -final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter { +private[akka] final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter { override def manifest(event: Any): String = "" diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index b331202d9e..7303b5c409 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -404,7 +404,7 @@ private[akka] class Shard( import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage - final val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging") + private val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging") private val rememberEntitiesStore: Option[ActorRef] = rememberEntitiesProvider.map { provider => diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala index 293ab548ef..8331fc6527 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala @@ -6,6 +6,7 @@ package akka.cluster.sharding.internal import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.Props +import akka.annotation.InternalApi import akka.cluster.sharding.ClusterShardingSettings import akka.cluster.sharding.ShardRegion.ShardId import akka.event.Logging @@ -15,6 +16,7 @@ import akka.event.Logging * * Only intended for testing, not an extension point. */ +@InternalApi private[akka] final class CustomStateStoreModeProvider( typeName: String, system: ActorSystem, From 4a4a6065c163371838046111f58c96847807cbee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:02:07 +0200 Subject: [PATCH 07/15] Move MiMa excludes to correct version --- .../remember-entities-refactor.excludes | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename akka-cluster-sharding/src/main/mima-filters/{2.6.4.backwards.excludes => 2.6.6.backwards.excludes}/remember-entities-refactor.excludes (100%) diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.4.backwards.excludes/remember-entities-refactor.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.6.backwards.excludes/remember-entities-refactor.excludes similarity index 100% rename from akka-cluster-sharding/src/main/mima-filters/2.6.4.backwards.excludes/remember-entities-refactor.excludes rename to akka-cluster-sharding/src/main/mima-filters/2.6.6.backwards.excludes/remember-entities-refactor.excludes From c9e153a47f4371cbdb23c9354f9ed14421a3680a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:03:22 +0200 Subject: [PATCH 08/15] Tune test logs back to info --- .../src/multi-jvm/resources/logback-test.xml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml index 6819d3ed3d..51541ba6f6 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml +++ b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml @@ -8,10 +8,8 @@ %date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n - - - - + + From a0fa2843989deba948fba6a2bae1dbe4c7ea351f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:04:23 +0200 Subject: [PATCH 09/15] Deprecation version updated --- .../akka/cluster/sharding/ClusterShardingSettings.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index a9f92b0c1d..1065d7b39c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -309,9 +309,8 @@ final class ClusterShardingSettings( extends NoSerializationVerificationNeeded { @deprecated( - "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStoreinstead", - "2.6.6" // TODO update once merged - ) + "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead", + "2.6.7") def this( role: Option[String], rememberEntities: Boolean, From 25a1daa0d291b22e61e00be02f6ca8ab55012da0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:05:36 +0200 Subject: [PATCH 10/15] Majority plus in docs --- akka-docs/src/main/paradox/typed/cluster-sharding.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index f9c8cce3d7..5a94ec7408 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -284,7 +284,7 @@ akka.cluster.sharding.state-store-mode = ddata ``` The state of the `ShardCoordinator` is replicated across the cluster but is not stored to disk. -@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajority`/`ReadMajority` consistency. +@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajorityPlus`/`ReadMajorityPlus` consistency. When all nodes in the cluster have been stopped, the state is no longer needed and dropped. Cluster Sharding uses its own Distributed Data `Replicator` per node. From c5aa91873278b6b0eca8fcb5694f0f3e504bba01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:07:50 +0200 Subject: [PATCH 11/15] Language fix and don't showcase leveldb --- akka-docs/src/main/paradox/typed/cluster-sharding.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 5a94ec7408..1cf6671945 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -337,7 +337,7 @@ When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto anothe node or recovers after a crash, it will recreate all the entities which were previously running in that `Shard`. -To permanently stop entities sent a `ClusterSharding.Passivate` to the +To permanently stop entities send a `ClusterSharding.Passivate` to the @scala[`ActorRef[ShardCommand]`]@java[`ActorRef`] that was passed in to the factory method when creating the entity. Otherwise, the entity will be automatically restarted after the entity restart backoff specified in the configuration. @@ -397,10 +397,10 @@ If using remembered entities there are two migration options: reads the data written by the old `persistence` mode. Your remembered entities will be remembered after a full cluster restart. For migrating existing remembered entities an event adapter needs to be configured in the config for the journal you use in your `application.conf`. -In this example `leveldb` is the used journal: +In this example `cassandra` is the used journal: ``` -akka.persistence.journal.leveldb { +akka.persistence.cassandra.journal { event-adapters { coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter" } From 5e222214b68fa72bc75665392fc5428f1275815d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:57:32 +0200 Subject: [PATCH 12/15] Debug log for stashing while awaiting lease --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 7303b5c409..c7765aa331 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -489,7 +489,9 @@ private[akka] class Shard( tryGetLease(lease.get) case ll: LeaseLost => receiveLeaseLost(ll) - case _ => + case msg => + if (verboseDebug) + log.debug("Got msg of type [{}] from [{}] while waiting for lease, stashing", msg.getClass, sender()) stash() } @@ -524,7 +526,10 @@ private[akka] class Shard( loadingEntityIdsFailed() case msg => if (verboseDebug) - log.debug("Got msg of type [{}] from [{}] while waiting for remember entitites", msg.getClass, sender()) + log.debug( + "Got msg of type [{}] from [{}] while waiting for remember entities, stashing", + msg.getClass, + sender()) stash() } From aa874d610f95ef8b801da8dc808b995eb2af6e79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 16:58:41 +0200 Subject: [PATCH 13/15] Require set of remembered entities to start to be nonEmpty --- .../akka/cluster/sharding/internal/RememberEntityStarter.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala index f17749fffe..e66c95cde2 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntityStarter.scala @@ -54,6 +54,8 @@ private[akka] final class RememberEntityStarter( implicit val ec: ExecutionContext = context.dispatcher import RememberEntityStarter._ + require(ids.nonEmpty) + private var idsLeftToStart = Set.empty[EntityId] private var waitingForAck = Set.empty[EntityId] private var entitiesMoved = Set.empty[EntityId] From 0475c11a1b2fbdd54dc21ee6de2e57c892150a56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 17:00:51 +0200 Subject: [PATCH 14/15] Avoid loop when not flightrecording --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index c7765aa331..99391d9449 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -587,11 +587,13 @@ private[akka] class Shard( storingStarts.mkString(", "), storingStops.mkString(", ")) - storingStarts.foreach { entityId => - flightRecorder.rememberEntityAdd(entityId) - } - storingStops.foreach { id => - flightRecorder.rememberEntityRemove(id) + if (flightRecorder != NoOpShardingFlightRecorder) { + storingStarts.foreach { entityId => + flightRecorder.rememberEntityAdd(entityId) + } + storingStops.foreach { id => + flightRecorder.rememberEntityRemove(id) + } } val startTimeNanos = System.nanoTime() val update = RememberEntitiesShardStore.Update(started = storingStarts, stopped = storingStops) From 5e7b895c22021d194ccfd607526bc6cf8774827e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 17:01:32 +0200 Subject: [PATCH 15/15] s/crashing/restarting --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 99391d9449..b5bb7ea8d4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -623,7 +623,7 @@ private[akka] class Shard( onUpdateDone(storedStarts, storedStops) case RememberEntityTimeout(`update`) => - log.error("Remember entity store did not respond, crashing shard") + log.error("Remember entity store did not respond, restarting shard") throw new RuntimeException( s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))