diff --git a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala index f616866cdf..59e0c40c7e 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala @@ -112,7 +112,7 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { * called, not only the first one matching. */ final def onTransition(transitionHandler: UnitApply2[S, S]): Unit = - onTransition(transitionHandler(_: S, _: S)) + onTransition(transitionHandler(_: S, _: S)) /** * Set handler which is called upon reception of unhandled messages. Calling diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index abc86a01aa..62832ed412 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -801,8 +801,6 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, node.subscribe(self, ClusterShuttingDown.getClass) - var afterUpdateCallback: DomainEvent ⇒ Unit = _ - // get state from ddata replicator, repeat until GetSuccess getState() @@ -812,8 +810,9 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, def waitingForState: Receive = ({ case g @ GetSuccess(CoordinatorStateKey, _) ⇒ state = g.get(CoordinatorStateKey).value - watchStateActors() context.become(waitingForStateInitialized) + // note that watchStateActors may call update + watchStateActors() case GetFailure(CoordinatorStateKey, _) ⇒ log.error( @@ -840,10 +839,12 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, } // this state will stash all messages until it receives UpdateSuccess - def waitingForUpdate[E <: DomainEvent](evt: E): Receive = { + def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: DomainEvent ⇒ Unit): Receive = { case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒ log.debug("The coordinator state was successfully updated with {}", evt) - updateSuccess(evt) + context.unbecome() + afterUpdateCallback(evt) + unstashAll() case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒ log.error( @@ -870,18 +871,10 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, } def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = { - afterUpdateCallback = f.asInstanceOf[DomainEvent ⇒ Unit] - context.become(waitingForUpdate(evt)) + context.become(waitingForUpdate(evt, f.asInstanceOf[DomainEvent ⇒ Unit]), discardOld = false) sendUpdate(evt) } - def updateSuccess(evt: DomainEvent): Unit = { - afterUpdateCallback(evt) - afterUpdateCallback = null - context.become(active) - unstashAll() - } - def getState(): Unit = replicator ! Get(CoordinatorStateKey, ReadMajority(waitingForStateTimeout)) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index f6b9164445..52078bd474 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -9,9 +9,9 @@ import akka.actor._ import akka.cluster.Cluster import akka.cluster.sharding.ShardRegion.GracefulShutdown import akka.persistence.Persistence -import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore} +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.remote.testconductor.RoleName -import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec} +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.testkit._ import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils diff --git a/project/MiMa.scala b/project/MiMa.scala index f874b17ec8..a552a6a433 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -574,7 +574,10 @@ object MiMa extends AutoPlugin { //#18353 Changes to methods and fields private to remoting actors ProblemFilters.exclude[MissingMethodProblem]("akka.remote.EndpointManager.retryGateEnabled"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable"), + + // #18722 internal changes to actor + FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator") ) ) }