From c9b3f1de6d3a6acd14bb0947c0d0af367e469d11 Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Wed, 27 Mar 2019 14:16:38 +0100 Subject: [PATCH] GrapheStage implementation for actorRef source (#25324) (#26054) * Adds internal access to materializer before initialization (#25324) * Implements new actorRef source based on graph stage (#25324) * Removes obsolete actorRef source (#25324) * Improves backwards compatibility with old implementation (#25324) * Removes dedicated new subclass for materializer access again (#25324) * Improves implementation (#25324) * Finalizes implementation (#25324) * Small improvements to API and documentation (#25324) * Completion strategy as a replacement for poison pill (#25324) * Adding more tests and updating the documentation (#25324) --- .../paradox/stream/stream-integrations.md | 5 + .../java/jdocs/stream/IntegrationDocTest.java | 3 +- .../stream/operators/SourceDocExamples.java | 3 +- .../docs/stream/StreamTestKitDocSpec.scala | 2 +- .../stream/scaladsl/ActorRefSourceSpec.scala | 80 ++++++++-- .../stream/typed/scaladsl/ActorSource.scala | 4 +- .../mima-filters/2.5.21.backwards.excludes | 12 ++ .../akka/stream/CompletionStrategy.scala | 35 ++++ .../akka/stream/impl/ActorRefSource.scala | 149 ++++++++++++++++++ .../stream/impl/ActorRefSourceActor.scala | 129 --------------- .../main/scala/akka/stream/impl/Modules.scala | 28 ---- .../impl/PhasedFusingActorMaterializer.scala | 2 +- .../scala/akka/stream/javadsl/Source.scala | 6 + .../scala/akka/stream/scaladsl/Source.scala | 28 ++-- .../scala/akka/stream/stage/GraphStage.scala | 43 ++++- 15 files changed, 328 insertions(+), 201 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/CompletionStrategy.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index 1dc2d34087..dc0e7d3c5c 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -162,10 +162,15 @@ at a rate that is faster than the stream can consume. You should consider using if you want a backpressured actor interface. The stream can be completed successfully by sending `akka.actor.Status.Success` to the actor reference. +If the content is `akka.stream.CompletionStrategy.immediately` the completion will be signaled immidiately. +If the content is `akka.stream.CompletionStrategy.draining` already buffered elements will be signaled before siganling completion. +Any other content will be ignored and fall back to the draining behaviour. The stream can be completed with failure by sending `akka.actor.Status.Failure` to the actor reference. +Note: Sending a `PoisonPill` is deprecated and will be ignored in the future. + The actor will be stopped when the stream is completed, failed or cancelled from downstream, i.e. you can watch it to get notified when that happens. diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 5e6381893e..7ecec02317 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -787,7 +787,8 @@ public class IntegrationDocTest extends AbstractJavaTest { actorRef.tell(1, ActorRef.noSender()); actorRef.tell(2, ActorRef.noSender()); actorRef.tell(3, ActorRef.noSender()); - actorRef.tell(new akka.actor.Status.Success("done"), ActorRef.noSender()); + actorRef.tell( + new akka.actor.Status.Success(CompletionStrategy.draining()), ActorRef.noSender()); // #source-actorRef } }; diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 0dd0e9f5a6..d94c01292e 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -9,6 +9,7 @@ package jdocs.stream.operators; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; +import akka.stream.CompletionStrategy; import akka.stream.Materializer; import akka.stream.javadsl.Source; // #range-imports @@ -81,7 +82,7 @@ public class SourceDocExamples { actorRef.tell("hello", ActorRef.noSender()); // The stream completes successfully with the following message - actorRef.tell(new Success("completes stream"), ActorRef.noSender()); + actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender()); // #actor-ref } } diff --git a/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala b/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala index 2fea704f05..7bf420b7b3 100644 --- a/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala @@ -89,7 +89,7 @@ class StreamTestKitDocSpec extends AkkaSpec { ref ! 1 ref ! 2 ref ! 3 - ref ! akka.actor.Status.Success("done") + ref ! akka.actor.Status.Success(CompletionStrategy.draining) val result = Await.result(future, 3.seconds) assert(result == "123") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 9a86faf0a4..d740c2ed57 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -4,25 +4,26 @@ package akka.stream.scaladsl -import scala.concurrent.duration._ -import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy } -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl._ -import akka.stream.testkit.Utils._ -import akka.stream.testkit.scaladsl.StreamTestKit._ -import akka.actor.PoisonPill -import akka.actor.Status import akka.Done +import akka.actor.{ PoisonPill, Status } +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.scaladsl._ +import akka.stream._ +import akka.stream.testkit.TestSubscriber.OnComplete + +import scala.concurrent.duration._ class ActorRefSourceSpec extends StreamSpec { - implicit val materializer = ActorMaterializer() + private implicit val materializer = ActorMaterializer() "A ActorRefSource" must { "emit received messages to the stream" in { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + val sub = s.expectSubscription() sub.request(2) ref ! 1 s.expectNext(1) @@ -35,7 +36,7 @@ class ActorRefSourceSpec extends StreamSpec { "buffer when needed" in { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + val sub = s.expectSubscription() for (n <- 1 to 20) ref ! n sub.request(10) for (n <- 1 to 10) s.expectNext(n) @@ -65,7 +66,7 @@ class ActorRefSourceSpec extends StreamSpec { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() watch(ref) - val sub = s.expectSubscription + val sub = s.expectSubscription() sub.cancel() expectTerminated(ref) } @@ -74,7 +75,7 @@ class ActorRefSourceSpec extends StreamSpec { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() watch(ref) - val sub = s.expectSubscription + val sub = s.expectSubscription() sub.request(100) sub.cancel() expectTerminated(ref) @@ -83,7 +84,7 @@ class ActorRefSourceSpec extends StreamSpec { "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 @@ -96,7 +97,7 @@ class ActorRefSourceSpec extends StreamSpec { "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 @@ -106,10 +107,55 @@ class ActorRefSourceSpec extends StreamSpec { s.expectComplete() } + "signal buffered elements and complete the stream after receiving a Status.Success with CompletionStrategy.Draining" in assertAllStagesStopped { + val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() + + for (n <- 1 to 20) ref ! n + ref ! Status.Success(CompletionStrategy.Draining) + + s.request(20) + for (n <- 1 to 20) s.expectNext(n) + s.expectComplete() + } + + "not signal buffered elements but complete immediately the stream after receiving a Status.Success with CompletionStrategy.Immediately" in assertAllStagesStopped { + val (ref, s) = Source + .actorRef(100, OverflowStrategy.fail) + .toMat(TestSink.probe[Int].addAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both) + .run() + + for (n <- 1 to 20) ref ! n + ref ! Status.Success(CompletionStrategy.Immediately) + + s.request(20) + var e: Either[OnComplete.type, Int] = null + do { + e = s.expectNextOrComplete() + if (e.right.exists(_ > 10)) fail("Must not drain all remaining elements: " + e) + } while (e.isRight) + } + + "not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped { + val (ref, s) = Source + .actorRef(100, OverflowStrategy.fail) + .toMat(TestSink.probe[Int].addAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both) + .run() + + for (n <- 1 to 20) ref ! n + ref ! PoisonPill + + s.request(20) + var e: Either[OnComplete.type, Int] = null + do { + e = s.expectNextOrComplete() + if (e.right.exists(_ > 10)) fail("Must not drain all remaining elements: " + e) + } while (e.isRight) + } + "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 @@ -133,7 +179,7 @@ class ActorRefSourceSpec extends StreamSpec { "fail the stream when receiving Status.Failure" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription + s.expectSubscription() val exc = TE("testfailure") ref ! Status.Failure(exc) s.expectError(exc) diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala index cafc12886f..1e4a571460 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -5,7 +5,7 @@ package akka.stream.typed.scaladsl import akka.actor.typed._ -import akka.stream.OverflowStrategy +import akka.stream.{ CompletionStrategy, OverflowStrategy } import akka.stream.scaladsl._ /** @@ -54,7 +54,7 @@ object ActorSource { overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = Source .actorRef[T]( - completionMatcher.asInstanceOf[PartialFunction[Any, Unit]], + completionMatcher.asInstanceOf[PartialFunction[Any, Unit]].andThen(_ => CompletionStrategy.Draining), failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]], bufferSize, overflowStrategy) diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index 53d7014390..1a923951d2 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -31,3 +31,15 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWi ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.statefulMapConcat") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.statefulMapConcat") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.statefulMapConcat") + +# GrapheStage implementation for actorRef source #25324 +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.ActorRefSource") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.ActorRefSource.withAttributes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.newInstance") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.attributes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.label") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.create") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.ActorRefSource.this") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSourceActor$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSourceActor") diff --git a/akka-stream/src/main/scala/akka/stream/CompletionStrategy.scala b/akka-stream/src/main/scala/akka/stream/CompletionStrategy.scala new file mode 100644 index 0000000000..ec78bf0bfb --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/CompletionStrategy.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream + +import akka.annotation.{ DoNotInherit, InternalApi } + +@DoNotInherit +sealed trait CompletionStrategy + +case object CompletionStrategy { + + /** + * INTERNAL API + */ + @InternalApi + private[akka] case object Immediately extends CompletionStrategy + + /** + * INTERNAL API + */ + @InternalApi + private[akka] case object Draining extends CompletionStrategy + + /** + * The completion will be signaled immediately even if elements are still buffered. + */ + def immediately: CompletionStrategy = Immediately + + /** + * Already buffered elements will be signaled before siganling completion. + */ + def draining: CompletionStrategy = Draining +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala new file mode 100644 index 0000000000..6a8e611c47 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.{ ActorRef, PoisonPill } +import akka.annotation.InternalApi +import akka.stream.OverflowStrategies._ +import akka.stream._ +import akka.stream.stage._ +import akka.util.OptionVal + +import scala.annotation.tailrec + +private object ActorRefSource { + private sealed trait ActorRefStage { def ref: ActorRef } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class ActorRefSource[T]( + maxBuffer: Int, + overflowStrategy: OverflowStrategy, + completionMatcher: PartialFunction[Any, CompletionStrategy], + failureMatcher: PartialFunction[Any, Throwable]) + extends GraphStageWithMaterializedValue[SourceShape[T], ActorRef] { + import ActorRefSource._ + + val out: Outlet[T] = Outlet[T]("actorRefSource.out") + + override val shape: SourceShape[T] = SourceShape.of(out) + + def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) = + throw new IllegalStateException("Not supported") + + private[akka] override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes, + eagerMaterializer: Materializer): (GraphStageLogic, ActorRef) = { + val stage: GraphStageLogic with StageLogging with ActorRefStage = new GraphStageLogic(shape) with StageLogging + with ActorRefStage { + override protected def logSource: Class[_] = classOf[ActorRefSource[_]] + + private val buffer: OptionVal[Buffer[T]] = + if (maxBuffer != 0) + OptionVal(Buffer(maxBuffer, eagerMaterializer)) + else { + OptionVal.None // for backwards compatibility with old actor publisher based implementation + } + private var isCompleting: Boolean = false + + override protected def stageActorName: String = + inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) + + val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { + case (_, PoisonPill) ⇒ + log.warning("for backwards compatibility: PoisonPill will note be supported in the future") + completeStage() + case (_, m) if failureMatcher.isDefinedAt(m) ⇒ + failStage(failureMatcher(m)) + case (_, m) if completionMatcher.isDefinedAt(m) ⇒ + completionMatcher(m) match { + case CompletionStrategy.Draining => + isCompleting = true + tryPush() + case CompletionStrategy.Immediately => + completeStage() + } + case (_, m: T @unchecked) ⇒ + buffer match { + case OptionVal.None => + if (isCompleting) { + log.warning("Dropping element because Status.Success received already: [{}]", m) + } else if (isAvailable(out)) { + push(out, m) + } else { + log.debug("Dropping element because there is no downstream demand and no buffer: [{}]", m) + } + + case OptionVal.Some(buf) => + if (isCompleting) { + log.warning( + "Dropping element because Status.Success received already, only draining already buffered elements: [{}] (pending: [{}])", + m, + buf.used) + } else if (!buf.isFull) { + buf.enqueue(m) + tryPush() + } else + overflowStrategy match { + case s: DropHead ⇒ + log.log( + s.logLevel, + "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + buf.dropHead() + buf.enqueue(m) + tryPush() + case s: DropTail ⇒ + log.log( + s.logLevel, + "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + buf.dropTail() + buf.enqueue(m) + tryPush() + case s: DropBuffer ⇒ + log.log( + s.logLevel, + "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + buf.clear() + buf.enqueue(m) + tryPush() + case s: DropNew ⇒ + log.log( + s.logLevel, + "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") + case s: Fail ⇒ + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + val bufferOverflowException = + BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") + failStage(bufferOverflowException) + case _: Backpressure ⇒ + // there is a precondition check in Source.actorRefSource factory method to not allow backpressure as strategy + failStage(new IllegalStateException("Backpressure is not supported")) + } + } + }.ref + + private def tryPush(): Unit = { + if (isAvailable(out) && buffer.isDefined && buffer.get.nonEmpty) { + val msg = buffer.get.dequeue() + push(out, msg) + } + + if (isCompleting && (buffer.isEmpty || buffer.get.isEmpty)) { + completeStage() + } + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + tryPush() + } + }) + } + + (stage, stage.ref) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala deleted file mode 100644 index 1b9afc9def..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package akka.stream.impl - -import akka.actor.ActorLogging -import akka.actor.Props -import akka.actor.Status -import akka.annotation.InternalApi -import akka.stream.OverflowStrategies._ -import akka.stream.{ BufferOverflowException, OverflowStrategies, OverflowStrategy } -import akka.stream.ActorMaterializerSettings - -/** - * INTERNAL API - */ -@InternalApi private[akka] object ActorRefSourceActor { - def props( - completionMatcher: PartialFunction[Any, Unit], - failureMatcher: PartialFunction[Any, Throwable], - bufferSize: Int, - overflowStrategy: OverflowStrategy, - settings: ActorMaterializerSettings) = { - require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") - val maxFixedBufferSize = settings.maxFixedBufferSize - Props(new ActorRefSourceActor(completionMatcher, failureMatcher, bufferSize, overflowStrategy, maxFixedBufferSize)) - } -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] class ActorRefSourceActor( - completionMatcher: PartialFunction[Any, Unit], - failureMatcher: PartialFunction[Any, Throwable], - bufferSize: Int, - overflowStrategy: OverflowStrategy, - maxFixedBufferSize: Int) - extends akka.stream.actor.ActorPublisher[Any] - with ActorLogging { - import akka.stream.actor.ActorPublisherMessage._ - - // when bufferSize is 0 there the buffer is not used - protected val buffer = if (bufferSize == 0) null else Buffer[Any](bufferSize, maxFixedBufferSize) - - def receive = - ({ - case Cancel => - context.stop(self) - }: Receive).orElse(requestElem).orElse(receiveFailure).orElse(receiveComplete).orElse(receiveElem) - - def receiveComplete: Receive = completionMatcher.andThen { _ => - if (bufferSize == 0 || buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully - else context.become(drainBufferThenComplete) - } - - def receiveFailure: Receive = failureMatcher.andThen { cause => - if (isActive) - onErrorThenStop(cause) - } - - def requestElem: Receive = { - case _: Request => - // totalDemand is tracked by super - if (bufferSize != 0) - while (totalDemand > 0L && !buffer.isEmpty) onNext(buffer.dequeue()) - } - - def receiveElem: Receive = { - case elem if isActive => - if (totalDemand > 0L) - onNext(elem) - else if (bufferSize == 0) - log.debug("Dropping element because there is no downstream demand: [{}]", elem) - else if (!buffer.isFull) - buffer.enqueue(elem) - else - overflowStrategy match { - case s: DropHead => - log.log(s.logLevel, "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") - buffer.dropHead() - buffer.enqueue(elem) - case s: DropTail => - log.log(s.logLevel, "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") - buffer.dropTail() - buffer.enqueue(elem) - case s: DropBuffer => - log.log( - s.logLevel, - "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") - buffer.clear() - buffer.enqueue(elem) - case s: DropNew => - // do not enqueue new element if the buffer is full - log.log(s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") - case s: Fail => - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") - onErrorThenStop(BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) - case s: Backpressure => - // there is a precondition check in Source.actorRefSource factory method - log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") - } - } - - def drainBufferThenComplete: Receive = { - case Cancel => - context.stop(self) - - case Status.Failure(cause) if isActive => - // errors must be signaled as soon as possible, - // even if previously valid completion was requested via Status.Success - onErrorThenStop(cause) - - case _: Request => - // totalDemand is tracked by super - while (totalDemand > 0L && !buffer.isEmpty) onNext(buffer.dequeue()) - - if (buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully - - case elem if isActive => - log.debug( - "Dropping element because Status.Success received already, " + - "only draining already buffered elements: [{}] (pending: [{}])", - elem, - buffer.used) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index b9fa762b15..1e77477050 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -108,31 +108,3 @@ import akka.event.Logging override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = new ActorPublisherSource(props, attr, amendShape(attr)) } - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class ActorRefSource[Out]( - completionMatcher: PartialFunction[Any, Unit], - failureMatcher: PartialFunction[Any, Throwable], - bufferSize: Int, - overflowStrategy: OverflowStrategy, - val attributes: Attributes, - shape: SourceShape[Out]) - extends SourceModule[Out, ActorRef](shape) { - - override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" - - override def create(context: MaterializationContext) = { - val mat = ActorMaterializerHelper.downcast(context.materializer) - val ref = mat.actorOf( - context, - ActorRefSourceActor.props(completionMatcher, failureMatcher, bufferSize, overflowStrategy, mat.settings)) - (akka.stream.actor.ActorPublisher[Out](ref), ref) - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = - new ActorRefSource[Out](completionMatcher, failureMatcher, bufferSize, overflowStrategy, attributes, shape) - override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = - new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, attr, amendShape(attr)) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 2862684467..49f7e69665 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -671,7 +671,7 @@ private final case class SavedIslandData( // TODO: bail on unknown types val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]] val stage = stageModule.stage - val matAndLogic = stage.createLogicAndMaterializedValue(attributes) + val matAndLogic = stage.createLogicAndMaterializedValue(attributes, materializer) val logic = matAndLogic._1 logic.originalStage = OptionVal.Some(stage) logic.attributes = attributes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index df8e6a0553..6c36e10aa3 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -317,6 +317,12 @@ object Source { * (whose content will be ignored) in which case already buffered elements will be signaled before signaling * completion. * + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]. + * If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately, + * otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else) + * already buffered elements will be signaled before siganling completion. + * Sending [[akka.actor.PoisonPill]] will signal completion immediately but this behavior is deprecated and scheduled to be removed. + * * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the * actor reference. In case the Actor is still draining its internal buffer (after having received * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d460a8a567..a438fb3da4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -508,20 +508,15 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ @InternalApi private[akka] def actorRef[T]( - completionMatcher: PartialFunction[Any, Unit], + completionMatcher: PartialFunction[Any, CompletionStrategy], failureMatcher: PartialFunction[Any, Throwable], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported") - fromGraph( - new ActorRefSource( - completionMatcher, - failureMatcher, - bufferSize, - overflowStrategy, - DefaultAttributes.actorRefSource, - shape("ActorRefSource"))) + Source + .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher)) + .withAttributes(DefaultAttributes.actorRefSource) } /** @@ -539,9 +534,11 @@ object Source { * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after * this Source; as such, it is never safe to assume the downstream will always generate demand. * - * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]] - * (whose content will be ignored) in which case already buffered elements will be signaled before signaling - * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]. + * If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately, + * otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else) + * already buffered elements will be signaled before siganling completion. + * Sending [[akka.actor.PoisonPill]] will signal completion immediately but this behavior is deprecated and scheduled to be removed. * * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the * actor reference. In case the Actor is still draining its internal buffer (after having received @@ -559,9 +556,10 @@ object Source { */ def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = actorRef({ - case akka.actor.Status.Success => - case akka.actor.Status.Success(_) => - }, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy) + case akka.actor.Status.Success(s: CompletionStrategy) => s + case akka.actor.Status.Success(_) => CompletionStrategy.Draining + case akka.actor.Status.Success => CompletionStrategy.Draining + }, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy) /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index f8007e745c..3f16ac3e2b 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -36,6 +36,16 @@ import scala.concurrent.{ Future, Promise } */ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { + /** + * Grants eager access to materializer for special purposes. + * + * INTERNAL API + */ + @InternalApi + private[akka] def createLogicAndMaterializedValue( + inheritedAttributes: Attributes, + materializer: Materializer): (GraphStageLogic, M) = createLogicAndMaterializedValue(inheritedAttributes) + @throws(classOf[Exception]) def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) @@ -184,14 +194,23 @@ object GraphStageLogic { materializer: ActorMaterializer, getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], initialReceive: StageActorRef.Receive, - name: String) { + name: String, + poisonPillFallback: Boolean) { // internal fallback to support deprecated SourceActorRef implementation replacement + + def this( + materializer: akka.stream.ActorMaterializer, + getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], + initialReceive: StageActorRef.Receive, + name: String) { + this(materializer, getAsyncCallback, initialReceive, name, false) + } // not really needed, but let's keep MiMa happy def this( materializer: akka.stream.ActorMaterializer, getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], initialReceive: StageActorRef.Receive) { - this(materializer, getAsyncCallback, initialReceive, "") + this(materializer, getAsyncCallback, initialReceive, "", false) } private val callback = getAsyncCallback(internalReceive) @@ -205,6 +224,8 @@ object GraphStageLogic { private val functionRef: FunctionRef = cell.addFunctionRef( { + case (r, PoisonPill) if poisonPillFallback ⇒ + callback.invoke((r, PoisonPill)) case (_, m @ (PoisonPill | Kill)) => materializer.logger.warning( "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." + @@ -1195,13 +1216,23 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * @param receive callback that will be called upon receiving of a message by this special Actor * @return minimal actor with watch method */ - // FIXME: I don't like the Pair allocation :( @ApiMayChange final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor = + getEagerStageActor(interpreter.materializer, poisonPillCompatibility = false)(receive) + + /** + * INTERNAL API + */ + @InternalApi + protected[akka] def getEagerStageActor( + eagerMaterializer: Materializer, + poisonPillCompatibility: Boolean)( // fallback required for source actor backwards compatibility + receive: ((ActorRef, Any)) ⇒ Unit): StageActor = _stageActor match { - case null => - val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer) - _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive, stageActorName) + case null ⇒ + val actorMaterializer = ActorMaterializerHelper.downcast(eagerMaterializer) + _stageActor = + new StageActor(actorMaterializer, getAsyncCallback, receive, stageActorName, poisonPillCompatibility) _stageActor case existing => existing.become(receive)