From f37f41574dd9873d35a9fa97fc19f1fed96bf6cc Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Mon, 20 May 2019 12:19:44 +0200 Subject: [PATCH] Implements actorRef source variant with backpressure #17610 (#26633) * Implements actorRef source variant with backpressure #17610 * Small improvements to documentation and source #17610 * Small improvements to test #17610 * Small improvements to implementation and tests #17610 * Adds API for akka-typed #17610 * Adds ack sender and java api for typed #17610 --- .../operators/ActorSource/actorRefWithAck.md | 24 ++++ .../operators/Source/actorRefWithAck.md | 33 +++++ .../main/paradox/stream/operators/index.md | 4 + .../stream/operators/SourceDocExamples.java | 27 +++- .../stream/operators/SourceOperators.scala | 29 +++- .../stream/DslFactoriesConsistencySpec.scala | 1 + .../ActorRefBackpressureSourceSpec.scala | 124 ++++++++++++++++++ .../stream/typed/javadsl/ActorSource.scala | 29 +++- .../stream/typed/scaladsl/ActorSource.scala | 29 +++- .../typed/scaladsl/ActorSourceSinkSpec.scala | 23 +++- .../impl/ActorRefBackpressureSource.scala | 95 ++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 1 + .../scala/akka/stream/javadsl/Source.scala | 22 ++++ .../scala/akka/stream/scaladsl/Source.scala | 37 ++++++ 14 files changed, 472 insertions(+), 6 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md new file mode 100644 index 0000000000..402cad9d7e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md @@ -0,0 +1,24 @@ +# actorRefWithAck + +Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. + +@ref[Source operators](../index.md#source-operators) + +@@@ div { .group-scala } +## Signature + +@@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithAck } +@@@ + +## Description + +Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`], sending messages to it will emit them on the stream. The actor responds with the provided ack message +once the element could be emitted alowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream. + +@@@div { .callout } + +**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef` + +**completes** when the `ActorRef` is sent `akka.actor.Status.Success` + +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md new file mode 100644 index 0000000000..7ad40f7323 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md @@ -0,0 +1,33 @@ +# actorRefWithAck + +Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. + +@ref[Source operators](../index.md#source-operators) + +@@@ div { .group-scala } +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithAck } +@@@ + +## Description + +Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor responds with the provided ack message +once the element could be emitted alowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream. + +@@@div { .callout } + +**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef` + +**completes** when the `ActorRef` is sent `akka.actor.Status.Success` + +@@@ + +## Examples + + +Scala +: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithAck } + +Java +: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref-with-ack } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 4b390e1d43..60428cc4b5 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -8,6 +8,8 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| |Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| +|Source|@ref[actorRefWithAck](Source/actorRefWithAck.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|ActorSource|@ref[actorRefWithAck](ActorSource/actorRefWithAck.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| @@ -310,6 +312,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [lazilyAsync](Source/lazilyAsync.md) * [asSubscriber](Source/asSubscriber.md) * [actorRef](Source/actorRef.md) +* [actorRefWithAck](Source/actorRefWithAck.md) * [zipN](Source/zipN.md) * [zipWithN](Source/zipWithN.md) * [queue](Source/queue.md) @@ -433,6 +436,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [withBackoff](RestartFlow/withBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [withBackoff](RestartSink/withBackoff.md) +* [actorRefWithAck](ActorSource/actorRefWithAck.md) * [ask](ActorFlow/ask.md) * [actorRef](ActorSink/actorRef.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index d94c01292e..3ee9a6e92e 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -8,8 +8,9 @@ package jdocs.stream.operators; // #range-imports import akka.NotUsed; import akka.actor.ActorSystem; +import akka.actor.testkit.typed.javadsl.ManualTime; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.stream.ActorMaterializer; -import akka.stream.CompletionStrategy; import akka.stream.Materializer; import akka.stream.javadsl.Source; // #range-imports @@ -18,7 +19,9 @@ import akka.stream.javadsl.Source; import akka.actor.ActorRef; import akka.actor.Status.Success; import akka.stream.OverflowStrategy; +import akka.stream.CompletionStrategy; import akka.stream.javadsl.Sink; +import akka.testkit.TestProbe; // #actor-ref-imports import java.util.Arrays; @@ -27,6 +30,8 @@ import java.util.Arrays; public class SourceDocExamples { + public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config()); + public static void fromExample() { // #source-from-example final ActorSystem system = ActorSystem.create("SourceFromExample"); @@ -85,4 +90,24 @@ public class SourceDocExamples { actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender()); // #actor-ref } + + static void actorRefWithAck() { + final TestProbe probe = null; + + // #actor-ref-with-ack + final ActorSystem system = ActorSystem.create(); + final Materializer materializer = ActorMaterializer.create(system); + + Source source = Source.actorRefWithAck("ack"); + + ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(materializer); + probe.send(actorRef, "hello"); + probe.expectMsg("ack"); + probe.send(actorRef, "hello"); + probe.expectMsg("ack"); + + // The stream completes successfully with the following message + actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender()); + // #actor-ref-with-ack + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index 9ccf83e4a2..86d511686e 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -6,6 +6,7 @@ package docs.stream.operators import akka.actor.ActorSystem import akka.stream.ActorMaterializer +import akka.testkit.TestProbe object SourceOperators { @@ -35,6 +36,7 @@ object SourceOperators { import akka.actor.Status.Success import akka.actor.ActorRef import akka.stream.OverflowStrategy + import akka.stream.CompletionStrategy import akka.stream.scaladsl._ implicit val system: ActorSystem = ActorSystem() @@ -48,7 +50,32 @@ object SourceOperators { actorRef ! "hello" // The stream completes successfully with the following message - actorRef ! Success("completes stream") + actorRef ! Success(CompletionStrategy.immediately) //#actorRef } + + def actorRefWithAck(): Unit = { + //#actorRefWithAck + + import akka.actor.Status.Success + import akka.actor.ActorRef + import akka.stream.CompletionStrategy + import akka.stream.scaladsl._ + + implicit val system: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + val probe = TestProbe() + + val source: Source[Any, ActorRef] = Source.actorRefWithAck[Any]("ack") + val actorRef: ActorRef = source.to(Sink.foreach(println)).run() + + probe.send(actorRef, "hello") + probe.expectMsg("ack") + probe.send(actorRef, "hello") + probe.expectMsg("ack") + + // The stream completes successfully with the following message + actorRef ! Success(CompletionStrategy.immediately) + //#actorRefWithAck + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 7f0be3d358..09560cf2d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -142,6 +142,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl + Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala new file mode 100644 index 0000000000..62304b4b0d --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.actor.Status +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, Attributes } +import akka.stream.testkit.StreamSpec +import akka.testkit.TestProbe + +import scala.concurrent.duration._ + +private object ActorRefBackpressureSourceSpec { + case object AckMsg +} + +class ActorRefBackpressureSourceSpec extends StreamSpec { + import ActorRefBackpressureSourceSpec._ + + private implicit val materializer: ActorMaterializer = ActorMaterializer() + + "An Source.actorRefWithAck" must { + + "emit received messages to the stream and ack" in assertAllStagesStopped { + val probe = TestProbe() + val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + + val sub = s.expectSubscription() + sub.request(10) + probe.send(ref, 1) + s.expectNext(1) + probe.expectMsg(AckMsg) + + probe.send(ref, 2) + s.expectNext(2) + probe.expectMsg(AckMsg) + + s.expectNoMessage(50.millis) + + ref ! Status.Success("ok") + s.expectComplete() + } + + "fail when consumer does not await ack" in assertAllStagesStopped { + val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + + val sub = s.expectSubscription() + for (n <- 1 to 20) ref ! n + sub.request(1) + + @scala.annotation.tailrec + def verifyNext(n: Int): Unit = { + if (n > 10) + s.expectComplete() + else + s.expectNextOrError() match { + case Right(`n`) => verifyNext(n + 1) + case Right(x) => fail(s"expected $n, got $x") + case Left(t) => t.getMessage shouldBe "Received new element before ack was signaled back" + } + } + verifyNext(1) + } + + "complete after receiving Status.Success" in assertAllStagesStopped { + val probe = TestProbe() + val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + + val sub = s.expectSubscription() + sub.request(10) + probe.send(ref, 1) + s.expectNext(1) + probe.expectMsg(AckMsg) + + ref ! Status.Success("ok") + + s.expectComplete() + } + + "fail after receiving Status.Failure" in assertAllStagesStopped { + val probe = TestProbe() + val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + + val sub = s.expectSubscription() + sub.request(10) + probe.send(ref, 1) + s.expectNext(1) + probe.expectMsg(AckMsg) + + ref ! Status.Failure(TE("test")) + + s.expectError(TE("test")) + } + + "not buffer elements after receiving Status.Success" in assertAllStagesStopped { + val probe = TestProbe() + val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + + val sub = s.expectSubscription() + sub.request(10) + probe.send(ref, 1) + s.expectNext(1) + probe.expectMsg(AckMsg) + + probe.send(ref, 2) + s.expectNext(2) + probe.expectMsg(AckMsg) + + ref ! Status.Success("ok") + + probe.send(ref, 100) + probe.send(ref, 100) + probe.send(ref, 100) + probe.send(ref, 100) + probe.expectNoMessage(200.millis) + + s.expectComplete() + } + } +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala index cc16238ef2..fa0cc33e13 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -7,8 +7,8 @@ package akka.stream.typed.javadsl import java.util.function.Predicate import akka.actor.typed._ -import akka.stream.OverflowStrategy import akka.stream.javadsl._ +import akka.stream.{ CompletionStrategy, OverflowStrategy } /** * Collection of Sources aimed at integrating with typed Actors. @@ -60,4 +60,31 @@ object ActorSource { overflowStrategy) .asJava } + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithAck[T, Ack]( + ackTo: ActorRef[Ack], + ackMessage: Ack, + completionMatcher: PartialFunction[T, CompletionStrategy], + failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] = + akka.stream.typed.scaladsl.ActorSource + .actorRefWithAck[T, Ack]( + ackTo, + ackMessage, + completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], + failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) + .asJava } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala index 1e4a571460..4d4644c340 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -5,8 +5,8 @@ package akka.stream.typed.scaladsl import akka.actor.typed._ -import akka.stream.{ CompletionStrategy, OverflowStrategy } import akka.stream.scaladsl._ +import akka.stream.{ CompletionStrategy, OverflowStrategy } /** * Collection of Sources aimed at integrating with typed Actors. @@ -59,4 +59,31 @@ object ActorSource { bufferSize, overflowStrategy) .mapMaterializedValue(actorRefAdapter) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithAck[T, Ack]( + ackTo: ActorRef[Ack], + ackMessage: Ack, + completionMatcher: PartialFunction[T, CompletionStrategy], + failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] = + Source + .actorRefWithAck[T]( + Some(ackTo.toUntyped), + ackMessage, + completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], + failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) + .mapMaterializedValue(actorRefAdapter) } diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 2e32239f6e..f1b66564b9 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -6,7 +6,7 @@ package akka.stream.typed.scaladsl import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors -import akka.stream.OverflowStrategy +import akka.stream.{ CompletionStrategy, OverflowStrategy } import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source @@ -107,6 +107,25 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike { out.failed.futureValue.getCause.getMessage shouldBe "boom!" } - } + "send message and ack" in { + val p = TestProbe[String]() + + val (in, out) = ActorSource + .actorRefWithAck[String, String]( + p.ref, + "ack", { case "complete" => CompletionStrategy.draining }, + PartialFunction.empty) + .toMat(Sink.seq)(Keep.both) + .run() + + in ! "one" + p.expectMessage("ack") + in ! "two" + p.expectMessage("ack") + in ! "complete" + + out.futureValue should contain theSameElementsAs Seq("one", "two") + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala new file mode 100644 index 0000000000..3933e693fe --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.ActorRef +import akka.annotation.InternalApi +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage._ +import akka.util.OptionVal + +private object ActorRefBackpressureSource { + private sealed trait ActorRefStage { def ref: ActorRef } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class ActorRefBackpressureSource[T]( + ackTo: Option[ActorRef], + ackMessage: Any, + completionMatcher: PartialFunction[Any, CompletionStrategy], + failureMatcher: PartialFunction[Any, Throwable]) + extends GraphStageWithMaterializedValue[SourceShape[T], ActorRef] { + import ActorRefBackpressureSource._ + + val out: Outlet[T] = Outlet[T]("actorRefSource.out") + + override val shape: SourceShape[T] = SourceShape.of(out) + override def initialAttributes: Attributes = DefaultAttributes.actorRefWithAckSource + + def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) = + throw new IllegalStateException("Not supported") + + private[akka] override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes, + eagerMaterializer: Materializer): (GraphStageLogic, ActorRef) = { + val stage: GraphStageLogic with StageLogging with ActorRefStage = new GraphStageLogic(shape) with StageLogging + with ActorRefStage { + override protected def logSource: Class[_] = classOf[ActorRefSource[_]] + + private var isCompleting: Boolean = false + private var element: OptionVal[(ActorRef, T)] = OptionVal.none + + override protected def stageActorName: String = + inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) + + val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false) { + case (_, m) if failureMatcher.isDefinedAt(m) ⇒ + failStage(failureMatcher(m)) + case (_, m) if completionMatcher.isDefinedAt(m) ⇒ + completionMatcher(m) match { + case CompletionStrategy.Draining => + isCompleting = true + tryPush() + case CompletionStrategy.Immediately => + completeStage() + } + case e: (ActorRef, T) @unchecked ⇒ + if (element.isDefined) { + failStage(new IllegalStateException("Received new element before ack was signaled back")) + } else { + ackTo match { + case Some(at) => element = OptionVal.Some((at, e._2)) + case None => element = OptionVal.Some(e) + } + tryPush() + } + }.ref + + private def tryPush(): Unit = { + if (isAvailable(out) && element.isDefined) { + val (s, e) = element.get + push(out, e) + element = OptionVal.none + s ! ackMessage + } + + if (isCompleting && element.isEmpty) { + completeStage() + } + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + tryPush() + } + }) + } + + (stage, stage.ref) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index da50fa817b..d9213d3c62 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -109,6 +109,7 @@ import akka.stream._ val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") + val actorRefWithAckSource = name("actorRefWithAckSource") val queueSource = name("queueSource") val inputStreamSource = name("inputStreamSource") and IODispatcher val outputStreamSource = name("outputStreamSource") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4c1346fdfd..457d9224d0 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -348,6 +348,28 @@ object Source { def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]. + * If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately, + * otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else) + * already buffered element will be signaled before siganling completion. + * + * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the + * actor reference. In case the Actor is still draining its internal buffer (after having received + * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = + new Source(scaladsl.Source.actorRefWithAck(ackMessage)) + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 03f17c0769..5a4804b23b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -569,6 +569,43 @@ object Source { case akka.actor.Status.Success => CompletionStrategy.Draining }, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy) + /** + * INTERNAL API + */ + @InternalApi private[akka] def actorRefWithAck[T]( + ackTo: Option[ActorRef], + ackMessage: Any, + completionMatcher: PartialFunction[Any, CompletionStrategy], + failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = { + Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher)) + } + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]. + * If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately, + * otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else) + * already buffered element will be signaled before siganling completion. + * + * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the + * actor reference. In case the Actor is still draining its internal buffer (after having received + * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = + actorRefWithAck(None, ackMessage, { + case akka.actor.Status.Success(s: CompletionStrategy) => s + case akka.actor.Status.Success(_) => CompletionStrategy.Draining + case akka.actor.Status.Success => CompletionStrategy.Draining + }, { case akka.actor.Status.Failure(cause) => cause }) + /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. */