From 94af88c5dec42eabd2da45184ec1f2afa12cf350 Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Fri, 5 Feb 2021 11:46:34 +0100 Subject: [PATCH] Remove support for PoisonPill in ActorRefSource #26622 --- .../stream/scaladsl/ActorRefSourceSpec.scala | 34 +++---------------- .../29586-sink-internal-api.excludes | 0 .../26622-stagelogic-internal-api.excludes | 2 ++ .../impl/ActorRefBackpressureSource.scala | 2 +- .../akka/stream/impl/ActorRefSource.scala | 8 ++--- .../stream/impl/streamref/SinkRefImpl.scala | 2 +- .../stream/impl/streamref/SourceRefImpl.scala | 2 +- .../scala/akka/stream/scaladsl/Source.scala | 3 +- .../scala/akka/stream/stage/GraphStage.scala | 12 ++----- 9 files changed, 16 insertions(+), 49 deletions(-) rename akka-stream/src/main/mima-filters/{2.6.x.backwards.excludes => 2.6.10.backwards.excludes}/29586-sink-internal-api.excludes (100%) create mode 100644 akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/26622-stagelogic-internal-api.excludes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index e949b08ab3..266104243c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration._ import org.reactivestreams.Publisher import akka.Done -import akka.actor.{ ActorRef, PoisonPill, Status } +import akka.actor.{ ActorRef, Status } import akka.stream.{ OverflowStrategy, _ } import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -169,30 +169,6 @@ class ActorRefSourceSpec extends StreamSpec { verifyNext(1) } - "not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped { - val (ref, s) = Source - .actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.fail) - .toMat(TestSink.probe[Int])(Keep.both) - .run() - - for (n <- 1 to 20) ref ! n - ref ! PoisonPill - - s.request(10) - - def verifyNext(n: Int): Unit = { - if (n > 10) - s.expectComplete() - else - s.expectNextOrComplete() match { - case Right(`n`) => verifyNext(n + 1) - case Right(x) => fail(s"expected $n, got $x") - case Left(_) => // ok, completed - } - } - verifyNext(1) - } - "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source @@ -212,7 +188,7 @@ class ActorRefSourceSpec extends StreamSpec { s.expectComplete() } - "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped { + "complete and materialize the stream after receiving completion message" in assertAllStagesStopped { val (ref, done) = { Source .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer) @@ -223,7 +199,7 @@ class ActorRefSourceSpec extends StreamSpec { done.futureValue should be(Done) } - "fail the stream when receiving Status.Failure" in assertAllStagesStopped { + "fail the stream when receiving failure message" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source .actorRef(PartialFunction.empty, { case Status.Failure(exc) => exc }, 10, OverflowStrategy.fail) @@ -239,12 +215,12 @@ class ActorRefSourceSpec extends StreamSpec { val s = TestSubscriber.manualProbe[Int]() val name = "SomeCustomName" val ref = Source - .actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail) + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 10, OverflowStrategy.fail) .withAttributes(Attributes.name(name)) .to(Sink.fromSubscriber(s)) .run() ref.path.name.contains(name) should ===(true) - ref ! PoisonPill + ref ! "ok" } "be possible to run immediately, reproducer of #26714" in { diff --git a/akka-stream/src/main/mima-filters/2.6.x.backwards.excludes/29586-sink-internal-api.excludes b/akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/29586-sink-internal-api.excludes similarity index 100% rename from akka-stream/src/main/mima-filters/2.6.x.backwards.excludes/29586-sink-internal-api.excludes rename to akka-stream/src/main/mima-filters/2.6.10.backwards.excludes/29586-sink-internal-api.excludes diff --git a/akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/26622-stagelogic-internal-api.excludes b/akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/26622-stagelogic-internal-api.excludes new file mode 100644 index 0000000000..3ed2108594 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/26622-stagelogic-internal-api.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic.getEagerStageActor") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic#StageActor.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala index 9f08cd6b76..21cf831477 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala @@ -48,7 +48,7 @@ private object ActorRefBackpressureSource { override protected def stageActorName: String = inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) - val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false) { + val ref: ActorRef = getEagerStageActor(eagerMaterializer) { case (_, m) if failureMatcher.isDefinedAt(m) => failStage(failureMatcher(m)) case (_, m) if completionMatcher.isDefinedAt(m) => diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index 5377145d19..84ec89e0f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -4,7 +4,7 @@ package akka.stream.impl -import akka.actor.{ ActorRef, PoisonPill } +import akka.actor.ActorRef import akka.annotation.InternalApi import akka.stream._ import akka.stream.OverflowStrategies._ @@ -52,11 +52,7 @@ private object ActorRefSource { override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName) private val name = inheritedAttributes.nameOrDefault(getClass.toString) - override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { - case (_, PoisonPill) => - log.warning( - "PoisonPill only completes ActorRefSource for backwards compatibility and not be supported in the future. Send Status.Success(CompletionStrategy) instead") - completeStage() + override val ref: ActorRef = getEagerStageActor(eagerMaterializer) { case (_, m) if failureMatcher.isDefinedAt(m) => failStage(failureMatcher(m)) case (_, m) if completionMatcher.isDefinedAt(m) => diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 97aa522aae..79c043d733 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -76,7 +76,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName() private[this] val self: GraphStageLogic.StageActor = - getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive) + getEagerStageActor(eagerMaterializer)(initialReceive) override val ref: ActorRef = self.ref implicit def selfSender: ActorRef = ref diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 55ea31c1ec..939c4a670c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -151,7 +151,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName() private[this] val self: GraphStageLogic.StageActor = - getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(receiveRemoteMessage) + getEagerStageActor(eagerMaterializer)(receiveRemoteMessage) override val ref: ActorRef = self.ref private[this] implicit def selfSender: ActorRef = ref diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index e2c45ef8ba..86b78ec478 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -677,8 +677,7 @@ object Source { * If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immediately. * Otherwise, if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else) * already buffered elements will be sent out before signaling completion. - * Sending [[akka.actor.PoisonPill]] will signal completion immediately but this behavior is deprecated and scheduled to be removed. - * Using [[akka.actor.ActorSystem.stop]] to stop the actor and complete the stream is *not supported*. + * Using [[akka.actor.PoisonPill]] or [[akka.actor.ActorSystem.stop]] to stop the actor and complete the stream is *not supported*. * * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the * actor reference. In case the Actor is still draining its internal buffer (after having received diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index bb26ad8f13..0817a2c2c7 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -197,7 +197,6 @@ object GraphStageLogic { materializer: Materializer, getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], initialReceive: StageActorRef.Receive, - poisonPillFallback: Boolean, // internal fallback to support deprecated SourceActorRef implementation replacement name: String) { private val callback = getAsyncCallback(internalReceive) @@ -209,8 +208,6 @@ object GraphStageLogic { } private val functionRef: FunctionRef = { val f: (ActorRef, Any) => Unit = { - case (r, PoisonPill) if poisonPillFallback => - callback.invoke((r, PoisonPill)) case (_, m @ (PoisonPill | Kill)) => materializer.logger.warning( "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." + @@ -1318,7 +1315,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * @return minimal actor with watch method */ final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor = - getEagerStageActor(interpreter.materializer, poisonPillCompatibility = false)(receive) + getEagerStageActor(interpreter.materializer)(receive) /** * INTERNAL API @@ -1327,14 +1324,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * materialization or one of the methods invoked by the graph operator machinery, such as `onPush` and `onPull`. */ @InternalApi - protected[akka] def getEagerStageActor( - eagerMaterializer: Materializer, - poisonPillCompatibility: Boolean)( // fallback required for source actor backwards compatibility + protected[akka] def getEagerStageActor(eagerMaterializer: Materializer)( receive: ((ActorRef, Any)) => Unit): StageActor = _stageActor match { case null => - _stageActor = - new StageActor(eagerMaterializer, getAsyncCallback _, receive, poisonPillCompatibility, stageActorName) + _stageActor = new StageActor(eagerMaterializer, getAsyncCallback _, receive, stageActorName) _stageActor case existing => existing.become(receive)