diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsAggregator.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsAggregator.java index aba6b67674..81b5b74ce5 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsAggregator.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsAggregator.java @@ -25,7 +25,7 @@ public class StatsAggregator extends UntypedActor { @Override public void preStart() { - getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS)); + getContext().setReceiveTimeout(Duration.create(5, TimeUnit.SECONDS)); } @Override diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java index 25366c8064..89582882fa 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java @@ -1,17 +1,23 @@ package sample.cluster.stats.japi; +import scala.concurrent.Future; import sample.cluster.stats.japi.StatsMessages.JobFailed; import sample.cluster.stats.japi.StatsMessages.StatsJob; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Recover; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.LeaderChanged; import akka.cluster.ClusterEvent.MemberEvent; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.util.Timeout; +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipe; +import static java.util.concurrent.TimeUnit.SECONDS; //#facade public class StatsFacade extends UntypedActor { @@ -43,7 +49,13 @@ public class StatsFacade extends UntypedActor { } else if (message instanceof StatsJob) { StatsJob job = (StatsJob) message; - currentMaster.forward(job, getContext()); + Future f = ask(currentMaster, job, new Timeout(10, SECONDS)). + recover(new Recover() { + public Object recover(Throwable t) { + return new JobFailed("Service unavailable, try again later"); + } + }, getContext().dispatcher()); + pipe(f, getContext().dispatcher()).to(getSender()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 87a4026d2e..d35c475780 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -22,6 +22,9 @@ import akka.cluster.ClusterEvent.MemberUp import akka.cluster.MemberStatus import akka.routing.FromConfig import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.pattern.ask +import akka.pattern.pipe +import akka.util.Timeout //#imports //#messages @@ -51,7 +54,7 @@ class StatsService extends Actor { class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { var results = IndexedSeq.empty[Int] - context.setReceiveTimeout(10 seconds) + context.setReceiveTimeout(5 seconds) def receive = { case wordCount: Int ⇒ @@ -88,6 +91,7 @@ class StatsWorker extends Actor { //#facade class StatsFacade extends Actor with ActorLogging { + import context.dispatcher val cluster = Cluster(context.system) var currentMaster: Option[ActorRef] = None @@ -102,7 +106,12 @@ class StatsFacade extends Actor with ActorLogging { case job: StatsJob if currentMaster.isEmpty ⇒ sender ! JobFailed("Service unavailable, try again later") case job: StatsJob ⇒ - currentMaster foreach { _ forward job } + implicit val timeout = Timeout(10.seconds) + currentMaster foreach { + _ ? job recover { + case _ ⇒ JobFailed("Service unavailable, try again later") + } pipeTo sender + } case state: CurrentClusterState ⇒ state.leader foreach updateCurrentMaster case LeaderChanged(Some(leaderAddress)) ⇒ diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index f3de9f8ab4..ad678d377f 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -84,11 +84,11 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing testConductor.enter("all-up") } - "show usage of the statsFacade" in within(5 seconds) { + "show usage of the statsFacade" in within(15 seconds) { val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade") // eventually the service should be ok, - // worker nodes might not be up yet + // service and worker nodes might not be up yet awaitCond { facade ! StatsJob("this is the text that will be analyzed") expectMsgPF() { diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 07d1b3e84b..64c2e2994d 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -128,7 +128,7 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig) } meanWordLength must be(3.875 plusOrMinus 0.001) - testConductor.enter("done-2") + testConductor.enter("done-3") } diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala index c2ea084a18..0712305b4b 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala @@ -111,7 +111,7 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf } meanWordLength must be(3.875 plusOrMinus 0.001) - testConductor.enter("done-2") + testConductor.enter("done-3") } diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala index 06357f510f..7a5275a7f8 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala @@ -83,11 +83,11 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample testConductor.enter("all-up") } - "show usage of the statsFacade" in within(5 seconds) { + "show usage of the statsFacade" in within(15 seconds) { val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade") // eventually the service should be ok, - // worker nodes might not be up yet + // service and worker nodes might not be up yet awaitCond { facade ! new StatsJob("this is the text that will be analyzed") expectMsgPF() {