diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index c4406613a7..59a9ddba37 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -4,13 +4,14 @@ package akka.stream.scaladsl +import akka.actor.{ Actor, ActorRef, Props } import akka.stream.ActorMaterializer import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl._ -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props +import akka.testkit.TestProbe + +import scala.util.control.NoStackTrace object ActorRefSinkSpec { case class Fw(ref: ActorRef) extends Actor { @@ -18,6 +19,8 @@ object ActorRefSinkSpec { case msg => ref.forward(msg) } } + + val te = new RuntimeException("oh dear") with NoStackTrace } class ActorRefSinkSpec extends StreamSpec { @@ -44,6 +47,12 @@ class ActorRefSinkSpec extends StreamSpec { publisher.expectCancellation() } + "sends error message if upstream fails" in assertAllStagesStopped { + val actorProbe = TestProbe() + val probe = TestSource.probe[String].to(Sink.actorRef(actorProbe.ref, "complete", _ => "failure")).run() + probe.sendError(te) + actorProbe.expectMsg("failure") + } } } diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 786ca79836..3b83b4a85e 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -105,6 +105,10 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSour ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSink") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor") + # #25045 adding Java/Scala interop to SourceQueue and SinkQueue ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala deleted file mode 100644 index c6fdd6989e..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package akka.stream.impl - -import akka.stream.actor.ActorSubscriber -import akka.actor.ActorRef -import akka.stream.actor.ActorSubscriberMessage -import akka.stream.actor.WatermarkRequestStrategy -import akka.actor.Props -import akka.actor.Terminated -import akka.annotation.InternalApi -import com.github.ghik.silencer.silent - -/** - * INTERNAL API - */ -@InternalApi private[akka] object ActorRefSinkActor { - def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable => Any): Props = - Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage, onFailureMessage)) -} - -/** - * INTERNAL API - */ -@silent -@InternalApi private[akka] class ActorRefSinkActor( - ref: ActorRef, - highWatermark: Int, - onCompleteMessage: Any, - onFailureMessage: Throwable => Any) - extends ActorSubscriber { - import ActorSubscriberMessage._ - - override val requestStrategy = WatermarkRequestStrategy(highWatermark) - - context.watch(ref) - - def receive = { - case OnNext(elem) => - ref.tell(elem, ActorRef.noSender) - case OnError(cause) => - ref.tell(onFailureMessage(cause), ActorRef.noSender) - context.stop(self) - case OnComplete => - ref.tell(onCompleteMessage, ActorRef.noSender) - context.stop(self) - case Terminated(`ref`) => - context.stop(self) // will cancel upstream - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala new file mode 100644 index 0000000000..7502f4b952 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.{ ActorRef, Terminated } +import akka.annotation.InternalApi +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.{ AbruptStageTerminationException, Attributes, Inlet, SinkShape } +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, StageLogging } + +/** + * INTERNAL API + */ +@InternalApi +final private[akka] class ActorRefSinkStage[T]( + ref: ActorRef, + onCompleteMessage: Any, + onFailureMessage: Throwable => Any) + extends GraphStage[SinkShape[T]] { + + val in: Inlet[T] = Inlet("ActorRefSink.in") + + override def shape: SinkShape[T] = SinkShape(in) + + override protected def initialAttributes: Attributes = DefaultAttributes.actorRefSink + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with StageLogging { + + var completionSignalled = false + + override def preStart(): Unit = { + getStageActor({ + case (_, Terminated(`ref`)) => + completeStage() + case msg => + log.error("Unexpected message to stage actor {}", msg.getClass) + }).watch(ref) + pull(in) + } + + override def onPush(): Unit = { + val next = grab(in) + ref.tell(next, ActorRef.noSender) + pull(in) + } + + override def onUpstreamFinish(): Unit = { + completionSignalled = true + ref.tell(onCompleteMessage, ActorRef.noSender) + completeStage() + } + + setHandler(in, this) + + override def onUpstreamFailure(ex: Throwable): Unit = { + completionSignalled = true + ref.tell(onFailureMessage(ex), ActorRef.noSender) + failStage(ex) + } + + override def postStop(): Unit = { + if (!completionSignalled) { + ref ! onFailureMessage(new AbruptStageTerminationException(this)) + } + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index d85250d1f7..fdca1434f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -184,32 +184,6 @@ import org.reactivestreams.Subscriber new ActorSubscriberSink[In](props, attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class ActorRefSink[In]( - ref: ActorRef, - onCompleteMessage: Any, - onFailureMessage: Throwable => Any, - val attributes: Attributes, - shape: SinkShape[In]) - extends SinkModule[In, NotUsed](shape) { - - override def create(context: MaterializationContext) = { - val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) - val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max - val subscriberRef = actorMaterializer.actorOf( - context, - ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage)) - (akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed) - } - - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape) - override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr)) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index ad9cf40474..500c473e4e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -440,8 +440,7 @@ object Sink { ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable => Any): Sink[T, NotUsed] = - fromGraph( - new ActorRefSink(ref, onCompleteMessage, onFailureMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, onFailureMessage)) /** * Sends the elements of the stream to the given `ActorRef`. @@ -459,13 +458,7 @@ object Sink { * limiting operator in front of this `Sink`. */ def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = - fromGraph( - new ActorRefSink( - ref, - onCompleteMessage, - t => Status.Failure(t), - DefaultAttributes.actorRefSink, - shape("ActorRefSink"))) + fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, t => Status.Failure(t))) /** * INTERNAL API