diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 30f374ae0d..16fdbf11cf 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -329,6 +329,7 @@ public class IntegrationDocTest extends AbstractJavaTest { return receiveBuilder() .match(StreamInitialized.class, init -> { log().info("Stream initialized"); + sender().tell(new Ack(), self()); }) .match(String.class, element -> { log().info("Received element: {}", element); diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 8efd071f7e..5514275abe 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -210,8 +210,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val OnCompleteMessage = AckingReceiver.StreamCompleted val onErrorMessage = (ex: Throwable) ⇒ AckingReceiver.StreamFailure(ex) + val probe = TestProbe() val receiver = system.actorOf( - Props(new AckingReceiver(ackWith = AckMessage))) + Props(new AckingReceiver(probe.ref, ackWith = AckMessage))) val sink = Sink.actorRefWithAck( receiver, onInitMessage = InitMessage, @@ -224,6 +225,10 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { .map(_.toLowerCase) .runWith(sink) //#actorRefWithAck + probe.expectMsg("Stream initialized!") + probe.expectMsg("hello") + probe.expectMsg("hi") + probe.expectMsg("Stream completed!") } //#actorRefWithAck-actor @@ -235,19 +240,23 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { final case class StreamFailure(ex: Throwable) } - class AckingReceiver(ackWith: Any) extends Actor with ActorLogging { + class AckingReceiver(probe: ActorRef, ackWith: Any) extends Actor with ActorLogging { import AckingReceiver._ def receive: Receive = { case StreamInitialized ⇒ log.info("Stream initialized!") + probe ! "Stream initialized!" + sender() ! Ack // ack to allow the stream to proceed sending more elements case el: String ⇒ log.info("Received element: {}", el) + probe ! el sender() ! Ack // ack to allow the stream to proceed sending more elements case StreamCompleted ⇒ log.info("Stream completed!") + probe ! "Stream completed!" case StreamFailure(ex) ⇒ log.error(ex, "Stream failed!") }