Remove support for PoisonPill in ActorRefSource #26622
This commit is contained in:
parent
41b8079b6f
commit
94af88c5de
9 changed files with 16 additions and 49 deletions
|
|
@ -9,7 +9,7 @@ import scala.concurrent.duration._
|
||||||
import org.reactivestreams.Publisher
|
import org.reactivestreams.Publisher
|
||||||
|
|
||||||
import akka.Done
|
import akka.Done
|
||||||
import akka.actor.{ ActorRef, PoisonPill, Status }
|
import akka.actor.{ ActorRef, Status }
|
||||||
import akka.stream.{ OverflowStrategy, _ }
|
import akka.stream.{ OverflowStrategy, _ }
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import akka.stream.testkit.Utils._
|
import akka.stream.testkit.Utils._
|
||||||
|
|
@ -169,30 +169,6 @@ class ActorRefSourceSpec extends StreamSpec {
|
||||||
verifyNext(1)
|
verifyNext(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
"not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped {
|
|
||||||
val (ref, s) = Source
|
|
||||||
.actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.fail)
|
|
||||||
.toMat(TestSink.probe[Int])(Keep.both)
|
|
||||||
.run()
|
|
||||||
|
|
||||||
for (n <- 1 to 20) ref ! n
|
|
||||||
ref ! PoisonPill
|
|
||||||
|
|
||||||
s.request(10)
|
|
||||||
|
|
||||||
def verifyNext(n: Int): Unit = {
|
|
||||||
if (n > 10)
|
|
||||||
s.expectComplete()
|
|
||||||
else
|
|
||||||
s.expectNextOrComplete() match {
|
|
||||||
case Right(`n`) => verifyNext(n + 1)
|
|
||||||
case Right(x) => fail(s"expected $n, got $x")
|
|
||||||
case Left(_) => // ok, completed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
verifyNext(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
"not buffer elements after receiving Status.Success" in assertAllStagesStopped {
|
"not buffer elements after receiving Status.Success" in assertAllStagesStopped {
|
||||||
val s = TestSubscriber.manualProbe[Int]()
|
val s = TestSubscriber.manualProbe[Int]()
|
||||||
val ref = Source
|
val ref = Source
|
||||||
|
|
@ -212,7 +188,7 @@ class ActorRefSourceSpec extends StreamSpec {
|
||||||
s.expectComplete()
|
s.expectComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
"complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped {
|
"complete and materialize the stream after receiving completion message" in assertAllStagesStopped {
|
||||||
val (ref, done) = {
|
val (ref, done) = {
|
||||||
Source
|
Source
|
||||||
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer)
|
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer)
|
||||||
|
|
@ -223,7 +199,7 @@ class ActorRefSourceSpec extends StreamSpec {
|
||||||
done.futureValue should be(Done)
|
done.futureValue should be(Done)
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail the stream when receiving Status.Failure" in assertAllStagesStopped {
|
"fail the stream when receiving failure message" in assertAllStagesStopped {
|
||||||
val s = TestSubscriber.manualProbe[Int]()
|
val s = TestSubscriber.manualProbe[Int]()
|
||||||
val ref = Source
|
val ref = Source
|
||||||
.actorRef(PartialFunction.empty, { case Status.Failure(exc) => exc }, 10, OverflowStrategy.fail)
|
.actorRef(PartialFunction.empty, { case Status.Failure(exc) => exc }, 10, OverflowStrategy.fail)
|
||||||
|
|
@ -239,12 +215,12 @@ class ActorRefSourceSpec extends StreamSpec {
|
||||||
val s = TestSubscriber.manualProbe[Int]()
|
val s = TestSubscriber.manualProbe[Int]()
|
||||||
val name = "SomeCustomName"
|
val name = "SomeCustomName"
|
||||||
val ref = Source
|
val ref = Source
|
||||||
.actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail)
|
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 10, OverflowStrategy.fail)
|
||||||
.withAttributes(Attributes.name(name))
|
.withAttributes(Attributes.name(name))
|
||||||
.to(Sink.fromSubscriber(s))
|
.to(Sink.fromSubscriber(s))
|
||||||
.run()
|
.run()
|
||||||
ref.path.name.contains(name) should ===(true)
|
ref.path.name.contains(name) should ===(true)
|
||||||
ref ! PoisonPill
|
ref ! "ok"
|
||||||
}
|
}
|
||||||
|
|
||||||
"be possible to run immediately, reproducer of #26714" in {
|
"be possible to run immediately, reproducer of #26714" in {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic.getEagerStageActor")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic#StageActor.this")
|
||||||
|
|
@ -48,7 +48,7 @@ private object ActorRefBackpressureSource {
|
||||||
override protected def stageActorName: String =
|
override protected def stageActorName: String =
|
||||||
inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName)
|
inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName)
|
||||||
|
|
||||||
val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false) {
|
val ref: ActorRef = getEagerStageActor(eagerMaterializer) {
|
||||||
case (_, m) if failureMatcher.isDefinedAt(m) =>
|
case (_, m) if failureMatcher.isDefinedAt(m) =>
|
||||||
failStage(failureMatcher(m))
|
failStage(failureMatcher(m))
|
||||||
case (_, m) if completionMatcher.isDefinedAt(m) =>
|
case (_, m) if completionMatcher.isDefinedAt(m) =>
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
import akka.actor.{ ActorRef, PoisonPill }
|
import akka.actor.ActorRef
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.OverflowStrategies._
|
import akka.stream.OverflowStrategies._
|
||||||
|
|
@ -52,11 +52,7 @@ private object ActorRefSource {
|
||||||
override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName)
|
override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName)
|
||||||
|
|
||||||
private val name = inheritedAttributes.nameOrDefault(getClass.toString)
|
private val name = inheritedAttributes.nameOrDefault(getClass.toString)
|
||||||
override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) {
|
override val ref: ActorRef = getEagerStageActor(eagerMaterializer) {
|
||||||
case (_, PoisonPill) =>
|
|
||||||
log.warning(
|
|
||||||
"PoisonPill only completes ActorRefSource for backwards compatibility and not be supported in the future. Send Status.Success(CompletionStrategy) instead")
|
|
||||||
completeStage()
|
|
||||||
case (_, m) if failureMatcher.isDefinedAt(m) =>
|
case (_, m) if failureMatcher.isDefinedAt(m) =>
|
||||||
failStage(failureMatcher(m))
|
failStage(failureMatcher(m))
|
||||||
case (_, m) if completionMatcher.isDefinedAt(m) =>
|
case (_, m) if completionMatcher.isDefinedAt(m) =>
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
|
||||||
|
|
||||||
override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName()
|
override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName()
|
||||||
private[this] val self: GraphStageLogic.StageActor =
|
private[this] val self: GraphStageLogic.StageActor =
|
||||||
getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive)
|
getEagerStageActor(eagerMaterializer)(initialReceive)
|
||||||
override val ref: ActorRef = self.ref
|
override val ref: ActorRef = self.ref
|
||||||
implicit def selfSender: ActorRef = ref
|
implicit def selfSender: ActorRef = ref
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
|
||||||
|
|
||||||
override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName()
|
override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName()
|
||||||
private[this] val self: GraphStageLogic.StageActor =
|
private[this] val self: GraphStageLogic.StageActor =
|
||||||
getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(receiveRemoteMessage)
|
getEagerStageActor(eagerMaterializer)(receiveRemoteMessage)
|
||||||
override val ref: ActorRef = self.ref
|
override val ref: ActorRef = self.ref
|
||||||
private[this] implicit def selfSender: ActorRef = ref
|
private[this] implicit def selfSender: ActorRef = ref
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -677,8 +677,7 @@ object Source {
|
||||||
* If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immediately.
|
* If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immediately.
|
||||||
* Otherwise, if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else)
|
* Otherwise, if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else)
|
||||||
* already buffered elements will be sent out before signaling completion.
|
* already buffered elements will be sent out before signaling completion.
|
||||||
* Sending [[akka.actor.PoisonPill]] will signal completion immediately but this behavior is deprecated and scheduled to be removed.
|
* Using [[akka.actor.PoisonPill]] or [[akka.actor.ActorSystem.stop]] to stop the actor and complete the stream is *not supported*.
|
||||||
* Using [[akka.actor.ActorSystem.stop]] to stop the actor and complete the stream is *not supported*.
|
|
||||||
*
|
*
|
||||||
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
||||||
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
||||||
|
|
|
||||||
|
|
@ -197,7 +197,6 @@ object GraphStageLogic {
|
||||||
materializer: Materializer,
|
materializer: Materializer,
|
||||||
getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
|
getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
|
||||||
initialReceive: StageActorRef.Receive,
|
initialReceive: StageActorRef.Receive,
|
||||||
poisonPillFallback: Boolean, // internal fallback to support deprecated SourceActorRef implementation replacement
|
|
||||||
name: String) {
|
name: String) {
|
||||||
|
|
||||||
private val callback = getAsyncCallback(internalReceive)
|
private val callback = getAsyncCallback(internalReceive)
|
||||||
|
|
@ -209,8 +208,6 @@ object GraphStageLogic {
|
||||||
}
|
}
|
||||||
private val functionRef: FunctionRef = {
|
private val functionRef: FunctionRef = {
|
||||||
val f: (ActorRef, Any) => Unit = {
|
val f: (ActorRef, Any) => Unit = {
|
||||||
case (r, PoisonPill) if poisonPillFallback =>
|
|
||||||
callback.invoke((r, PoisonPill))
|
|
||||||
case (_, m @ (PoisonPill | Kill)) =>
|
case (_, m @ (PoisonPill | Kill)) =>
|
||||||
materializer.logger.warning(
|
materializer.logger.warning(
|
||||||
"{} message sent to StageActor({}) will be ignored, since it is not a real Actor." +
|
"{} message sent to StageActor({}) will be ignored, since it is not a real Actor." +
|
||||||
|
|
@ -1318,7 +1315,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
* @return minimal actor with watch method
|
* @return minimal actor with watch method
|
||||||
*/
|
*/
|
||||||
final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor =
|
final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor =
|
||||||
getEagerStageActor(interpreter.materializer, poisonPillCompatibility = false)(receive)
|
getEagerStageActor(interpreter.materializer)(receive)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -1327,14 +1324,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
* materialization or one of the methods invoked by the graph operator machinery, such as `onPush` and `onPull`.
|
* materialization or one of the methods invoked by the graph operator machinery, such as `onPush` and `onPull`.
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
protected[akka] def getEagerStageActor(
|
protected[akka] def getEagerStageActor(eagerMaterializer: Materializer)(
|
||||||
eagerMaterializer: Materializer,
|
|
||||||
poisonPillCompatibility: Boolean)( // fallback required for source actor backwards compatibility
|
|
||||||
receive: ((ActorRef, Any)) => Unit): StageActor =
|
receive: ((ActorRef, Any)) => Unit): StageActor =
|
||||||
_stageActor match {
|
_stageActor match {
|
||||||
case null =>
|
case null =>
|
||||||
_stageActor =
|
_stageActor = new StageActor(eagerMaterializer, getAsyncCallback _, receive, stageActorName)
|
||||||
new StageActor(eagerMaterializer, getAsyncCallback _, receive, poisonPillCompatibility, stageActorName)
|
|
||||||
_stageActor
|
_stageActor
|
||||||
case existing =>
|
case existing =>
|
||||||
existing.become(receive)
|
existing.become(receive)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue