From 046b88ce904d0c3db404a796082f449feb16511c Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 12 Apr 2018 22:06:37 +0900 Subject: [PATCH] +doc Add examples to Sink.actorRefWithAck (#24854) --- .../paradox/stream/stream-integrations.md | 21 ++++++ .../java/jdocs/stream/IntegrationDocTest.java | 58 +++++++++++++++++ .../docs/stream/IntegrationDocSpec.scala | 65 ++++++++++++++++++- 3 files changed, 141 insertions(+), 3 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index b267140c1d..f48e973272 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -68,6 +68,27 @@ If the target actor terminates the stream will be cancelled. When the stream is given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with failure a `akka.actor.Status.Failure` message will be sent to the destination actor. +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } + +The receiving actor would then need to be implemented similar to the following: + +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } + +Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted +as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. +Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle +in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor, +however it is not common practice to do so, and one should rather investigate using a `Merge` stage for this purpose. + + @@@ note Using `Sink.actorRef` or ordinary `tell` from a `map` or `foreach` stage means that there is diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 8dd52ed5e4..30f374ae0d 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -36,6 +36,7 @@ import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets; import static junit.framework.TestCase.assertTrue; +@SuppressWarnings("ALL") public class IntegrationDocTest extends AbstractJavaTest { private static final SilenceSystemOut.System System = SilenceSystemOut.get(); @@ -309,6 +310,41 @@ public class IntegrationDocTest extends AbstractJavaTest { } //#ask-actor + //#actorRefWithAck-actor + static class Ack {} + + static class StreamInitialized {} + static class StreamCompleted {} + static class StreamFailure { + private final Throwable cause; + public StreamFailure(Throwable cause) { this.cause = cause; } + + public Throwable getCause() { return cause; } + } + + static class AckingReceiver extends AbstractLoggingActor { + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(StreamInitialized.class, init -> { + log().info("Stream initialized"); + }) + .match(String.class, element -> { + log().info("Received element: {}", element); + sender().tell(new Ack(), self()); + }) + .match(StreamCompleted.class, completed -> { + log().info("Stream completed"); + }) + .match(StreamFailure.class, failed -> { + log().error(failed.getCause(),"Stream failed!"); + }) + .build(); + } + } + //#actorRefWithAck-actor + @SuppressWarnings("unchecked") @Test public void askStage() throws Exception { @@ -325,6 +361,28 @@ public class IntegrationDocTest extends AbstractJavaTest { //#ask } + @Test + public void actorRefWithAckExample() throws Exception { + //#actorRefWithAck + Source words = + Source.from(Arrays.asList("hello", "hi")); + + ActorRef receiver = + system.actorOf(Props.create(AckingReceiver.class)); + + Sink sink = Sink.actorRefWithAck(receiver, + new StreamInitialized(), + new Ack(), + new StreamCompleted(), + ex -> new StreamFailure(ex) + ); + + words + .map(el -> el.toLowerCase()) + .runWith(sink, mat); + //#actorRefWithAck + } + @Test public void callingExternalServiceWithMapAsync() throws Exception { diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index e0758c36aa..8efd071f7e 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -10,21 +10,23 @@ import scala.concurrent.duration._ import akka.testkit.AkkaSpec import akka.stream.scaladsl._ import akka.stream.ActorMaterializer + import scala.concurrent.Future import akka.testkit.TestProbe -import akka.actor.ActorRef +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status } import com.typesafe.config.ConfigFactory -import akka.actor.Actor -import akka.actor.Props import akka.util.Timeout import akka.stream.Attributes import akka.stream.ActorAttributes + import scala.concurrent.ExecutionContext import akka.stream.ActorMaterializerSettings import java.util.concurrent.atomic.AtomicInteger + import akka.stream.Supervision import akka.stream.scaladsl.Flow import akka.Done +import akka.actor.Status.Status object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -195,6 +197,63 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { probe.expectMsg("akkateam@somewhere.com") } + "actorRefWithAck" in { + //#actorRefWithAck + val words: Source[String, NotUsed] = + Source(List("hello", "hi")) + + // sent from actor to stream to "ack" processing of given element + val AckMessage = AckingReceiver.Ack + + // sent from stream to actor to indicate start, end or failure of stream: + val InitMessage = AckingReceiver.StreamInitialized + val OnCompleteMessage = AckingReceiver.StreamCompleted + val onErrorMessage = (ex: Throwable) ⇒ AckingReceiver.StreamFailure(ex) + + val receiver = system.actorOf( + Props(new AckingReceiver(ackWith = AckMessage))) + val sink = Sink.actorRefWithAck( + receiver, + onInitMessage = InitMessage, + ackMessage = AckMessage, + onCompleteMessage = OnCompleteMessage, + onFailureMessage = onErrorMessage + ) + + words + .map(_.toLowerCase) + .runWith(sink) + //#actorRefWithAck + } + + //#actorRefWithAck-actor + object AckingReceiver { + case object Ack + + case object StreamInitialized + case object StreamCompleted + final case class StreamFailure(ex: Throwable) + } + + class AckingReceiver(ackWith: Any) extends Actor with ActorLogging { + import AckingReceiver._ + + def receive: Receive = { + case StreamInitialized ⇒ + log.info("Stream initialized!") + + case el: String ⇒ + log.info("Received element: {}", el) + sender() ! Ack // ack to allow the stream to proceed sending more elements + + case StreamCompleted ⇒ + log.info("Stream completed!") + case StreamFailure(ex) ⇒ + log.error(ex, "Stream failed!") + } + } + //#actorRefWithAck-actor + "lookup email with mapAsync and supervision" in { val addressSystem = new AddressSystem2 val authors: Source[Author, NotUsed] =