diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 0548faa4f8..4ba228cd78 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -7,10 +7,10 @@ package akka.actor
import akka.AkkaException
import akka.event.LoggingAdapter
+import java.util.Optional
import scala.annotation.tailrec
import scala.beans.BeanProperty
import scala.util.control.NoStackTrace
-import java.util.Optional
import akka.annotation.InternalApi
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
index a26e8b4ad7..33813935b2 100644
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
@@ -92,6 +92,7 @@ package akka.actor
* }
* }}}
*/
+@Deprecated
@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0")
abstract class UntypedActor extends Actor {
diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md
index c60b2eb569..454779935f 100644
--- a/akka-docs/src/main/paradox/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/stream/stages-overview.md
@@ -1135,7 +1135,40 @@ Adheres to the `ActorAttributes.SupervisionStrategy` attribute.
**completes** when upstream completes and all elements have been emitted from the internal flow
-**cancels** when downstream cancels
+**completes** when upstream completes and all futures have been completed and all elements have been emitted
+
+---------------------------------------------------------------
+
+### watch
+
+Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.
+The signaled failure will be a @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]
+@scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)].
+
+**emits** when upstream emits
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+**fails** when the watched actor terminates
+
+**cancels** when downstream cancels
+
+---------------------------------------------------------------
+
+### ask
+
+Specialized stage implementing the @scala[@extref[ask](github:akka-actor/src/main/scala/akka/pattern/AskSupport.scala)]@java[@extref[ask](github:akka-actor/src/main/scala/akka/pattern/Patterns.scala)] pattern for inter-op with untyped actors.
+
+The stream will be failed with a @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]
+@scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)] if the target actor terminates,
+or with an @java[@javadoc:[AskTimeoutException](akka.pattern.AskTimeoutException)] @scala[@scaladoc[AskTimeoutException](akka.pattern.AskTimeoutException)] if any of the asks times out.
+
+**emits** when the futures (in submission order) created by the ask pattern internally are completed
+
+**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+
+**fails** when the passed in actor terminates, or a timeout is exceeded in any of the asks performed
+
+**completes** when upstream completes and all futures have been completed and all elements have been emitted
+
+**cancels** when downstream cancels
+
---------------------------------------------------------------
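To make the two new stages documented above concrete, here is a minimal, self-contained sketch of how they might be used together; the `Doubler` actor and all names are illustrative and not part of this patch:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout

import scala.concurrent.duration._

// Illustrative replier: answers every Int with its double.
class Doubler extends Actor {
  def receive: Receive = { case n: Int ⇒ sender() ! n * 2 }
}

object AskAndWatchExample extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()
  implicit val askTimeout: Timeout = 3.seconds

  val doubler = system.actorOf(Props[Doubler], "doubler")

  // `watch` passes elements through unchanged but fails the stream if `doubler` terminates;
  // `ask` sends each element to `doubler` and emits the typed replies in upstream order.
  Source(1 to 10)
    .watch(doubler)
    .ask[Int](parallelism = 4)(doubler)
    .runWith(Sink.foreach(println))
}
```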
@@ -1215,6 +1248,28 @@ If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unl
**completes** upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted
+---------------------------------------------------------------
+
+### ask
+
+Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+If any of the asks times out it will fail the stream with an `akka.pattern.AskTimeoutException`.
+
+The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
+
+Similar to the plain ask pattern, the target actor is allowed to reply with `akka.actor.Status`.
+An `akka.actor.Status.Failure` will cause the stage to fail with the cause carried in the `Failure` message.
+
+Adheres to the `ActorAttributes.SupervisionStrategy` attribute.
+
+**emits** when the ask @scala[`Future`] @java[`CompletionStage`] created internally for the next element in sequence is completed
+
+**backpressures** when the number of ask @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures
+
+**completes** when upstream completes and all ask @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted
+
+**fails** when the passed in actor terminates, or when a timeout is exceeded in any of the asks performed
+
+**cancels** when downstream cancels
+
---------------------------------------------------------------
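A hedged sketch of the `akka.actor.Status` behaviour described above, combined with a resuming supervision strategy; the `Validator` actor and all names are invented for illustration:

```scala
import akka.actor.{ Actor, ActorSystem, Props, Status }
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout

import scala.concurrent.duration._

// Illustrative actor: rejects negative numbers with a Status.Failure, echoes the rest as Strings.
class Validator extends Actor {
  def receive: Receive = {
    case n: Int if n < 0 ⇒ sender() ! Status.Failure(new IllegalArgumentException(s"negative: $n"))
    case n: Int          ⇒ sender() ! n.toString
  }
}

object AskStatusExample extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()
  implicit val askTimeout: Timeout = 3.seconds

  val validator = system.actorOf(Props[Validator], "validator")

  // Prints "1" and "3"; the ask for -2 fails and is dropped by the resuming decider.
  Source(List(1, -2, 3))
    .ask[String](parallelism = 2)(validator)
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.foreach(println))
}
```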
diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md
index f4c81fb55c..d83ad1bd75 100644
--- a/akka-docs/src/main/paradox/stream/stream-integrations.md
+++ b/akka-docs/src/main/paradox/stream/stream-integrations.md
@@ -8,18 +8,20 @@ For piping the elements of a stream as messages to an ordinary actor you can use
Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is
materialized by `Source.actorRef`.
-### mapAsync + ask
-A nice way to delegate some processing of elements in a stream to an actor is to
-use `ask` in `mapAsync`. The back-pressure of the stream is maintained by
-the @scala[`Future`]@java[`CompletionStage`] of the `ask` and the mailbox of the actor will not be filled with
-more messages than the given `parallelism` of the `mapAsync` stage.
+### ask
+
+A nice way to delegate some processing of elements in a stream to an actor is to use `ask`.
+The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of
+the `ask` and the mailbox of the actor will not be filled with more messages than the given
+`parallelism` of the `ask` stage (similarly to how the `mapAsync` stage works).
Scala
-: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #mapAsync-ask }
+: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #ask }
Java
-: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #mapAsync-ask }
+: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #ask }
Note that the messages received in the actor will be in the same order as
the stream elements, i.e. the `parallelism` does not change the ordering
@@ -29,8 +31,9 @@ is already a message in the mailbox when the actor has completed previous
message.
The actor must reply to the @scala[`sender()`]@java[`getSender()`] for each message from the stream. That
-reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that
-is emitted downstreams from `mapAsync`.
+reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that is emitted downstream.
+
+If the target actor is stopped, the stage will fail with a `WatchedActorTerminatedException`.
Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #ask-actor }
@@ -38,20 +41,21 @@ Scala
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #ask-actor }
-The stream can be completed with failure by sending `akka.actor.Status.Failure`
-as reply from the actor.
+The stream can be completed with failure by sending `akka.actor.Status.Failure` as reply from the actor.
If the `ask` fails due to timeout the stream will be completed with
`TimeoutException` failure. If that is not desired outcome you can use `recover`
-on the `ask` @scala[`Future`]@java[`CompletionStage`].
+on the `ask` @scala[`Future`]@java[`CompletionStage`], or wrap the stream with one of the "restart" stages to restart it on failure.
If you don't care about the reply values and only use them as back-pressure signals you
-can use `Sink.ignore` after the `mapAsync` stage and then actor is effectively a sink
+can use `Sink.ignore` after the `ask` stage and then the actor is effectively a sink
of the stream.
-The same pattern can be used with @ref:[Actor routers](../routing.md). Then you
-can use `mapAsyncUnordered` for better efficiency if you don't care about the
-order of the emitted downstream elements (the replies).
+Note that while you could implement the same concept using `mapAsync` with the plain `ask` pattern, that style would not be aware of the target actor terminating.
+
+If you intend to ask multiple actors by using @ref:[Actor routers](../routing.md), you should use
+`mapAsyncUnordered` and perform the `ask` manually inside it. The ordering of the replies is not important in that case,
+since multiple actors are asked concurrently to begin with, and there is no single actor for the stage to watch.
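A minimal sketch of the router variant described in the paragraph above, assuming a round-robin pool router; the `Worker` actor and all names are illustrative:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout

import scala.concurrent.duration._

// Illustrative worker behind the router.
class Worker extends Actor {
  def receive: Receive = { case n: Int ⇒ sender() ! s"processed $n" }
}

object RouterAskExample extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()
  implicit val askTimeout: Timeout = 3.seconds

  val workers = system.actorOf(RoundRobinPool(5).props(Props[Worker]), "workers")

  // With a router there is no single actor for the stage to watch, so the plain
  // ask pattern inside mapAsyncUnordered is used instead of the `ask` stage.
  Source(1 to 100)
    .mapAsyncUnordered(parallelism = 5)(elem ⇒ (workers ? elem).mapTo[String])
    .runWith(Sink.ignore)
}
```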
### Sink.actorRefWithAck
diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java
index 2b8ba591ee..8dd52ed5e4 100644
--- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java
+++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java
@@ -257,7 +257,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
public DatabaseService(ActorRef probe) {
this.probe = probe;
}
-
+
@Override
public Receive createReceive() {
return receiveBuilder()
@@ -272,11 +272,11 @@ public class IntegrationDocTest extends AbstractJavaTest {
//#sometimes-slow-service
static class SometimesSlowService {
private final Executor ec;
-
+
public SometimesSlowService(Executor ec) {
this.ec = ec;
}
-
+
private final AtomicInteger runningCount = new AtomicInteger();
public CompletionStage convert(String s) {
@@ -292,7 +292,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
}
}
//#sometimes-slow-service
-
+
//#ask-actor
static class Translator extends AbstractActor {
@Override
@@ -308,22 +308,21 @@ public class IntegrationDocTest extends AbstractJavaTest {
}
}
//#ask-actor
-
+
@SuppressWarnings("unchecked")
@Test
- public void mapAsyncPlusAsk() throws Exception {
- //#mapAsync-ask
+ public void askStage() throws Exception {
+ //#ask
Source words =
Source.from(Arrays.asList("hello", "hi"));
Timeout askTimeout = Timeout.apply(5, TimeUnit.SECONDS);
-
+
words
- .mapAsync(5, elem -> ask(ref, elem, askTimeout))
- .map(elem -> (String) elem)
+ .ask(5, ref, String.class, askTimeout)
// continue processing of the replies from the actor
.map(elem -> elem.toLowerCase())
.runWith(Sink.ignore(), mat);
- //#mapAsync-ask
+ //#ask
}
diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala
index 8b433a20a0..38f280df78 100644
--- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala
@@ -140,19 +140,18 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val materializer = ActorMaterializer()
val ref: ActorRef = system.actorOf(Props[Translator])
- "mapAsync + ask" in {
- //#mapAsync-ask
- import akka.pattern.ask
+ "ask" in {
+ //#ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
- .mapAsync(parallelism = 5)(elem ⇒ (ref ? elem).mapTo[String])
+ .ask[String](parallelism = 5)(ref)
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
- //#mapAsync-ask
+ //#ask
}
"calling external service with mapAsync" in {
diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala
index 0f8bc4518d..d614253232 100755
--- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala
@@ -42,13 +42,13 @@ class DslConsistencySpec extends WordSpec with Matchers {
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass → graphHelpers,
- jSourceClass → graphHelpers,
+ jSourceClass → (graphHelpers ++ Set("watch", "ask")),
// Java subflows can only be nested using .via and .to (due to type system restrictions)
- jSubFlowClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")),
- jSubSourceClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")),
+ jSubFlowClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
+ jSubSourceClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
sFlowClass → Set("of"),
- sSourceClass → Set("adapt", "from"),
+ sSourceClass → Set("adapt", "from", "watch"),
sSinkClass → Set("adapt"),
sSubFlowClass → Set(),
sSubSourceClass → Set(),
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala
new file mode 100644
index 0000000000..184a20ee30
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala
@@ -0,0 +1,243 @@
+/**
+ * Copyright (C) 2014-2017 Lightbend Inc.
+ */
+package akka.stream.scaladsl
+
+import java.util.concurrent.{ CountDownLatch, LinkedBlockingQueue, ThreadLocalRandom }
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{ Actor, ActorRef, PoisonPill, Props }
+import akka.stream.ActorAttributes.{ SupervisionStrategy, supervisionStrategy }
+import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
+import akka.stream.Supervision.resumingDecider
+import akka.stream.impl.ReactiveStreamsCompliance
+import akka.stream.testkit.Utils._
+import akka.stream.testkit._
+import akka.stream.testkit.scaladsl.TestSink
+import akka.testkit.{ TestActors, TestLatch, TestProbe }
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import scala.annotation.tailrec
+import scala.concurrent.{ Await, Future, Promise }
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+import scala.util.control.NoStackTrace
+
+object FlowAskSpec {
+ case class Reply(payload: Int)
+
+ class Replier extends Actor {
+ override def receive: Receive = {
+ case msg: Int ⇒ sender() ! Reply(msg)
+ }
+ }
+
+ class ReplyAndProxy(to: ActorRef) extends Actor {
+ override def receive: Receive = {
+ case msg: Int ⇒
+ to ! msg
+ sender() ! Reply(msg)
+ }
+ }
+
+ class RandomDelaysReplier extends Actor {
+ override def receive: Receive = {
+ case msg: Int ⇒
+ import context.dispatcher
+
+ val replyTo = sender()
+ Future {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
+ replyTo ! Reply(msg)
+ }
+ }
+ }
+
+ class StatusReplier extends Actor {
+ override def receive: Receive = {
+ case msg: Int ⇒ sender() ! akka.actor.Status.Success(Reply(msg))
+ }
+ }
+
+ class FailOn(n: Int) extends Actor {
+ override def receive: Receive = {
+ case `n` ⇒ sender() ! akka.actor.Status.Failure(new Exception(s"Booming for $n!"))
+ case msg: Int ⇒ sender() ! akka.actor.Status.Success(Reply(msg))
+ }
+ }
+
+ class FailOnAllExcept(n: Int) extends Actor {
+ override def receive: Receive = {
+ case `n` ⇒ sender() ! akka.actor.Status.Success(Reply(n))
+ case msg: Int ⇒ sender() ! akka.actor.Status.Failure(new Exception(s"Booming for $n!"))
+ }
+ }
+
+}
+
+class FlowAskSpec extends StreamSpec {
+ import FlowAskSpec._
+
+ implicit val materializer = ActorMaterializer()
+
+ "A Flow with ask" must {
+
+ implicit val timeout = akka.util.Timeout(10.seconds)
+
+ val replyOnInts = system.actorOf(Props(classOf[Replier]).withDispatcher("akka.test.stream-dispatcher"), "replyOnInts")
+
+ val dontReply = system.actorOf(TestActors.blackholeProps.withDispatcher("akka.test.stream-dispatcher"), "dontReply")
+
+ val replyRandomDelays = system.actorOf(Props(classOf[RandomDelaysReplier]).withDispatcher("akka.test.stream-dispatcher"), "replyRandomDelays")
+
+ val statusReplier = system.actorOf(Props(new StatusReplier).withDispatcher("akka.test.stream-dispatcher"), "statusReplier")
+
+ def replierAndProxyTo(ref: ActorRef) = system.actorOf(Props(new ReplyAndProxy(ref)).withDispatcher("akka.test.stream-dispatcher"), s"reply-and-proxy-${ref.hashCode}")
+
+ def replierFailOn(n: Int) = system.actorOf(Props(new FailOn(n)).withDispatcher("akka.test.stream-dispatcher"), s"failureReplier-$n")
+ val failsOn1 = replierFailOn(1)
+ val failsOn3 = replierFailOn(3)
+
+ def replierFailAllExceptOn(n: Int) = system.actorOf(Props(new FailOnAllExcept(n)).withDispatcher("akka.test.stream-dispatcher"), s"failureReplier-$n")
+ val failAllExcept6 = replierFailAllExceptOn(6)
+
+ "produce asked elements" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Reply]()
+ implicit val ec = system.dispatcher
+ val p = Source(1 to 3).ask[Reply](4)(replyOnInts).runWith(Sink.fromSubscriber(c))
+ val sub = c.expectSubscription()
+ sub.request(2)
+ c.expectNext(Reply(1))
+ c.expectNext(Reply(2))
+ c.expectNoMessage(200.millis)
+ sub.request(2)
+ c.expectNext(Reply(3))
+ c.expectComplete()
+ }
+ "produce asked elements, when replies are akka.actor.Status.Success" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Reply]()
+ implicit val ec = system.dispatcher
+ val p = Source(1 to 3).ask[Reply](4)(statusReplier).runWith(Sink.fromSubscriber(c))
+ val sub = c.expectSubscription()
+ sub.request(2)
+ c.expectNext(Reply(1))
+ c.expectNext(Reply(2))
+ c.expectNoMessage(200.millis)
+ sub.request(2)
+ c.expectNext(Reply(3))
+ c.expectComplete()
+ }
+
+ "produce future elements in order" in {
+ val c = TestSubscriber.manualProbe[Reply]()
+ implicit val ec = system.dispatcher
+ val p = Source(1 to 50).ask[Reply](4)(replyRandomDelays).to(Sink.fromSubscriber(c)).run()
+ val sub = c.expectSubscription()
+ sub.request(1000)
+ for (n ← 1 to 50) c.expectNext(Reply(n))
+ c.expectComplete()
+ }
+
+ "signal ask timeout failure" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Reply]()
+ implicit val ec = system.dispatcher
+ Source(1 to 5).map(_ + " nope")
+ .ask[Reply](4)(dontReply)(akka.util.Timeout(10.millis), implicitly[ClassTag[Reply]])
+ .to(Sink.fromSubscriber(c)).run()
+ c.expectSubscription().request(10)
+ c.expectError().getMessage should startWith("Ask timed out on [Actor[akka://FlowAskSpec/user/dontReply#")
+ }
+
+ "signal ask failure" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Reply]()
+ val ref = failsOn1
+ implicit val ec = system.dispatcher
+ val p = Source(1 to 5).ask[Reply](4)(ref).to(Sink.fromSubscriber(c)).run()
+ val sub = c.expectSubscription()
+ sub.request(10)
+ c.expectError().getMessage should be("Booming for 1!")
+ }
+
+ "signal failure when target actor is terminated" in assertAllStagesStopped {
+ val r = system.actorOf(Props(classOf[Replier]).withDispatcher("akka.test.stream-dispatcher"), "wanna-fail")
+ val done = Source.maybe[Int]
+ .ask[Reply](4)(r).runWith(Sink.ignore)
+
+ intercept[RuntimeException] {
+ r ! PoisonPill
+ Await.result(done, remainingOrDefault)
+ }.getMessage should startWith("Actor watched by [ask()] has terminated! Was: Actor[akka://FlowAskSpec/user/wanna-fail#")
+ }
+
+ "a failure mid-stream must skip element with resume strategy" in assertAllStagesStopped {
+ val p = TestProbe()
+
+ val input = "a" :: "b" :: "c" :: "d" :: "e" :: "f" :: Nil
+
+ val elements = Source.fromIterator(() ⇒ input.iterator)
+ .ask[String](5)(p.ref)
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .runWith(Sink.seq)
+
+ // the problematic ordering:
+ p.expectMsg("a")
+ p.lastSender ! "a"
+
+ p.expectMsg("b")
+ p.lastSender ! "b"
+
+ p.expectMsg("c")
+ val cSender = p.lastSender
+
+ p.expectMsg("d")
+ p.lastSender ! "d"
+
+ p.expectMsg("e")
+ p.lastSender ! "e"
+
+ p.expectMsg("f")
+ p.lastSender ! "f"
+
+ cSender ! akka.actor.Status.Failure(new Exception("Booom!"))
+
+ elements.futureValue should ===(List("a", "b", /* no c */ "d", "e", "f"))
+ }
+
+ "resume after ask failure" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Reply]()
+ implicit val ec = system.dispatcher
+ val ref = failsOn3
+ val p = Source(1 to 5)
+ .ask[Reply](4)(ref)
+ .withAttributes(supervisionStrategy(resumingDecider))
+ .to(Sink.fromSubscriber(c)).run()
+ val sub = c.expectSubscription()
+ sub.request(10)
+ for (n ← List(1, 2, 4, 5)) c.expectNext(Reply(n))
+ c.expectComplete()
+ }
+
+ "resume after multiple failures" in assertAllStagesStopped {
+ Await.result(
+ Source(1 to 6)
+ .ask[Reply](2)(failAllExcept6).withAttributes(supervisionStrategy(resumingDecider))
+ .runWith(Sink.head), 3.seconds) should ===(Reply(6))
+ }
+
+ "should handle cancel properly" in assertAllStagesStopped {
+ val pub = TestPublisher.manualProbe[Int]()
+ val sub = TestSubscriber.manualProbe[Reply]()
+
+ Source.fromPublisher(pub).ask[Reply](4)(dontReply).runWith(Sink.fromSubscriber(sub))
+
+ val upstream = pub.expectSubscription()
+ upstream.expectRequest()
+
+ sub.expectSubscription().cancel()
+
+ upstream.expectCancellation()
+
+ }
+
+ }
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala
new file mode 100644
index 0000000000..10c9abd5ea
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchSpec.scala
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2014-2017 Lightbend Inc.
+ */
+package akka.stream.scaladsl
+
+import akka.actor.{ Actor, PoisonPill, Props }
+import akka.stream.ActorMaterializer
+import akka.stream.testkit.Utils._
+import akka.stream.testkit._
+import akka.testkit.TestActors
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object FlowWatchSpec {
+ case class Reply(payload: Int)
+
+ class Replier extends Actor {
+ override def receive: Receive = {
+ case msg: Int ⇒ sender() ! Reply(msg)
+ }
+ }
+
+}
+
+class FlowWatchSpec extends StreamSpec {
+ import FlowWatchSpec._
+
+ implicit val materializer = ActorMaterializer()
+
+ "A Flow with watch" must {
+
+ implicit val timeout = akka.util.Timeout(10.seconds)
+
+ val replyOnInts = system.actorOf(Props(classOf[Replier]).withDispatcher("akka.test.stream-dispatcher"), "replyOnInts")
+
+ val dontReply = system.actorOf(TestActors.blackholeProps.withDispatcher("akka.test.stream-dispatcher"), "dontReply")
+
+ "pass through elements while actor is alive" in assertAllStagesStopped {
+ val c = TestSubscriber.manualProbe[Int]()
+ implicit val ec = system.dispatcher
+ val p = Source(1 to 3).watch(replyOnInts).runWith(Sink.fromSubscriber(c))
+ val sub = c.expectSubscription()
+ sub.request(2)
+ c.expectNext(1)
+ c.expectNext(2)
+ c.expectNoMessage(200.millis)
+ sub.request(2)
+ c.expectNext(3)
+ c.expectComplete()
+ }
+
+ "signal failure when target actor is terminated" in assertAllStagesStopped {
+ val r = system.actorOf(Props(classOf[Replier]).withDispatcher("akka.test.stream-dispatcher"), "wanna-fail")
+ val done = Source.maybe[Int].watch(r).runWith(Sink.ignore)
+
+ intercept[RuntimeException] {
+ r ! PoisonPill
+ Await.result(done, remainingOrDefault)
+ }.getMessage should startWith("Actor watched by [Watch] has terminated! Was: Actor[akka://FlowWatchSpec/user/wanna-fail#")
+ }
+
+ "should handle cancel properly" in assertAllStagesStopped {
+ val pub = TestPublisher.manualProbe[Int]()
+ val sub = TestSubscriber.manualProbe[Int]()
+
+ Source.fromPublisher(pub).watch(dontReply).runWith(Sink.fromSubscriber(sub))
+
+ val upstream = pub.expectSubscription()
+ upstream.expectRequest()
+
+ sub.expectSubscription().cancel()
+
+ upstream.expectCancellation()
+
+ }
+
+ }
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala
index 89a1dc4ad1..b6379a7204 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala
@@ -56,7 +56,7 @@ class FlowWatchTerminationSpec extends StreamSpec {
sinkProbe.request(5)
sourceProbe.sendNext(1)
sinkProbe.expectNext(1)
- expectNoMsg(300.millis)
+ expectNoMessage(300.millis)
sourceProbe.sendComplete()
expectMsg(Done)
diff --git a/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
index 56cf37ee77..67c4ddf287 100644
--- a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
+++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
@@ -15,3 +15,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSou
# #24254 add collectType
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.collectType")
+
+# #24325 ask stage
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.ask")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.watch")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.FlowOps.ask")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.FlowOps.watch")
diff --git a/akka-stream/src/main/scala/akka/stream/WatchedActorTerminatedException.scala b/akka-stream/src/main/scala/akka/stream/WatchedActorTerminatedException.scala
new file mode 100644
index 0000000000..eff4d022f6
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/WatchedActorTerminatedException.scala
@@ -0,0 +1,13 @@
+/**
+ * Copyright (C) 2009-2018 Lightbend Inc.
+ */
+package akka.stream
+
+import akka.actor.ActorRef
+
+/**
+ * Used as failure exception by the `ask` and `watch` stages if the target actor terminates.
+ * See `Flow.ask` and `Flow.watch`.
+ */
+final class WatchedActorTerminatedException(val watchingStageName: String, val ref: ActorRef)
+ extends RuntimeException(s"Actor watched by [$watchingStageName] has terminated! Was: $ref")
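Since the exception is a plain `RuntimeException`, downstream code can treat it like any other stream failure. A hedged sketch of one way to do so with `recover`; the `resilientAsk` helper and the fallback value are illustrative only:

```scala
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.WatchedActorTerminatedException
import akka.stream.scaladsl.Flow
import akka.util.Timeout

object WatchedActorTerminatedHandling {
  // Turns a terminated target actor into a fallback element instead of a stream failure.
  // `target` is assumed to be an already created ActorRef.
  def resilientAsk(target: ActorRef)(implicit timeout: Timeout): Flow[Int, String, NotUsed] =
    Flow[Int]
      .ask[String](parallelism = 4)(target)
      .recover {
        case _: WatchedActorTerminatedException ⇒ "target actor terminated"
      }
}
```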
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index ea06d31cf0..ff8d0a5324 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -28,6 +28,7 @@ import akka.stream._
val recover = name("recover")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
+ val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")
val groupedWeightedWithin = name("groupedWeightedWithin")
@@ -89,6 +90,7 @@ import akka.stream._
val delay = name("delay")
val terminationWatcher = name("terminationWatcher")
+ val watch = name("watch")
val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index 122916129b..5902586c4c 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -5,10 +5,12 @@ package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
+import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.ExecutionContexts
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
+import akka.pattern.AskSupport
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
@@ -27,7 +29,9 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
import akka.stream.impl.Stages.DefaultAttributes
-import akka.util.OptionVal
+import akka.util.{ OptionVal, Timeout }
+
+import scala.reflect.ClassTag
/**
* INTERNAL API
@@ -1325,6 +1329,33 @@ private[stream] object Collect {
}
}
+@InternalApi private[akka] final case class Watch[T](targetRef: ActorRef) extends SimpleLinearGraphStage[T] {
+
+ override def initialAttributes = DefaultAttributes.watch
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
+
+ private lazy val self = getStageActor {
+ case (_, Terminated(`targetRef`)) ⇒
+ failStage(new WatchedActorTerminatedException("Watch", targetRef))
+ }
+
+ override def preStart(): Unit = {
+ // initialize self, and watch the target
+ self.watch(targetRef)
+ }
+
+ override def onPull(): Unit =
+ pull(in)
+
+ override def onPush(): Unit =
+ push(out, grab(in))
+
+ setHandlers(in, out, this)
+ }
+}
+
/**
* INTERNAL API
*/
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 314615bd81..ec3b01e65c 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -3,7 +3,7 @@
*/
package akka.stream.javadsl
-import akka.util.ConstantFun
+import akka.util.{ ConstantFun, Timeout }
import akka.{ Done, NotUsed }
import akka.event.LoggingAdapter
import akka.japi.{ Pair, function }
@@ -16,6 +16,7 @@ import akka.japi.Util
import java.util.Comparator
import java.util.concurrent.CompletionStage
+import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
import akka.stream.impl.fusing.LazyFlow
@@ -528,10 +529,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Emits when''' the CompletionStage returned by the provided function finishes for the next element in sequence
*
- * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
+ * '''Backpressures when''' the number of CompletionStages reaches the configured parallelism and the downstream
* backpressures or the first future is not completed
*
- * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
+ * '''Completes when''' upstream completes and all CompletionStages have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
@@ -563,9 +564,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Emits when''' any of the CompletionStages returned by the provided function complete
*
- * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
+ * '''Backpressures when''' the number of CompletionStages reaches the configured parallelism and the downstream backpressures
*
- * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
+ * '''Completes when''' upstream completes and all CompletionStages have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
@@ -574,6 +575,52 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala))
+ /**
+ * Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+ * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
+ *
+ * The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
+ *
+   * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.actor.Status`.
+   * An `akka.actor.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
+   *
+   * Parallelism limits how many asks can be "in flight" at the same time.
+   * Please note that the elements emitted by this stage are in order with regards to the asks being issued
+   * (i.e. the same behaviour as mapAsync).
+ *
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+   * '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
+ *
+ * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
+ *
+ * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def ask[S](parallelism: Int, ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Flow[In, S, Mat] =
+ new Flow(delegate.ask[S](parallelism)(ref)(timeout, ClassTag(mapTo)))
+
+ /**
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * '''Emits when''' upstream emits
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Fails when''' the watched actor terminates
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def watch(ref: ActorRef): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.watch(ref))
+
/**
* Only pass on those elements that satisfy the given predicate.
*
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index cef2191d26..edafd5097a 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -6,7 +6,7 @@ package akka.stream.javadsl
import java.util
import java.util.Optional
-import akka.util.ConstantFun
+import akka.util.{ ConstantFun, Timeout }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
@@ -1253,6 +1253,52 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala))
+ /**
+ * Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+ * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
+ *
+ * The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
+ *
+   * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.actor.Status`.
+   * An `akka.actor.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
+   *
+   * Parallelism limits how many asks can be "in flight" at the same time.
+   * Please note that the elements emitted by this stage are in order with regards to the asks being issued
+   * (i.e. the same behaviour as mapAsync).
+ *
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+   * '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
+ *
+ * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
+ *
+ * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def ask[S](parallelism: Int, ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Source[S, Mat] =
+ new Source(delegate.ask[S](parallelism)(ref)(timeout, ClassTag(mapTo)))
+
+ /**
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * '''Emits when''' upstream emits
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Fails when''' the watched actor terminates
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def watch(ref: ActorRef): javadsl.Source[Out, Mat] =
+ new Source(delegate.watch(ref))
+
/**
* Only pass on those elements that satisfy the given predicate.
*
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 1bf4959b52..7d129ce092 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -9,7 +9,7 @@ import akka.Done
import akka.stream.impl._
import akka.stream.impl.fusing._
import akka.stream.stage._
-import akka.util.ConstantFun
+import akka.util.{ ConstantFun, Timeout }
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
@@ -19,8 +19,10 @@ import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
+import akka.actor.ActorRef
import akka.annotation.DoNotInherit
+import scala.annotation.implicitNotFound
import scala.reflect.ClassTag
/**
@@ -845,6 +847,75 @@ trait FlowOps[+Out, +Mat] {
*/
def mapAsyncUnordered[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T] = via(MapAsyncUnordered(parallelism, f))
+ /**
+ * Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+ * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
+ *
+   * Do not forget to include the expected response type in the method call, like so:
+   *
+   * {{{
+   * flow.ask[ExpectedReply](ref)
+   * }}}
+   *
+   * otherwise `Nothing` will be assumed, which is most likely not what you want.
+ *
+   * Parallelism limits how many asks can be "in flight" at the same time.
+   * Please note that the elements emitted by this stage are in order with regards to the asks being issued
+   * (i.e. the same behaviour as mapAsync).
+ *
+   * The implicit `ClassTag` of the expected response type `S` is used to cast the incoming responses to that type.
+ *
+   * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.actor.Status`.
+   * An `akka.actor.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
+ *
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
+ *
+ * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
+ *
+ * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
+ def ask[S](parallelism: Int)(ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] = {
+ val askFlow = Flow[Out]
+ .watch(ref)
+ .mapAsync(parallelism) { el ⇒
+ akka.pattern.ask(ref).?(el)(timeout).mapTo[S](tag)
+ }
+ .recover[S] {
+ // the purpose of this recovery is to change the name of the stage in that exception
+ // we do so in order to help users find which stage caused the failure -- "the ask stage"
+ case ex: WatchedActorTerminatedException ⇒
+ throw new WatchedActorTerminatedException("ask()", ex.ref)
+ }
+ .named("ask")
+
+ via(askFlow)
+ }
+
+ /**
+ * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
+ *
+ * '''Emits when''' upstream emits
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Fails when''' the watched actor terminates
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def watch(ref: ActorRef): Repr[Out] =
+ via(Watch(ref))
+
/**
* Only pass on those elements that satisfy the given predicate.
*
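As the scaladoc above stresses, the expected response type must be given explicitly. A brief sketch of the difference; the `ExpectedReply` type and `repliesFrom` helper are illustrative only:

```scala
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.scaladsl.Flow
import akka.util.Timeout

final case class ExpectedReply(text: String)

object AskTypeParameterExample {
  def repliesFrom(ref: ActorRef)(implicit timeout: Timeout): Flow[String, ExpectedReply, NotUsed] =
    // The explicit type parameter supplies the ClassTag used to cast each reply.
    Flow[String].ask[ExpectedReply](parallelism = 2)(ref)

  // By contrast, `Flow[String].ask(parallelism = 2)(ref)` would infer `Nothing`
  // and every reply would fail the cast at runtime.
}
```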