From 81caeb4fc61054bc42d6885de35d87f03a03e2e2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Aug 2015 18:04:34 +0200 Subject: [PATCH] =act #15040 ScatterGatherFirstCompleted reply when no routees --- .../ScatterGatherFirstCompletedSpec.scala | 14 ++++++++ .../routing/ScatterGatherFirstCompleted.scala | 36 +++++++++++-------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ScatterGatherFirstCompletedSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ScatterGatherFirstCompletedSpec.scala index e42214218c..10bcb69d1c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ScatterGatherFirstCompletedSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ScatterGatherFirstCompletedSpec.scala @@ -10,6 +10,9 @@ import akka.actor.{ Props, Actor } import akka.pattern.ask import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec } import akka.actor.ActorSystem +import akka.actor.Status +import java.util.concurrent.TimeoutException +import akka.testkit.TestProbe object ScatterGatherFirstCompletedSpec { class TestActor extends Actor { @@ -86,4 +89,15 @@ class ScatterGatherFirstCompletedSpec extends AkkaSpec with DefaultTimeout with } + "Scatter-gather pool" must { + + "without routees should reply immediately" in { + val probe = TestProbe() + val router = system.actorOf(ScatterGatherFirstCompletedPool(nrOfInstances = 0, within = 5.seconds).props(Props.empty)) + router.tell("hello", probe.ref) + probe.expectMsgType[Status.Failure](2.seconds).cause.getClass should be(classOf[TimeoutException]) + } + + } + } diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 8ed0d77a58..20b2f1a6f0 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -21,6 +21,8 @@ import akka.util.Timeout import akka.util.Helpers.ConfigOps import java.util.concurrent.TimeUnit import akka.actor.ActorSystem +import scala.concurrent.Future +import java.util.concurrent.TimeoutException /** * Broadcasts the message to all routees, and replies with the first response. @@ -31,8 +33,7 @@ import akka.actor.ActorSystem @SerialVersionUID(1L) final case class ScatterGatherFirstCompletedRoutingLogic(within: FiniteDuration) extends RoutingLogic { override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = - if (routees.isEmpty) NoRoutee - else ScatterGatherFirstCompletedRoutees(routees, within) + ScatterGatherFirstCompletedRoutees(routees, within) } /** @@ -42,20 +43,25 @@ final case class ScatterGatherFirstCompletedRoutingLogic(within: FiniteDuration) private[akka] final case class ScatterGatherFirstCompletedRoutees( routees: immutable.IndexedSeq[Routee], within: FiniteDuration) extends Routee { - override def send(message: Any, sender: ActorRef): Unit = { - implicit val ec = ExecutionContexts.sameThreadExecutionContext - implicit val timeout = Timeout(within) - val promise = Promise[Any]() - routees.foreach { - case ActorRefRoutee(ref) ⇒ - promise.tryCompleteWith(ref.ask(message)) - case ActorSelectionRoutee(sel) ⇒ - promise.tryCompleteWith(sel.ask(message)) - case _ ⇒ - } + override def send(message: Any, sender: ActorRef): Unit = + if (routees.isEmpty) { + implicit val ec = ExecutionContexts.sameThreadExecutionContext + val reply = Future.failed(new TimeoutException("Timeout due to no routees")) + reply.pipeTo(sender) + } else { + implicit val ec = ExecutionContexts.sameThreadExecutionContext + implicit val timeout = Timeout(within) + val promise = Promise[Any]() + routees.foreach { + case ActorRefRoutee(ref) ⇒ + promise.tryCompleteWith(ref.ask(message)) + case ActorSelectionRoutee(sel) ⇒ + promise.tryCompleteWith(sel.ask(message)) + case _ ⇒ + } - promise.future.pipeTo(sender) - } + promise.future.pipeTo(sender) + } } /**