Fix cluster sharding with remember entities test (#29773)
* fix cluster sharding with remember entities test * send BeginHandOff to all regions
This commit is contained in:
parent
762dfb5e8b
commit
028ad29164
1 changed files with 20 additions and 9 deletions
|
|
@ -6,13 +6,11 @@ package akka.cluster.sharding
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
|
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
|
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
|
||||||
import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped }
|
import akka.cluster.sharding.ShardCoordinator.Internal.{ BeginHandOff, BeginHandOffAck, HandOff, ShardStopped }
|
||||||
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
|
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
|
||||||
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate }
|
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate }
|
||||||
import akka.cluster.sharding.internal.{ DDataRememberEntitiesProvider, EventSourcedRememberEntitiesProvider }
|
import akka.cluster.sharding.internal.{ DDataRememberEntitiesProvider, EventSourcedRememberEntitiesProvider }
|
||||||
|
|
@ -766,16 +764,28 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
|
||||||
}
|
}
|
||||||
enterBarrier("persistent-started")
|
enterBarrier("persistent-started")
|
||||||
|
|
||||||
|
// watch-out, these two var are only init on 3rd node
|
||||||
|
var shard: ActorSelection = null
|
||||||
|
var region: ActorSelection = null
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
//Create an increment counter 1
|
//Create an increment counter 1
|
||||||
persistentEntitiesRegion ! EntityEnvelope(1, Increment)
|
persistentEntitiesRegion ! EntityEnvelope(1, Increment)
|
||||||
persistentEntitiesRegion ! Get(1)
|
persistentEntitiesRegion ! Get(1)
|
||||||
expectMsg(1)
|
expectMsg(1)
|
||||||
|
|
||||||
//Shut down the shard and confirm it's dead
|
shard = system.actorSelection(lastSender.path.parent)
|
||||||
val shard = system.actorSelection(lastSender.path.parent)
|
region = system.actorSelection(lastSender.path.parent.parent)
|
||||||
val region = system.actorSelection(lastSender.path.parent.parent)
|
}
|
||||||
|
enterBarrier("counter-incremented")
|
||||||
|
|
||||||
|
// clean up shard cache everywhere
|
||||||
|
runOn(third, fourth, fifth) {
|
||||||
|
persistentEntitiesRegion ! BeginHandOff("1")
|
||||||
|
expectMsg(10 seconds, "ShardStopped not received", BeginHandOffAck("1"))
|
||||||
|
}
|
||||||
|
enterBarrier("everybody-hand-off-ack")
|
||||||
|
|
||||||
|
runOn(third) {
|
||||||
//Stop the shard cleanly
|
//Stop the shard cleanly
|
||||||
region ! HandOff("1")
|
region ! HandOff("1")
|
||||||
expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
|
expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
|
||||||
|
|
@ -787,8 +797,10 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
|
||||||
}, 5 seconds, 500 millis)
|
}, 5 seconds, 500 millis)
|
||||||
|
|
||||||
//Get the path to where the shard now resides
|
//Get the path to where the shard now resides
|
||||||
|
awaitAssert({
|
||||||
persistentEntitiesRegion ! Get(13)
|
persistentEntitiesRegion ! Get(13)
|
||||||
expectMsg(0)
|
expectMsg(0)
|
||||||
|
}, 5 seconds, 500 millis)
|
||||||
|
|
||||||
//Check that counter 1 is now alive again, even though we have
|
//Check that counter 1 is now alive again, even though we have
|
||||||
// not sent a message to it via the ShardRegion
|
// not sent a message to it via the ShardRegion
|
||||||
|
|
@ -804,7 +816,6 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
|
||||||
counter1 ! Get(1)
|
counter1 ! Get(1)
|
||||||
expectMsg(1)
|
expectMsg(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-shard-restart")
|
enterBarrier("after-shard-restart")
|
||||||
|
|
||||||
runOn(fourth) {
|
runOn(fourth) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue