=cls #18722 fix DDataShardCoordinator init
* the become logic was wrong when watchStateActors triggers an immediate state update
parent ef901becee
commit 27995af79f
4 changed files with 14 additions and 18 deletions
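
The fix below leans on Akka's behaviour stack: context.become(..., discardOld = false) pushes a temporary receive on top of the current one, and context.unbecome() pops back to whatever was underneath, so the temporary state no longer has to know which state to return to. Here is a minimal, self-contained sketch of that mechanism (not code from this commit; the Worker actor and its StartWork/WorkDone/Ping messages are invented for illustration):

    import akka.actor.{ Actor, ActorSystem, Props }

    case object StartWork
    case object WorkDone
    case object Ping

    class Worker extends Actor {
      def receive: Receive = idle

      def idle: Receive = {
        case StartWork ⇒
          // push the temporary behaviour on top; idle stays underneath
          context.become(busy, discardOld = false)
          self ! WorkDone // stand-in for an asynchronous completion
        case Ping ⇒ println("idle: pong")
      }

      def busy: Receive = {
        case WorkDone ⇒
          println("done, popping back to whatever behaviour was active before")
          context.unbecome()
        case Ping ⇒ println("busy: try again later")
      }
    }

    object BecomeStackDemo extends App {
      val system = ActorSystem("become-stack-demo")
      val worker = system.actorOf(Props[Worker], "worker")
      worker ! StartWork
      worker ! Ping
      Thread.sleep(500)
      system.terminate()
    }

Compare this with the change to update further down, which replaces a mutable afterUpdateCallback field plus an explicit context.become(active) with exactly this push/pop pairing.
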
@@ -112,7 +112,7 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
    * called, not only the first one matching.</b>
    */
   final def onTransition(transitionHandler: UnitApply2[S, S]): Unit =
-    onTransition(transitionHandler(_: S, _: S))
+    onTransition(transitionHandler(_: S, _: S))

   /**
    * Set handler which is called upon reception of unhandled messages. Calling
@@ -801,8 +801,6 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,

     node.subscribe(self, ClusterShuttingDown.getClass)

-    var afterUpdateCallback: DomainEvent ⇒ Unit = _
-
     // get state from ddata replicator, repeat until GetSuccess
     getState()

@@ -812,8 +810,9 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   def waitingForState: Receive = ({
     case g @ GetSuccess(CoordinatorStateKey, _) ⇒
       state = g.get(CoordinatorStateKey).value
-      watchStateActors()
       context.become(waitingForStateInitialized)
+      // note that watchStateActors may call update
+      watchStateActors()

     case GetFailure(CoordinatorStateKey, _) ⇒
       log.error(
@@ -840,10 +839,12 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   }

   // this state will stash all messages until it receives UpdateSuccess
-  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = {
+  def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: DomainEvent ⇒ Unit): Receive = {
     case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
       log.debug("The coordinator state was successfully updated with {}", evt)
-      updateSuccess(evt)
+      context.unbecome()
+      afterUpdateCallback(evt)
+      unstashAll()

     case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒
       log.error(
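
As its comment says, waitingForUpdate stashes every other message until the UpdateSuccess for the in-flight event arrives, then pops the behaviour and replays the stash. A minimal sketch of that stash-until-confirmation pattern, with an invented Keeper actor and a Store/Ack/Read protocol standing in for the coordinator's domain events:

    import akka.actor.{ Actor, Stash }

    final case class Store(value: String)
    case object Ack
    case object Read

    class Keeper extends Actor with Stash {
      private var current: Option[String] = None

      def receive: Receive = active

      def active: Receive = {
        case Store(v) ⇒
          // pretend the write is asynchronous; Ack stands in for UpdateSuccess
          context.become(waitingForAck(v), discardOld = false)
          self ! Ack
        case Read ⇒ sender() ! current
      }

      def waitingForAck(pending: String): Receive = {
        case Ack ⇒
          current = Some(pending)
          context.unbecome() // back to whatever behaviour was active before
          unstashAll()       // replay everything that arrived while waiting
        case _ ⇒ stash()     // hold all other messages until the write is confirmed
      }
    }

Because the waiting behaviour is pushed with discardOld = false, unbecome() returns to whichever behaviour was active when the write started, which is the property the coordinator fix relies on when watchStateActors triggers an update before initialization has finished.
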
@@ -870,18 +871,10 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   }

   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
-    afterUpdateCallback = f.asInstanceOf[DomainEvent ⇒ Unit]
-    context.become(waitingForUpdate(evt))
+    context.become(waitingForUpdate(evt, f.asInstanceOf[DomainEvent ⇒ Unit]), discardOld = false)
     sendUpdate(evt)
   }

-  def updateSuccess(evt: DomainEvent): Unit = {
-    afterUpdateCallback(evt)
-    afterUpdateCallback = null
-    context.become(active)
-    unstashAll()
-  }
-
   def getState(): Unit =
     replicator ! Get(CoordinatorStateKey, ReadMajority(waitingForStateTimeout))

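
getState() above is the "get state from ddata replicator, repeat until GetSuccess" step: it sends a Replicator.Get with ReadMajority and asks again when the read fails. A minimal sketch of that pattern against Akka Distributed Data (not from this commit; the StateReader actor and GCounterKey are invented, the coordinator uses its own key and state type):

    import akka.actor.{ Actor, ActorRef }
    import akka.cluster.ddata.{ DistributedData, GCounterKey }
    import akka.cluster.ddata.Replicator._
    import scala.concurrent.duration._

    class StateReader extends Actor {
      private val replicator: ActorRef = DistributedData(context.system).replicator
      private val Key = GCounterKey("example-state")
      private val readTimeout = 5.seconds

      // kick off the first read when the actor starts
      getState()

      def getState(): Unit =
        replicator ! Get(Key, ReadMajority(readTimeout))

      def receive: Receive = waitingForState

      def waitingForState: Receive = {
        case g @ GetSuccess(Key, _) ⇒
          val value = g.get(Key).value
          println(s"recovered state: $value")
          context.become(active)
        case NotFound(Key, _) ⇒
          // nothing stored yet; start from an empty state
          context.become(active)
        case GetFailure(Key, _) ⇒
          // no majority answered within the timeout; try again
          getState()
      }

      def active: Receive = {
        case msg ⇒ println(s"running with recovered state, got $msg")
      }
    }

ReadMajority only succeeds once a majority of nodes reply within the timeout, so retrying on GetFailure is the usual way to keep the actor in its waiting state until the data is actually available.
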
@@ -9,9 +9,9 @@ import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.sharding.ShardRegion.GracefulShutdown
 import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore}
+import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec}
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
 import akka.testkit._
 import com.typesafe.config.ConfigFactory
 import org.apache.commons.io.FileUtils
@@ -574,7 +574,10 @@ object MiMa extends AutoPlugin {

       //#18353 Changes to methods and fields private to remoting actors
       ProblemFilters.exclude[MissingMethodProblem]("akka.remote.EndpointManager.retryGateEnabled"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable")
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable"),
+
+      // #18722 internal changes to actor
+      FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator")
     )
   )
 }