diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 4347e01c3f..5e8a89734c 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -1098,21 +1098,29 @@ object ShardCoordinator { def updated(event: DomainEvent): State = event match { case ShardRegionRegistered(region) ⇒ + require(!regions.contains(region), s"Region $region already registered: $this") copy(regions = regions.updated(region, Vector.empty)) case ShardRegionProxyRegistered(proxy) ⇒ + require(!regionProxies.contains(proxy), s"Region proxy $proxy already registered: $this") copy(regionProxies = regionProxies + proxy) case ShardRegionTerminated(region) ⇒ + require(regions.contains(region), s"Terminated region $region not registered: $this") copy( regions = regions - region, shards = shards -- regions(region)) case ShardRegionProxyTerminated(proxy) ⇒ + require(regionProxies.contains(proxy), s"Terminated region proxy $proxy not registered: $this") copy(regionProxies = regionProxies - proxy) case ShardHomeAllocated(shard, region) ⇒ + require(regions.contains(region), s"Region $region not registered: $this") + require(!shards.contains(shard), s"Shard [$shard] already allocated: $this") copy( shards = shards.updated(shard, region), regions = regions.updated(region, regions(region) :+ shard)) case ShardHomeDeallocated(shard) ⇒ + require(shards.contains(shard), s"Shard [$shard] not allocated: $this") val region = shards(shard) + require(regions.contains(region), s"Region $region for shard [$shard] not registered: $this") copy( shards = shards - shard, regions = regions.updated(region, regions(region).filterNot(_ == shard))) @@ -1206,28 +1214,32 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite } override def receiveRecover: Receive = { - case evt: DomainEvent ⇒ evt match { - case ShardRegionRegistered(region) ⇒ - persistentState = persistentState.updated(evt) - case ShardRegionProxyRegistered(proxy) ⇒ - persistentState = persistentState.updated(evt) - case ShardRegionTerminated(region) ⇒ - if (persistentState.regions.contains(region)) + case evt: DomainEvent ⇒ + log.debug("receiveRecover {}", evt) + evt match { + case ShardRegionRegistered(region) ⇒ persistentState = persistentState.updated(evt) - else { - log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + - " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + - "removed by later watch.", region) - } - case ShardRegionProxyTerminated(proxy) ⇒ - persistentState = persistentState.updated(evt) - case _: ShardHomeAllocated ⇒ - persistentState = persistentState.updated(evt) - case _: ShardHomeDeallocated ⇒ - persistentState = persistentState.updated(evt) - } + case ShardRegionProxyRegistered(proxy) ⇒ + persistentState = persistentState.updated(evt) + case ShardRegionTerminated(region) ⇒ + if (persistentState.regions.contains(region)) + persistentState = persistentState.updated(evt) + else { + log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + + " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + + "removed by later watch.", region) + } + case ShardRegionProxyTerminated(proxy) ⇒ + if (persistentState.regionProxies.contains(proxy)) + persistentState = persistentState.updated(evt) + case ShardHomeAllocated(shard, region) ⇒ + persistentState = persistentState.updated(evt) + case _: ShardHomeDeallocated ⇒ + persistentState = persistentState.updated(evt) + } case SnapshotOffer(_, state: State) ⇒ + log.debug("receiveRecover SnapshotOffer {}", state) persistentState = state } @@ -1274,6 +1286,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case None ⇒ if (persistentState.regions.nonEmpty) { val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions) + require(persistentState.regions.contains(region), + s"Allocated region $region for shard [$shard] must be one of the registered regions: $persistentState") persist(ShardHomeAllocated(shard, region)) { evt ⇒ persistentState = persistentState.updated(evt) log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) @@ -1296,10 +1310,12 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case RebalanceDone(shard, ok) ⇒ rebalanceInProgress -= shard log.debug("Rebalance shard [{}] done [{}]", shard, ok) - if (ok) persist(ShardHomeDeallocated(shard)) { evt ⇒ - persistentState = persistentState.updated(evt) - log.debug("Shard [{}] deallocated", evt.shard) - } + // The shard could have been removed by ShardRegionTerminated + if (ok && persistentState.shards.contains(shard)) + persist(ShardHomeDeallocated(shard)) { evt ⇒ + persistentState = persistentState.updated(evt) + log.debug("Shard [{}] deallocated", evt.shard) + } case SnapshotTick ⇒ log.debug("Saving persistent snapshot") diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 918121653d..9eeaf02a3b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -369,6 +369,16 @@ private[persistence] trait Eventsourced extends ProcessorImpl { currentState.aroundReceive(receive, message) } + /** + * INTERNAL API. + */ + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { + // flushJournalBatch will send outstanding persistAsync and defer events to the journal + // and also prevent those to be unstashed in Processor.aroundPreRestart + flushJournalBatch() + super.aroundPreRestart(reason, message) + } + /** * Calls `super.preRestart` then unstashes all messages from the internal stash. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index bd9390a67c..53c1512713 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -145,8 +145,7 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { processorBatch.length >= extension.settings.journal.maxMessageBatchSize def journalBatch(): Unit = { - journal ! WriteMessages(processorBatch, self, instanceId) - processorBatch = Vector.empty + flushJournalBatch() batching = true } } @@ -257,6 +256,14 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) } + /** + * INTERNAL API + */ + private[akka] def flushJournalBatch(): Unit = { + journal ! WriteMessages(processorBatch, self, instanceId) + processorBatch = Vector.empty + } + /** * INTERNAL API. */