Merge pull request #18260 from akka/wip-15040-ScatterGatherFirstCompleted-patriknw

=act #15040 ScatterGatherFirstCompleted reply when no routees
This commit is contained in:
Patrik Nordwall 2015-08-18 20:00:37 +02:00
commit 1732e62d0c
2 changed files with 35 additions and 15 deletions

View file

@ -10,6 +10,9 @@ import akka.actor.{ Props, Actor }
import akka.pattern.ask
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
import akka.actor.ActorSystem
import akka.actor.Status
import java.util.concurrent.TimeoutException
import akka.testkit.TestProbe
object ScatterGatherFirstCompletedSpec {
class TestActor extends Actor {
@ -86,4 +89,15 @@ class ScatterGatherFirstCompletedSpec extends AkkaSpec with DefaultTimeout with
}
"Scatter-gather pool" must {
"without routees should reply immediately" in {
val probe = TestProbe()
val router = system.actorOf(ScatterGatherFirstCompletedPool(nrOfInstances = 0, within = 5.seconds).props(Props.empty))
router.tell("hello", probe.ref)
probe.expectMsgType[Status.Failure](2.seconds).cause.getClass should be(classOf[TimeoutException])
}
}
}

View file

@ -21,6 +21,8 @@ import akka.util.Timeout
import akka.util.Helpers.ConfigOps
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import scala.concurrent.Future
import java.util.concurrent.TimeoutException
/**
* Broadcasts the message to all routees, and replies with the first response.
@ -31,8 +33,7 @@ import akka.actor.ActorSystem
@SerialVersionUID(1L)
final case class ScatterGatherFirstCompletedRoutingLogic(within: FiniteDuration) extends RoutingLogic {
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
if (routees.isEmpty) NoRoutee
else ScatterGatherFirstCompletedRoutees(routees, within)
ScatterGatherFirstCompletedRoutees(routees, within)
}
/**
@ -42,20 +43,25 @@ final case class ScatterGatherFirstCompletedRoutingLogic(within: FiniteDuration)
private[akka] final case class ScatterGatherFirstCompletedRoutees(
routees: immutable.IndexedSeq[Routee], within: FiniteDuration) extends Routee {
override def send(message: Any, sender: ActorRef): Unit = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
implicit val timeout = Timeout(within)
val promise = Promise[Any]()
routees.foreach {
case ActorRefRoutee(ref)
promise.tryCompleteWith(ref.ask(message))
case ActorSelectionRoutee(sel)
promise.tryCompleteWith(sel.ask(message))
case _
}
override def send(message: Any, sender: ActorRef): Unit =
if (routees.isEmpty) {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
val reply = Future.failed(new TimeoutException("Timeout due to no routees"))
reply.pipeTo(sender)
} else {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
implicit val timeout = Timeout(within)
val promise = Promise[Any]()
routees.foreach {
case ActorRefRoutee(ref)
promise.tryCompleteWith(ref.ask(message))
case ActorSelectionRoutee(sel)
promise.tryCompleteWith(sel.ask(message))
case _
}
promise.future.pipeTo(sender)
}
promise.future.pipeTo(sender)
}
}
/**