!con #15496 Remember entries in cluster sharding

- Move all entry-related logic out of the ShardRegion and into a
  new dedicated child `Shard` actor.
- The Shard actor persists entry-started and entry-passivated events.
- Non-passivated entries are restarted on termination.
- The ShardCoordinator restarts shards on other regions upon region failure or handoff.
- Shard rebalance restarts the rebalanced shards on their new region.
- The Shard buffers messages for an entry from when EntryStarted is received until the state change is persisted.
- The Shard likewise buffers messages after a Passivate is received until the state change is persisted.
- The Shard retries persisting its state until it succeeds.
- When remembering entries, the Shard automatically restarts (after a backoff) entries that stop without passivating (see the start-up sketch after this list).
- Documented the entry path change in the migration docs.
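
For context, remembering entries is opted into per region via the new rememberEntries flag. A minimal start-up sketch, mirroring the call this commit adds to the spec (Counter, idExtractor and shardResolver are the spec's own definitions):

    import akka.actor.{ ActorRef, Props }
    import akka.contrib.pattern.ClusterSharding

    val persistentRegion: ActorRef = ClusterSharding(system).start(
      typeName = "PersistentCounter",
      entryProps = Some(Props[Counter]),
      rememberEntries = true, // persist entry starts/stops so the Shard can restart entries
      idExtractor = idExtractor,
      shardResolver = shardResolver)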
Dominic Black 2014-07-08 17:51:18 +01:00
parent 4ce7766164
commit af657880e2
7 changed files with 909 additions and 163 deletions


@@ -3,6 +3,9 @@
*/
package akka.contrib.pattern
import akka.contrib.pattern.ShardCoordinator.Internal.{ ShardStopped, HandOff }
import akka.contrib.pattern.ShardRegion.Passivate
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
@@ -51,12 +54,14 @@ object ClusterShardingSpec extends MultiNodeConfig {
role = backend
retry-interval = 1 s
handoff-timeout = 10 s
shard-start-timeout = 5s
entry-restart-backoff = 1s
rebalance-interval = 2 s
least-shard-allocation-strategy {
rebalance-threshold = 2
max-simultaneous-rebalance = 1
}
}
}
"""))
nodeConfig(sixth) {
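
The hunk above introduces two new tuning settings alongside the existing ones. A sketch of overriding them from code, assuming (as in this spec) they live in the same cluster-sharding config block as retry-interval and handoff-timeout:

    // Assumed path: the akka.contrib.cluster-sharding section this spec configures.
    val tuning = ConfigFactory.parseString("""
      akka.contrib.cluster-sharding {
        shard-start-timeout = 5 s     # presumably how long to wait for a shard to start
        entry-restart-backoff = 1 s   # delay before a remembered entry is restarted
      }
      """)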
@@ -77,9 +82,9 @@ object ClusterShardingSpec extends MultiNodeConfig {
context.setReceiveTimeout(120.seconds)
-// self.path.parent.name is the type name (utf-8 URL-encoded)
+// self.path.parent.parent.name is the type name (utf-8 URL-encoded)
// self.path.name is the entry identifier (utf-8 URL-encoded)
-override def persistenceId: String = self.path.parent.name + "-" + self.path.name
+override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name
var count = 0
//#counter-actor
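
The persistenceId change above follows from the new actor hierarchy: entries are no longer direct children of the ShardRegion but of the new Shard actor, so the type name sits one level higher. Illustrated with the paths this spec asserts on:

    // before: .../user/counterRegion/<entryId>            (parent = region)
    // after:  .../user/counterRegion/<shardId>/<entryId>  (parent = shard, grandparent = region)
    override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name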
@@ -162,7 +167,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
def createCoordinator(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
-val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds,
+val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds,
snapshotInterval = 3600.seconds, allocationStrategy)
system.actorOf(ClusterSingletonManager.props(
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps),
@@ -173,15 +178,27 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
lazy val region = system.actorOf(ShardRegion.props(
typeName = "counter",
entryProps = Props[Counter],
role = None,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
shardFailureBackoff = 1.second,
entryRestartBackoff = 1.second,
snapshotInterval = 1.hour,
bufferSize = 1000,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver),
name = "counterRegion")
lazy val persistentRegion = ClusterSharding(system).start(
typeName = "PersistentCounter",
entryProps = Some(Props[Counter]),
rememberEntries = true,
idExtractor = idExtractor,
shardResolver = shardResolver)
"Cluster sharding" must {
"setup shared journal" in {
@@ -239,22 +256,22 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
region ! EntryEnvelope(2, Increment)
region ! Get(2)
expectMsg(3)
-lastSender.path should be(node(second) / "user" / "counterRegion" / "2")
+lastSender.path should be(node(second) / "user" / "counterRegion" / "2" / "2")
region ! Get(11)
expectMsg(1)
// local on first
lastSender.path should be(region.path / "11")
lastSender.path should be(region.path / "11" / "11")
region ! Get(12)
expectMsg(1)
-lastSender.path should be(node(second) / "user" / "counterRegion" / "12")
+lastSender.path should be(node(second) / "user" / "counterRegion" / "0" / "12")
}
enterBarrier("first-update")
runOn(second) {
region ! Get(2)
expectMsg(3)
lastSender.path should be(region.path / "2")
lastSender.path should be(region.path / "2" / "2")
}
enterBarrier("after-3")
@@ -291,7 +308,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
within(1.second) {
region.tell(Get(2), probe1.ref)
probe1.expectMsg(4)
-probe1.lastSender.path should be(region.path / "2")
+probe1.lastSender.path should be(region.path / "2" / "2")
}
}
val probe2 = TestProbe()
@@ -299,7 +316,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
within(1.second) {
region.tell(Get(12), probe2.ref)
probe2.expectMsg(1)
-probe2.lastSender.path should be(region.path / "12")
+probe2.lastSender.path should be(region.path / "0" / "12")
}
}
}
@@ -331,25 +348,25 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
region ! EntryEnvelope(3, Increment)
region ! Get(3)
expectMsg(11)
-lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
region ! EntryEnvelope(4, Increment)
region ! Get(4)
expectMsg(21)
-lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
}
enterBarrier("first-update")
runOn(third) {
region ! Get(3)
expectMsg(11)
lastSender.path should be(region.path / "3")
lastSender.path should be(region.path / "3" / "3")
}
runOn(fourth) {
region ! Get(4)
expectMsg(21)
lastSender.path should be(region.path / "4")
lastSender.path should be(region.path / "4" / "4")
}
enterBarrier("after-6")
@@ -369,7 +386,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
within(1.second) {
region.tell(Get(3), probe3.ref)
probe3.expectMsg(11)
-probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
}
}
val probe4 = TestProbe()
@@ -377,7 +394,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
within(1.second) {
region.tell(Get(4), probe4.ref)
probe4.expectMsg(21)
-probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
}
}
@@ -407,7 +424,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
}
-// add more shards, which should later trigger rebalance to new node sixth
+// add more shards, which should later trigger rebalance to new node sixth
for (n ← 5 to 10)
region ! EntryEnvelope(n, Increment)
@@ -428,7 +445,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
for (n ← 1 to 10) {
region.tell(Get(n), probe.ref)
probe.expectMsgType[Int]
-if (probe.lastSender.path == region.path / n.toString)
+if (probe.lastSender.path == region.path / (n % 12).toString / n.toString)
count += 1
}
count should be(2)
@@ -444,6 +461,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"support proxy only mode" in within(10.seconds) {
runOn(sixth) {
val proxy = system.actorOf(ShardRegion.proxyProps(
typeName = "counter",
role = None,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
@@ -466,12 +484,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter",
entryProps = Some(Props[Counter]),
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
//#counter-start
//#counter-start
ClusterSharding(system).start(
typeName = "AnotherCounter",
entryProps = Some(Props[Counter]),
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
}
@@ -512,6 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
typeName = "ApiTest",
entryProps = Some(Props[Counter]),
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
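
Note that every call site now passes rememberEntries explicitly, and entryProps is an Option (None is used for proxy-only mode). The start signature implied by these call sites would be roughly:

    // Implied by the named arguments above, not the verbatim declaration:
    def start(
      typeName: String,
      entryProps: Option[Props],
      rememberEntries: Boolean,
      idExtractor: ShardRegion.IdExtractor,
      shardResolver: ShardRegion.ShardResolver): ActorRef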
@@ -522,5 +543,263 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
enterBarrier("after-10")
}
"Persistent Cluster Shards" must {
"recover entries upon restart" in within(50.seconds) {
runOn(third, fourth, fifth) {
ClusterSharding(system).start(
typeName = "PersistentCounterEntries",
entryProps = Some(Props[Counter]),
rememberEntries = true,
idExtractor = idExtractor,
shardResolver = shardResolver)
ClusterSharding(system).start(
typeName = "AnotherPersistentCounter",
entryProps = Some(Props[Counter]),
rememberEntries = true,
idExtractor = idExtractor,
shardResolver = shardResolver)
}
enterBarrier("persistent-started")
runOn(third) {
val counterRegion: ActorRef = ClusterSharding(system).shardRegion("PersistentCounterEntries")
//Create and increment counter 1
counterRegion ! EntryEnvelope(1, Increment)
counterRegion ! Get(1)
expectMsg(1)
//Shut down the shard and confirm it's dead
val shard = system.actorSelection(lastSender.path.parent)
val region = system.actorSelection(lastSender.path.parent.parent)
//Stop the shard cleanly
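//HandOff and ShardStopped are the coordinator's internal handoff protocol
// (imported above): the region hands off the shard, which stops its entries
// and acknowledges with ShardStopped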
region ! HandOff("1")
expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
awaitAssert({
shard ! Identify(1)
expectMsg(1 second, "Shard was still around", ActorIdentity(1, None))
}, 5 seconds, 500 millis)
//Get the path to where the shard now resides
counterRegion ! Get(13)
expectMsg(0)
//Check that counter 1 is now alive again, even though we have
// not sent a message to it via the ShardRegion
val counter1 = system.actorSelection(lastSender.path.parent / "1")
counter1 ! Identify(2)
receiveOne(1 second) match {
case ActorIdentity(2, location) ⇒
location should not be (None)
}
counter1 ! Get(1)
expectMsg(1)
}
enterBarrier("after-shard-restart")
runOn(fourth) {
//Check a second region does not share the same persistent shards
val anotherRegion: ActorRef = ClusterSharding(system).shardRegion("AnotherPersistentCounter")
//Create a separate counter 13
anotherRegion ! EntryEnvelope(13, Increment)
anotherRegion ! Get(13)
expectMsg(1)
//Check that no counter "1" exists in this shard
val secondCounter1 = system.actorSelection(lastSender.path.parent / "1")
secondCounter1 ! Identify(3)
receiveOne(1 second) match {
case ActorIdentity(3, location) ⇒
location should be(None)
}
}
enterBarrier("after-11")
}
"permanently stop entries which passivate" in within(50.seconds) {
runOn(third, fourth, fifth) {
persistentRegion
}
enterBarrier("cluster-started-12")
runOn(third) {
//Create and increment counter 1
persistentRegion ! EntryEnvelope(1, Increment)
persistentRegion ! Get(1)
expectMsg(1)
val counter1 = lastSender
val shard = system.actorSelection(counter1.path.parent)
val region = system.actorSelection(counter1.path.parent.parent)
//Create and increment counter 13
persistentRegion ! EntryEnvelope(13, Increment)
persistentRegion ! Get(13)
expectMsg(1)
val counter13 = lastSender
counter1.path.parent should be(counter13.path.parent)
//Send the shard the passivate message from the counter
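//Passivate is normally sent by the entry itself; impersonating counter1 here makes
// the Shard record the stop as intentional, so the entry is not restarted even
// though rememberEntries = true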
shard.tell(Passivate(Stop), counter1)
awaitAssert({
//Check counter 1 is dead
counter1 ! Identify(1)
expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None))
}, 5 second, 500 millis)
//Stop the shard cleanly
region ! HandOff("1")
expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
awaitAssert({
shard ! Identify(2)
expectMsg(1 second, "Shard was still around", ActorIdentity(2, None))
}, 5 seconds, 500 millis)
}
enterBarrier("shard-shutdown-12")
runOn(fourth) {
//Force the shard back up
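//Get(25) maps to shard "1" (25 % 12), the same shard that hosted counters 1 and 13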
persistentRegion ! Get(25)
expectMsg(0)
val shard = lastSender.path.parent
//Check counter 1 is still dead
system.actorSelection(shard / "1") ! Identify(3)
receiveOne(1 second) should be(ActorIdentity(3, None))
//Check counter 13 is alive again
system.actorSelection(shard / "13") ! Identify(4)
receiveOne(1 second) match {
case ActorIdentity(4, location) ⇒
location should not be (None)
}
}
enterBarrier("after-12")
}
"restart entries which stop without passivating" in within(50.seconds) {
runOn(third, fourth) {
persistentRegion
}
enterBarrier("cluster-started-12")
runOn(third) {
//Create and increment counter 1
persistentRegion ! EntryEnvelope(1, Increment)
persistentRegion ! Get(1)
expectMsg(2)
val counter1 = system.actorSelection(lastSender.path)
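//Stopping without Passivate: the Shard still remembers this entry and should
// restart it after the entry-restart-backoff, which the awaitAssert below verifies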
counter1 ! Stop
awaitAssert({
counter1 ! Identify(1)
receiveOne(1 second) match {
case ActorIdentity(1, location) ⇒
location should not be (None)
}
}, 5.seconds, 500.millis)
}
enterBarrier("after-13")
}
"be migrated to new regions upon region failure" in within(50.seconds) {
lazy val migrationRegion: ActorRef = ClusterSharding(system).start(
typeName = "AutoMigrateRegionTest",
entryProps = Some(Props[Counter]),
rememberEntries = true,
idExtractor = idExtractor,
shardResolver = shardResolver)
//Start only one region, and force an entry onto that region
runOn(third) {
migrationRegion ! EntryEnvelope(1, Increment)
}
enterBarrier("shard1-region3")
//Start another region and test it talks to node 3
runOn(fourth) {
migrationRegion ! EntryEnvelope(1, Increment)
migrationRegion ! Get(1)
expectMsg(2)
lastSender.path should be(node(third) / "user" / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
//Kill region 3
system.actorSelection(lastSender.path.parent.parent) ! PoisonPill
}
enterBarrier("region4-up")
//Wait for migration to happen
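// (the coordinator sees the region terminate, reallocates its shard to the
// surviving region, and the new Shard restarts the remembered entry)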
Thread sleep 2500
//Test that the shard and its counter were moved onto node 4 and started.
runOn(fourth) {
val counter1 = system.actorSelection(system / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
counter1 ! Identify(1)
receiveOne(1 second) match {
case ActorIdentity(1, location) ⇒
location should not be (None)
}
counter1 ! Get(1)
expectMsg(2)
}
enterBarrier("after-14")
}
"ensure rebalance restarts shards" in within(50.seconds) {
runOn(fourth) {
for (i ← 2 to 12) {
persistentRegion ! EntryEnvelope(i, Increment)
}
for (i ← 2 to 12) {
persistentRegion ! Get(i)
expectMsg(1)
}
}
enterBarrier("entries-started")
runOn(fifth) {
persistentRegion
}
enterBarrier("fifth-joined-shard")
runOn(fifth) {
var count = 0
for (n ← 2 to 12) {
val entry = system.actorSelection(system / "sharding" / "PersistentCounter" / (n % 12).toString / n.toString)
entry ! Identify(n)
receiveOne(1 second) match {
case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1
case ActorIdentity(id, None) ⇒ //Not running on the fifth node
}
}
assert(count >= 3, s"Not enough entries migrated, only ${count}")
}
enterBarrier("after-15")
}
}
}