Merge pull request #2097 from akka/wip-3937-sharding-persistent-failure-patriknw

#3937 Start ShardCoordinator again after PersistenceFailure
Patrik Nordwall 2014-03-25 13:40:16 +01:00
commit 11df0e8ade
7 changed files with 253 additions and 12 deletions

View file

@@ -78,6 +78,9 @@ akka.contrib.cluster.sharding {
# The extension creates a top level actor with this name in top level user scope,
# e.g. '/user/sharding'
guardian-name = sharding
+# If the coordinator can't store state changes it will be stopped
+# and started again after this duration.
+coordinator-failure-backoff = 10 s
# Start the coordinator singleton manager on members tagged with this role.
# All members are used if undefined or empty.
# ShardRegion actor is started in proxy only mode on nodes that are not tagged

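For illustration (system name and value below are made up, not part of this diff), the new setting can be overridden like any other entry under akka.contrib.cluster.sharding:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Illustrative: restart a failed coordinator after 30 s instead of the default 10 s.
val conf = ConfigFactory
  .parseString("akka.contrib.cluster.sharding.coordinator-failure-backoff = 30 s")
  .withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", conf)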
View file

@@ -180,6 +180,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
}
val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains)
val GuardianName: String = config.getString("guardian-name")
+val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis
val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
val BufferSize: Int = config.getInt("buffer-size")
val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis
@@ -359,11 +360,12 @@ private[akka] class ClusterShardingGuardian extends Actor {
case Start(typeName, entryProps, idExtractor, shardResolver, allocationStrategy) ⇒
val encName = URLEncoder.encode(typeName, "utf-8")
val coordinatorSingletonManagerName = encName + "Coordinator"
-val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton").toStringWithoutAddress
+val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
val shardRegion = context.child(encName).getOrElse {
if (HasNecessaryClusterRole && context.child(coordinatorSingletonManagerName).isEmpty) {
-val singletonProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, rebalanceInterval = RebalanceInterval,
-  snapshotInterval = SnapshotInterval, allocationStrategy)
+val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout,
+  rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
+val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
context.actorOf(ClusterSingletonManager.props(
singletonProps,
singletonName = "singleton",
@@ -846,6 +848,40 @@ class ShardRegion(
}
+/**
+ * @see [[ClusterSharding$ ClusterSharding extension]]
+ */
+object ShardCoordinatorSupervisor {
+  /**
+   * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinatorSupervisor]] actor.
+   */
+  def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props =
+    Props(classOf[ShardCoordinatorSupervisor], failureBackoff, coordinatorProps)
+
+  /**
+   * INTERNAL API
+   */
+  private case object StartCoordinator
+}
+
+class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor {
+  import ShardCoordinatorSupervisor._
+
+  def startCoordinator(): Unit = {
+    // it will be stopped in case of PersistenceFailure
+    context.watch(context.actorOf(coordinatorProps, "coordinator"))
+  }
+
+  override def preStart(): Unit = startCoordinator()
+
+  def receive = {
+    case Terminated(_) ⇒
+      import context.dispatcher
+      context.system.scheduler.scheduleOnce(failureBackoff, self, StartCoordinator)
+    case StartCoordinator ⇒ startCoordinator()
+  }
+}
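Design note: a PersistenceFailure makes the Processor-based coordinator throw ActorKilledException unless handled (see the Processor change below), so the child stops; the supervisor reacts via DeathWatch and delays the restart by failureBackoff, avoiding a tight restart loop against a failing journal. The child name "coordinator" is also why coordinatorPath above gained the extra /coordinator segment.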
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
@@ -941,7 +977,10 @@ object ShardCoordinator {
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
* must be to begin the rebalancing. The number of ongoing rebalancing processes can be limited.
*/
-class LeastShardAllocationStrategy(rebalanceThreshold: Int, maxSimultaneousRebalance: Int) extends ShardAllocationStrategy {
+@SerialVersionUID(1L)
+class LeastShardAllocationStrategy(rebalanceThreshold: Int, maxSimultaneousRebalance: Int)
+  extends ShardAllocationStrategy with Serializable {
override def allocateShard(requester: ActorRef, shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = {
val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }

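The allocation rule documented above reduces to a minBy over the region-to-shards map; a self-contained sketch with made-up region and shard names:

// Hypothetical allocation state: regions represented by plain strings here.
val currentShardAllocations = Map(
  "regionA" -> Vector("shard1", "shard2"),
  "regionB" -> Vector("shard3"))
// Same selection as in allocateShard: the region with the fewest shards wins.
val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size }
// regionWithLeastShards == "regionB"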
View file

@@ -0,0 +1,192 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import java.io.File
import scala.concurrent.duration._
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
object ClusterShardingFailureSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingFailureSpec"
}
}
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
akka.contrib.cluster.sharding.coordinator-failure-backoff = 3s
"""))
testTransport(on = true)
case class Get(id: String)
case class Add(id: String, i: Int)
case class Value(id: String, n: Int)
class Entity extends Actor {
var n = 0
def receive = {
case Get(id) ⇒ sender() ! Value(id, n)
case Add(id, i) ⇒ n += i
}
}
val idExtractor: ShardRegion.IdExtractor = {
case m @ Get(id) ⇒ (id, m)
case m @ Add(id, _) ⇒ (id, m)
}
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
case Get(id) ⇒ id.charAt(0).toString
case Add(id, _) ⇒ id.charAt(0).toString
}
}
class ClusterShardingFailureMultiJvmNode1 extends ClusterShardingFailureSpec
class ClusterShardingFailureMultiJvmNode2 extends ClusterShardingFailureSpec
class ClusterShardingFailureMultiJvmNode3 extends ClusterShardingFailureSpec
class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingFailureSpec._
override def initialParticipants = roles.size
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
runOn(controller) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
override protected def afterTermination() {
runOn(controller) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
startSharding()
}
enterBarrier(from.name + "-joined")
}
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
idExtractor = idExtractor,
shardResolver = shardResolver)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding with flaky journal" must {
"setup shared journal" in {
// start the Persistence extension
Persistence(system)
runOn(controller) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("peristence-started")
runOn(first, second) {
system.actorSelection(node(controller) / "user" / "store") ! Identify(None)
val sharedStore = expectMsgType[ActorIdentity].ref.get
SharedLeveldbJournal.setStore(sharedStore, system)
}
enterBarrier("after-1")
}
"join cluster" in within(20.seconds) {
join(first, first)
join(second, first)
runOn(first) {
region ! Add("10", 1)
region ! Add("20", 2)
region ! Get("10")
expectMsg(Value("10", 1))
region ! Get("20")
expectMsg(Value("20", 2))
}
enterBarrier("after-2")
}
"recover after journal failure" in within(20.seconds) {
runOn(controller) {
testConductor.blackhole(controller, first, Direction.Both).await
testConductor.blackhole(controller, second, Direction.Both).await
}
enterBarrier("journal-blackholed")
runOn(first) {
region ! Add("30", 3)
region ! Get("30")
expectMsg(Value("30", 3))
}
runOn(controller) {
testConductor.passThrough(controller, first, Direction.Both).await
testConductor.passThrough(controller, second, Direction.Both).await
}
enterBarrier("journal-ok")
runOn(second) {
region ! Add("10", 1)
region ! Add("20", 2)
region ! Add("30", 3)
region ! Get("10")
expectMsg(Value("10", 2))
region ! Get("20")
expectMsg(Value("20", 4))
region ! Get("30")
expectMsg(Value("30", 6))
}
enterBarrier("after-3")
}
}
}
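Like the other contrib multi-node specs, this one is presumably run through the sbt-multi-jvm plugin, e.g. multi-jvm:test-only akka.contrib.pattern.ClusterShardingFailureSpec; the exact invocation depends on the build configuration.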

View file

@@ -45,9 +45,9 @@ object ClusterShardingSpec extends MultiNodeConfig {
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store {
native = off
dir = "target/shared-journal"
dir = "target/journal-ClusterShardingSpec"
}
akka.persistence.snapshot-store.local.dir = "target/snapshots"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
akka.contrib.cluster.sharding {
role = backend
retry-interval = 1 s
@@ -159,9 +159,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
def createCoordinator(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
+val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds,
+  snapshotInterval = 3600.seconds, allocationStrategy)
system.actorOf(ClusterSingletonManager.props(
-  singletonProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds,
-    snapshotInterval = 3600.seconds, allocationStrategy),
+  singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps),
singletonName = "singleton",
terminationMessage = PoisonPill,
role = None),
@@ -171,7 +172,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
lazy val region = system.actorOf(ShardRegion.props(
entryProps = Props[Counter],
role = None,
coordinatorPath = "/user/counterCoordinator/singleton",
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,
@@ -441,7 +442,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(sixth) {
val proxy = system.actorOf(ShardRegion.proxyProps(
role = None,
coordinatorPath = "/user/counterCoordinator/singleton",
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,

View file

@@ -84,6 +84,9 @@ akka {
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
+# timeout for async journal operations
+timeout = 10s
store {

View file

@@ -91,7 +91,8 @@ trait Processor extends Actor with Recovery {
else {
val errorMsg = "Processor killed after persistence failure " +
s"(processor id = [${processorId}], sequence nr = [${p.sequenceNr}], payload class = [${p.payload.getClass.getName}]). " +
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages."
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " +
"PersistenceFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
}
case LoopMessageSuccess(m) ⇒ process(receive, m)
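The error message above spells out the escape hatch: a processor that handles PersistenceFailure itself is not killed. A minimal sketch (the actor and its reactions are hypothetical; Processor and PersistenceFailure appear in this diff, Persistent is the companion extractor from akka-persistence):

import akka.persistence.{ PersistenceFailure, Persistent, Processor }

class TolerantProcessor extends Processor {
  def receive = {
    case PersistenceFailure(payload, seqNr, cause) ⇒
      // Handled here, so the processor is not killed; compensate as needed.
      println(s"could not persist $payload (seqNr $seqNr): ${cause.getMessage}")
    case Persistent(payload, seqNr) ⇒
      // Normal processing of successfully persisted messages.
      println(s"persisted $payload at seqNr $seqNr")
  }
}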

View file

@@ -10,6 +10,7 @@ import akka.actor._
import akka.persistence.Persistence
import akka.persistence.journal._
import akka.util.Timeout
+import akka.util.Helpers.ConfigOps
/**
* INTERNAL API.
@@ -24,7 +25,8 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi
* Journal backed by a [[SharedLeveldbStore]]. For testing only.
*/
private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy {
-val timeout: Timeout = Timeout(10 seconds) // TODO: make configurable
+val timeout: Timeout = context.system.settings.config.getMillisDuration(
+  "akka.persistence.journal.leveldb-shared.timeout")
}
object SharedLeveldbJournal {
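The getMillisDuration extension method imported above (akka.util.Helpers.ConfigOps) is roughly the plain Config call sketched below; the config path comes from this diff, the 5s value is illustrative:

import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString(
  "akka.persistence.journal.leveldb-shared.timeout = 5s")
// Read the duration in milliseconds and wrap it as a FiniteDuration.
val timeout: FiniteDuration =
  config.getDuration("akka.persistence.journal.leveldb-shared.timeout", MILLISECONDS).millis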