diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index 462b1dd4a3..f95819385b 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -78,6 +78,9 @@ akka.contrib.cluster.sharding { # The extension creates a top level actor with this name in top level user scope, # e.g. '/user/sharding' guardian-name = sharding + # If the coordinator can't store state changes it will be stopped + # and started again after this duration. + coordinator-failure-backoff = 10 s # Start the coordinator singleton manager on members tagged with this role. # All members are used if undefined or empty. # ShardRegion actor is started in proxy only mode on nodes that are not tagged diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 2f2c1d9ac1..fcd17db043 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -180,6 +180,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains) val GuardianName: String = config.getString("guardian-name") + val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis val BufferSize: Int = config.getInt("buffer-size") val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis @@ -359,11 +360,12 @@ private[akka] class ClusterShardingGuardian extends Actor { case Start(typeName, entryProps, idExtractor, shardResolver, allocationStrategy) ⇒ val encName = URLEncoder.encode(typeName, "utf-8") val coordinatorSingletonManagerName = encName + "Coordinator" - val 
coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton").toStringWithoutAddress + val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress val shardRegion = context.child(encName).getOrElse { if (HasNecessaryClusterRole && context.child(coordinatorSingletonManagerName).isEmpty) { - val singletonProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, rebalanceInterval = RebalanceInterval, - snapshotInterval = SnapshotInterval, allocationStrategy) + val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, + rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy) + val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps) context.actorOf(ClusterSingletonManager.props( singletonProps, singletonName = "singleton", @@ -846,6 +848,40 @@ class ShardRegion( } +/** + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +object ShardCoordinatorSupervisor { + /** + * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinatorSupervisor]] actor. 
+ */ + def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props = + Props(classOf[ShardCoordinatorSupervisor], failureBackoff, coordinatorProps) + + /** + * INTERNAL API + */ + private case object StartCoordinator +} + +class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor { + import ShardCoordinatorSupervisor._ + + def startCoordinator(): Unit = { + // it will be stopped in case of PersistenceFailure + context.watch(context.actorOf(coordinatorProps, "coordinator")) + } + + override def preStart(): Unit = startCoordinator() + + def receive = { + case Terminated(_) ⇒ + import context.dispatcher + context.system.scheduler.scheduleOnce(failureBackoff, self, StartCoordinator) + case StartCoordinator ⇒ startCoordinator() + } +} + /** * @see [[ClusterSharding$ ClusterSharding extension]] */ @@ -941,7 +977,10 @@ object ShardCoordinator { * i.e. new members in the cluster. There is a configurable threshold of how large the difference * must be to begin the rebalancing. The number of ongoing rebalancing processes can be limited. 
*/ - class LeastShardAllocationStrategy(rebalanceThreshold: Int, maxSimultaneousRebalance: Int) extends ShardAllocationStrategy { + @SerialVersionUID(1L) + class LeastShardAllocationStrategy(rebalanceThreshold: Int, maxSimultaneousRebalance: Int) + extends ShardAllocationStrategy with Serializable { + override def allocateShard(requester: ActorRef, shardId: ShardId, currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): ActorRef = { val (regionWithLeastShards, _) = currentShardAllocations.minBy { case (_, v) ⇒ v.size } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala new file mode 100644 index 0000000000..ab6e95e567 --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.contrib.pattern + +import java.io.File +import scala.concurrent.duration._ +import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.Identify +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.SharedLeveldbJournal +import akka.persistence.journal.leveldb.SharedLeveldbStore +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ + +object ClusterShardingFailureSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = 
"akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.roles = ["backend"] + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingFailureSpec" + } + } + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec" + akka.contrib.cluster.sharding.coordinator-failure-backoff = 3s + """)) + + testTransport(on = true) + + case class Get(id: String) + case class Add(id: String, i: Int) + case class Value(id: String, n: Int) + + class Entity extends Actor { + var n = 0 + + def receive = { + case Get(id) ⇒ sender() ! Value(id, n) + case Add(id, i) ⇒ n += i + } + } + + val idExtractor: ShardRegion.IdExtractor = { + case m @ Get(id) ⇒ (id, m) + case m @ Add(id, _) ⇒ (id, m) + } + + val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match { + case Get(id) ⇒ id.charAt(0).toString + case Add(id, _) ⇒ id.charAt(0).toString + } + +} + +class ClusterShardingFailureMultiJvmNode1 extends ClusterShardingFailureSpec +class ClusterShardingFailureMultiJvmNode2 extends ClusterShardingFailureSpec +class ClusterShardingFailureMultiJvmNode3 extends ClusterShardingFailureSpec + +class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingFailureSpec._ + + override def initialParticipants = roles.size + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + runOn(controller) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + override protected def afterTermination() { + 
runOn(controller) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + startSharding() + } + enterBarrier(from.name + "-joined") + } + + def startSharding(): Unit = { + ClusterSharding(system).start( + typeName = "Entity", + entryProps = Some(Props[Entity]), + idExtractor = idExtractor, + shardResolver = shardResolver) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + "Cluster sharding with flaky journal" must { + + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(controller) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(first, second) { + system.actorSelection(node(controller) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity].ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + + "join cluster" in within(20.seconds) { + join(first, first) + join(second, first) + + runOn(first) { + region ! Add("10", 1) + region ! Add("20", 2) + region ! Get("10") + expectMsg(Value("10", 1)) + region ! Get("20") + expectMsg(Value("20", 2)) + } + + enterBarrier("after-2") + } + + "recover after journal failure" in within(20.seconds) { + runOn(controller) { + testConductor.blackhole(controller, first, Direction.Both).await + testConductor.blackhole(controller, second, Direction.Both).await + } + enterBarrier("journal-blackholed") + + runOn(first) { + region ! Add("30", 3) + region ! Get("30") + expectMsg(Value("30", 3)) + } + + runOn(controller) { + testConductor.passThrough(controller, first, Direction.Both).await + testConductor.passThrough(controller, second, Direction.Both).await + } + enterBarrier("journal-ok") + + runOn(second) { + region ! Add("10", 1) + region ! Add("20", 2) + region ! Add("30", 3) + region ! 
Get("10") + expectMsg(Value("10", 2)) + region ! Get("20") + expectMsg(Value("20", 4)) + region ! Get("30") + expectMsg(Value("30", 6)) + } + + enterBarrier("after-3") + + } + + } +} + diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index 738f3de042..ddbf424804 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -45,9 +45,9 @@ object ClusterShardingSpec extends MultiNodeConfig { akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" akka.persistence.journal.leveldb-shared.store { native = off - dir = "target/shared-journal" + dir = "target/journal-ClusterShardingSpec" } - akka.persistence.snapshot-store.local.dir = "target/snapshots" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec" akka.contrib.cluster.sharding { role = backend retry-interval = 1 s @@ -159,9 +159,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult def createCoordinator(): Unit = { val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds, + snapshotInterval = 3600.seconds, allocationStrategy) system.actorOf(ClusterSingletonManager.props( - singletonProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds, - snapshotInterval = 3600.seconds, allocationStrategy), + singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps), singletonName = "singleton", terminationMessage = PoisonPill, role = None), @@ -171,7 +172,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult lazy val region = 
system.actorOf(ShardRegion.props( entryProps = Props[Counter], role = None, - coordinatorPath = "/user/counterCoordinator/singleton", + coordinatorPath = "/user/counterCoordinator/singleton/coordinator", retryInterval = 1.second, bufferSize = 1000, idExtractor = idExtractor, @@ -441,7 +442,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(sixth) { val proxy = system.actorOf(ShardRegion.proxyProps( role = None, - coordinatorPath = "/user/counterCoordinator/singleton", + coordinatorPath = "/user/counterCoordinator/singleton/coordinator", retryInterval = 1.second, bufferSize = 1000, idExtractor = idExtractor, diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 6f07b4386f..2f0cb3326a 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -84,6 +84,9 @@ akka { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.actor.default-dispatcher" + + # timeout for async journal operations + timeout = 10s store { diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 5c1e9309c5..eb41391f0d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -91,7 +91,8 @@ trait Processor extends Actor with Recovery { else { val errorMsg = "Processor killed after persistence failure " + s"(processor id = [${processorId}], sequence nr = [${p.sequenceNr}], payload class = [${p.payload.getClass.getName}]). " + - "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages." + "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. 
" + + "PersistenceFailure was caused by: " + cause throw new ActorKilledException(errorMsg) } case LoopMessageSuccess(m) ⇒ process(receive, m) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 7ae7f0a9ee..5756ea80a1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -10,6 +10,7 @@ import akka.actor._ import akka.persistence.Persistence import akka.persistence.journal._ import akka.util.Timeout +import akka.util.Helpers.ConfigOps /** * INTERNAL API. @@ -24,7 +25,8 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi * Journal backed by a [[SharedLeveldbStore]]. For testing only. */ private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy { - val timeout: Timeout = Timeout(10 seconds) // TODO: make configurable + val timeout: Timeout = context.system.settings.config.getMillisDuration( + "akka.persistence.journal.leveldb-shared.timeout") } object SharedLeveldbJournal {