diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 5c4efd4fb7..791f8a601d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -4,18 +4,21 @@ package akka.stream.scaladsl -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props } +import akka.actor.Status.Failure import akka.pattern._ import akka.stream._ import akka.stream.impl.streamref.{ SinkRefImpl, SourceRefImpl } import akka.stream.testkit.TestPublisher +import akka.stream.testkit.Utils.TE import akka.stream.testkit.scaladsl._ -import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe } +import akka.testkit.{ AkkaSpec, TestKit, TestProbe } import akka.util.ByteString import com.typesafe.config._ import scala.collection.immutable +import scala.concurrent.Promise import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -23,13 +26,15 @@ import scala.util.control.NoStackTrace object StreamRefsSpec { object DataSourceActor { - def props(probe: ActorRef): Props = - Props(new DataSourceActor(probe)).withDispatcher("akka.test.stream-dispatcher") + def props(): Props = Props(new DataSourceActor()).withDispatcher("akka.test.stream-dispatcher") } - class DataSourceActor(probe: ActorRef) extends Actor with ActorLogging { + case class Command(cmd: String, probe: ActorRef) + + class DataSourceActor() extends Actor with ActorLogging { import context.system + import context.dispatcher def receive = { case "give" => @@ -43,6 +48,26 @@ object StreamRefsSpec { sender() ! ref + case "give-nothing-watch" => + val source: Source[String, NotUsed] = Source.future(Future.never.mapTo[String]) + val (done: Future[Done], ref: SourceRef[String]) = + source.watchTermination()(Keep.right).toMat(StreamRefs.sourceRef())(Keep.both).run() + + sender() ! ref + + import context.dispatcher + done.pipeTo(sender()) + + case "give-only-one-watch" => + val source: Source[String, NotUsed] = Source.single("hello").concat(Source.future(Future.never)) + val (done: Future[Done], ref: SourceRef[String]) = + source.watchTermination()(Keep.right).toMat(StreamRefs.sourceRef())(Keep.both).run() + + sender() ! ref + + import context.dispatcher + done.pipeTo(sender()) + case "give-infinite" => val source: Source[String, NotUsed] = Source.fromIterator(() => Iterator.from(1)).map("ping-" + _) val (_: NotUsed, ref: SourceRef[String]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run() @@ -57,6 +82,12 @@ object StreamRefsSpec { val ref = Source.empty.runWith(StreamRefs.sourceRef()) sender() ! ref + case "give-maybe" => + val ((maybe, termination), sourceRef) = + Source.maybe[String].watchTermination()(Keep.both).toMat(StreamRefs.sourceRef())(Keep.both).run() + sender() ! (maybe -> sourceRef) + termination.pipeTo(sender()) + case "give-subscribe-timeout" => val ref = Source .repeat("is anyone there?") @@ -65,18 +96,7 @@ object StreamRefsSpec { .run() sender() ! ref - // case "send-bulk" => - // /* - // * Here we're able to send a source to a remote recipient - // * The source is a "bulk transfer one, in which we're ready to send a lot of data" - // * - // * For them it's a Source; for us it is a Sink we run data "into" - // */ - // val source: Source[ByteString, NotUsed] = Source.single(ByteString("huge-file-")) - // val ref: SourceRef[ByteString] = source.runWith(SourceRef.bulkTransfer()) - // sender() ! BulkSourceMsg(ref) - - case "receive" => + case Command("receive", probe) => /* * We write out code, knowing that the other side will stream the data into it. * @@ -86,12 +106,31 @@ object StreamRefsSpec { StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "", f => ": " + f.getMessage)).run() sender() ! sink + case Command("receive-one-cancel", probe) => + // will shutdown the stream after the first element using a kill switch + val (sink, done) = + StreamRefs + .sinkRef[String]() + .viaMat(KillSwitches.single)(Keep.both) + .alsoToMat(Sink.head)(Keep.both) + .mapMaterializedValue { + case ((sink, ks), firstF) => + // shutdown the stream after first element + firstF.foreach(_ => ks.shutdown())(context.dispatcher) + sink + } + .watchTermination()(Keep.both) + .to(Sink.actorRef(probe, "", f => ": " + f.getMessage)) + .run() + sender() ! sink + done.pipeTo(sender()) + case "receive-ignore" => val sink = StreamRefs.sinkRef[String]().to(Sink.ignore).run() sender() ! sink - case "receive-subscribe-timeout" => + case Command("receive-subscribe-timeout", probe) => val sink = StreamRefs .sinkRef[String]() .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) @@ -99,7 +138,7 @@ object StreamRefsSpec { .run() sender() ! sink - case "receive-32" => + case Command("receive-32", probe) => val (sink, driver) = StreamRefs.sinkRef[String]().toMat(TestSink.probe(context.system))(Keep.both).run() import context.dispatcher @@ -117,22 +156,7 @@ object StreamRefsSpec { sender() ! sink - // case "receive-bulk" => - // /* - // * We write out code, knowing that the other side will stream the data into it. - // * This will open a dedicated connection per transfer. - // * - // * For them it's a Sink; for us it's a Source. - // */ - // val sink: SinkRef[ByteString] = - // SinkRef.bulkTransferSource() - // .to(Sink.actorRef(probe, "")) - // .run() - // - // - // sender() ! BulkSinkMsg(sink) } - } // ------------------------- @@ -171,7 +195,7 @@ object StreamRefsSpec { } } -class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSender { +class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) { import StreamRefsSpec._ val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config()) @@ -179,55 +203,61 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend override protected def beforeTermination(): Unit = TestKit.shutdownActorSystem(remoteSystem) - val p = TestProbe() - // obtain the remoteActor ref via selection in order to use _real_ remoting in this test val remoteActor = { - val it = remoteSystem.actorOf(DataSourceActor.props(p.ref), "remoteActor") + val probe = TestProbe()(remoteSystem) + val it = remoteSystem.actorOf(DataSourceActor.props(), "remoteActor") val remoteAddress = remoteSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress - system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi") - expectMsgType[ActorIdentity].ref.get + system.actorSelection(it.path.toStringWithAddress(remoteAddress)).tell(Identify("hi"), probe.ref) + probe.expectMsgType[ActorIdentity].ref.get } "A SourceRef" must { "send messages via remoting" in { - remoteActor ! "give" - val sourceRef = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "", _ => "")) + val localProbe = TestProbe() + sourceRef.runWith(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) - p.expectMsg("hello") - p.expectMsg("world") - p.expectMsg("") + localProbe.expectMsg(5.seconds, "hello") + localProbe.expectMsg("world") + localProbe.expectMsg("") } "fail when remote source failed" in { - remoteActor ! "give-fail" - val sourceRef = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-fail", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "", t => ": " + t.getMessage)) + val localProbe = TestProbe() + sourceRef.runWith(Sink.actorRef(localProbe.ref, "", t => ": " + t.getMessage)) - val f = p.expectMsgType[String] + val f = localProbe.expectMsgType[String] f should include("Remote stream (") // actor name here, for easier identification f should include("failed, reason: Booooom!") } "complete properly when remote source is empty" in { + val remoteProbe = TestProbe()(remoteSystem) // this is a special case since it makes sure that the remote stage is still there when we connect to it - remoteActor ! "give-complete-asap" - val sourceRef = expectMsgType[SourceRef[String]] + remoteActor.tell("give-complete-asap", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "", _ => "")) + val localProbe = TestProbe() + sourceRef.runWith(Sink.actorRef(localProbe.ref, "", _ => "")) - p.expectMsg("") + localProbe.expectMsg("") } "respect back-pressure from (implied by target Sink)" in { - remoteActor ! "give-infinite" - val sourceRef = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-infinite", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] val probe = sourceRef.runWith(TestSink.probe) @@ -251,10 +281,12 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend } "receive timeout if subscribing too late to the source ref" in { - remoteActor ! "give-subscribe-timeout" - val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-subscribe-timeout", remoteProbe.ref) + val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]] // not materializing it, awaiting the timeout... + Thread.sleep(800) // the timeout is 500ms val probe = remoteSource.runWith(TestSink.probe[String](system)) @@ -270,8 +302,9 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend // bug #24626 "not receive subscription timeout when got subscribed" in { - remoteActor ! "give-subscribe-timeout" - val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-subscribe-timeout", remoteProbe.ref) + val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]] // materialize directly and start consuming, timeout is 500ms val eventualStrings: Future[immutable.Seq[String]] = remoteSource .throttle(1, 100.millis, 1, ThrottleMode.Shaping) @@ -283,8 +316,9 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend // bug #24934 "not receive timeout while data is being sent" in { - remoteActor ! "give-infinite" - val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-infinite", remoteProbe.ref) + val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]] val done = remoteSource @@ -294,58 +328,168 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend Await.result(done, 8.seconds) } + + "pass cancellation upstream across remoting after elements passed through" in { + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-only-one-watch", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] + + val localProbe = TestProbe() + val ks = + sourceRef + .viaMat(KillSwitches.single)(Keep.right) + .to(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) + .run() + + localProbe.expectMsg("hello") + ks.shutdown() + localProbe.expectMsg("") + remoteProbe.expectMsg(Done) + } + + "pass cancellation upstream across remoting before elements has been emitted" in { + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-nothing-watch", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] + + val localProbe = TestProbe() + val ks = + sourceRef + .viaMat(KillSwitches.single)(Keep.right) + .to(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) + .run() + + ks.shutdown() + localProbe.expectMsg("") + remoteProbe.expectMsg(Done) + } + + "pass failure upstream across remoting before elements has been emitted" in { + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-nothing-watch", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] + + val localProbe = TestProbe() + val ks = + sourceRef + .viaMat(KillSwitches.single)(Keep.right) + .to(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) + .run() + + ks.abort(TE("det gick åt skogen")) + localProbe.expectMsg(" det gick åt skogen") + remoteProbe.expectMsgType[Failure].cause shouldBe a[RemoteStreamRefActorTerminatedException] + } + + "pass failure upstream across remoting after elements passed through" in { + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-only-one-watch", remoteProbe.ref) + val sourceRef = remoteProbe.expectMsgType[SourceRef[String]] + + val localProbe = TestProbe() + val ks = + sourceRef + .viaMat(KillSwitches.single)(Keep.right) + .to(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) + .run() + + localProbe.expectMsg("hello") + ks.abort(TE("det gick åt pipan")) + localProbe.expectMsg(" det gick åt pipan") + remoteProbe.expectMsgType[Failure].cause shouldBe a[RemoteStreamRefActorTerminatedException] + } + + "handle concurrent cancel and failure" in { + // this is not possible to deterministically trigger but what we try to + // do is have a cancel in the SourceRef and a complete on the SinkRef side happen + // concurrently before they have managed to tell each other about it + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("give-maybe", remoteProbe.ref) + // this is somewhat weird, but we are local to the remote system with the remoteProbe so promise + // is not sent across the wire + val (remoteControl, sourceRef) = remoteProbe.expectMsgType[(Promise[Option[String]], SourceRef[String])] + + val localProbe = TestProbe() + val ks = + sourceRef + .viaMat(KillSwitches.single)(Keep.right) + .to(Sink.actorRef(localProbe.ref, "", ex => s" ${ex.getMessage}")) + .run() + + // "concurrently" + ks.shutdown() + remoteControl.success(None) + + // since it is a race we can only confirm that it either completes or fails both sides + // if it didn't work + val localComplete = localProbe.expectMsgType[String] + localComplete should startWith("").or(startWith("")) + val remoteCompleted = remoteProbe.expectMsgType[AnyRef] + remoteCompleted match { + case Done => + case Failure(_) => + case _ => fail() + } + } + } "A SinkRef" must { "receive elements via remoting" in { - - remoteActor ! "receive" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] Source("hello" :: "world" :: Nil).to(remoteSink).run() - p.expectMsg("hello") - p.expectMsg("world") - p.expectMsg("") + elementProbe.expectMsg("hello") + elementProbe.expectMsg("world") + elementProbe.expectMsg("") } "fail origin if remote Sink gets a failure" in { - - remoteActor ! "receive" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] val remoteFailureMessage = "Booom!" Source.failed(new Exception(remoteFailureMessage)).to(remoteSink).run() - val f = p.expectMsgType[String] + val f = elementProbe.expectMsgType[String] f should include(s"Remote stream (") // actor name ere, for easier identification f should include(s"failed, reason: $remoteFailureMessage") } "receive hundreds of elements via remoting" in { - remoteActor ! "receive" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] val msgs = (1 to 100).toList.map(i => s"payload-$i") Source(msgs).runWith(remoteSink) - msgs.foreach(t => p.expectMsg(t)) - p.expectMsg("") + msgs.foreach(t => elementProbe.expectMsg(t)) + elementProbe.expectMsg("") } "receive timeout if subscribing too late to the sink ref" in { - remoteActor ! "receive-subscribe-timeout" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive-subscribe-timeout", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] // not materializing it, awaiting the timeout... Thread.sleep(800) // the timeout is 500ms val probe = TestSource.probe[String](system).to(remoteSink).run() - val failure = p.expectMsgType[String] + val failure = elementProbe.expectMsgType[String] failure should include("Remote side did not subscribe (materialize) handed out Sink reference") // the local "remote sink" should cancel, since it should notice the origin target actor is dead @@ -354,8 +498,10 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend // bug #24626 "not receive timeout if subscribing is already done to the sink ref" in { - remoteActor ! "receive-subscribe-timeout" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive-subscribe-timeout", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] Source .repeat("whatever") .throttle(1, 100.millis) @@ -363,15 +509,16 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend .runWith(remoteSink) (0 to 9).foreach { _ => - p.expectMsg("whatever") + elementProbe.expectMsg("whatever") } - p.expectMsg("") + elementProbe.expectMsg("") } // bug #24934 "not receive timeout while data is being sent" in { - remoteActor ! "receive-ignore" - val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + remoteActor.tell("receive-ignore", remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] val done = Source @@ -386,18 +533,37 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend } "respect back -pressure from (implied by origin Sink)" in { - remoteActor ! "receive-32" - val sinkRef = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive-32", elementProbe.ref), remoteProbe.ref) + val sinkRef = remoteProbe.expectMsgType[SinkRef[String]] Source.repeat("hello").runWith(sinkRef) // if we get this message, it means no checks in the request/expect semantics were broken, good! - p.expectMsg("") + elementProbe.expectMsg("") + } + + "trigger local shutdown on remote shutdown" in { + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive-one-cancel", elementProbe.ref), remoteProbe.ref) + val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]] + + val done = + Source.single("hello").concat(Source.future(Future.never)).watchTermination()(Keep.right).to(remoteSink).run() + + elementProbe.expectMsg("hello") + elementProbe.expectMsg("") + remoteProbe.expectMsg(Done) + Await.result(done, 5.seconds) shouldBe Done } "not allow materializing multiple times" in { - remoteActor ! "receive" - val sinkRef = expectMsgType[SinkRef[String]] + val remoteProbe = TestProbe()(remoteSystem) + val elementProbe = TestProbe()(remoteSystem) + remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref) + val sinkRef = remoteProbe.expectMsgType[SinkRef[String]] val p1: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run() val p2: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run() diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 0efc456147..45fa08d359 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -95,7 +95,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn private var completedBeforeRemoteConnected: OptionVal[Try[Done]] = OptionVal.None - // Some when this side of the stream has completed/failed, and we await the Terminated() signal back from the partner + // When this side of the stream has completed/failed, and we await the Terminated() signal back from the partner // so we can safely shut down completely; This is to avoid *our* Terminated() signal to reach the partner before the // Complete/Fail message does, which can happen on transports such as Artery which use a dedicated lane for system messages (Terminated) private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None @@ -103,6 +103,11 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn override def preStart(): Unit = { initialPartnerRef match { case OptionVal.Some(ref) => + log.debug( + "[{}] Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", + stageActorName, + initialPartnerRef, + self.ref) // this will set the `partnerRef` observeAndValidateSender( ref, @@ -110,19 +115,26 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn "usage and complete stack trace on the issue tracker: https://github.com/akka/akka") tryPull() case OptionVal.None => + log.debug( + "[{}] Created SinkRef with initial partner, local worker: {}, subscription timeout: {}", + stageActorName, + self.ref, + PrettyDuration.format(subscriptionTimeout.timeout)) // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor // has not been provided with a valid initialPartnerRef) scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) } - log.debug( - "Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", - initialPartnerRef, - self.ref) } def initialReceive: ((ActorRef, Any)) => Unit = { case (_, Terminated(ref)) => + log.debug( + "[{}] remote terminated [{}], partnerRef: [{}], finishedWithAwaitingPartnerTermination: [{}]", + stageActorName, + ref, + partnerRef, + finishedWithAwaitingPartnerTermination) if (ref == getPartnerRef) finishedWithAwaitingPartnerTermination match { case OptionVal.Some(Failure(ex)) => @@ -143,20 +155,42 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn if (remoteCumulativeDemandReceived < d) { remoteCumulativeDemandReceived = d log.debug( - "Received cumulative demand [{}], consumable demand: [{}]", + "[{}] Received cumulative demand [{}], consumable demand: [{}]", + stageActorName, StreamRefsProtocol.CumulativeDemand(d), remoteCumulativeDemandReceived - remoteCumulativeDemandConsumed) } tryPull() - case (_, _) => // keep the compiler happy (stage actor receive is total) + case (sender, StreamRefsProtocol.RemoteStreamCompleted(_)) => + // unless we already sent a completed/failed downstream and are awaiting Terminated as ack for that + if (finishedWithAwaitingPartnerTermination.isEmpty) { + log.debug("[{}] Remote downstream cancelled", stageActorName) + self.unwatch(sender) + // remote only sent this after unwatching so cancelling is ok + cancelStage(SubscriptionWithCancelException.NoMoreElementsNeeded) + sender ! StreamRefsProtocol.Ack + } + + case (sender, StreamRefsProtocol.RemoteStreamFailure(msg)) => + // unless we already sent a completed/failed downstream and are awaiting Terminated as ack for that + if (finishedWithAwaitingPartnerTermination.isEmpty) { + log.debug("[{}] Remote downstream failed: {}", stageActorName, msg) + self.unwatch(sender) + // remote only sent this after unwatching so cancelling is ok + cancelStage(RemoteStreamRefActorTerminatedException(s"Remote downstream failed: $msg")) + sender ! StreamRefsProtocol.Ack + } + + case (sender, msg) => // keep the compiler happy (stage actor receive is total) + log.debug("[{}] Unexpected message {} from {}", stageActorName, msg, sender) } override def onPush(): Unit = { val elem = grabSequenced(in) getPartnerRef ! elem - log.debug("Sending sequenced: {} to {}", elem, getPartnerRef) + log.debug("[{}] Sending sequenced: {} to {}", stageActorName, elem, getPartnerRef) tryPull() } @@ -167,6 +201,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn override protected def onTimer(timerKey: Any): Unit = timerKey match { case SubscriptionTimeoutTimerKey => + log.debug("[{}] Subscription timed out", stageActorName) val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [$ref], " + @@ -182,6 +217,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn } override def onUpstreamFailure(ex: Throwable): Unit = { + log.debug("[{}] Upstream failure, partnerRef [{}]", stageActorName, partnerRef) partnerRef match { case OptionVal.Some(ref) => ref ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) @@ -196,7 +232,8 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn } } - override def onUpstreamFinish(): Unit = + override def onUpstreamFinish(): Unit = { + log.debug("[{}] Upstream finish, partnerRef [{}]", stageActorName, partnerRef) partnerRef match { case OptionVal.Some(ref) => ref ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed) @@ -207,6 +244,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn // not terminating on purpose, since other side may subscribe still and then we want to complete it setKeepGoing(true) } + } @throws[InvalidPartnerActorException] def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = { @@ -219,14 +257,15 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn completedBeforeRemoteConnected match { case OptionVal.Some(scala.util.Failure(ex)) => log.warning( - "Stream already terminated with exception before remote side materialized, sending failure: {}", + "[{}] Stream already terminated with exception before remote side materialized, sending failure: {}", + stageActorName, ex) partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex)) setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message) case OptionVal.Some(scala.util.Success(Done)) => - log.warning("Stream already completed before remote side materialized, failing now.") + log.warning("[{}] Stream already completed before remote side materialized, failing now.", stageActorName) partner ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed) finishedWithAwaitingPartnerTermination = OptionVal(Success(Done)) setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 137ebe13f6..a6cb2ab795 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -64,6 +64,29 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e highWatermark - remainingRequested else 0 } + + private sealed trait State + private sealed trait WeKnowPartner extends State { + def partner: ActorRef + } + + // we are the "origin", and awaiting the other side to start when we'll receive this ref + private case object AwaitingPartner extends State + // we're the "remote" for an already active Source on the other side (the "origin") + private case class AwaitingSubscription(partner: ActorRef) extends WeKnowPartner + // subscription aquired and up and running + private final case class Running(partner: ActorRef) extends WeKnowPartner + + // downstream cancelled or failed, waiting for remote upstream to ack + private final case class WaitingForCancelAck(partner: ActorRef, cause: Throwable) extends WeKnowPartner + // upstream completed, we are waiting to allow + private final case class UpstreamCompleted(partner: ActorRef) extends WeKnowPartner + private final case class UpstreamTerminated(partner: ActorRef) extends State + + val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey" + val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey" + val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey" + val CancellationDeadlineTimerKey = "CancellationDeadlineTimerKey" } /** @@ -75,8 +98,7 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e @InternalApi private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef]) extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage => - import SourceRefStageImpl.ActorRefStage - import SourceRefStageImpl.WatermarkRequestStrategy + import SourceRefStageImpl._ val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out") override def shape = SourceShape.of(out) @@ -128,16 +150,20 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName() private[this] val self: GraphStageLogic.StageActor = - getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive) + getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(receiveRemoteMessage) override val ref: ActorRef = self.ref private[this] implicit def selfSender: ActorRef = ref - val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey" - val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey" - val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey" - // demand management --- - private var completed = false + private var state: State = initialPartnerRef match { + case OptionVal.Some(ref) => + // this means we're the "remote" for an already active Source on the other side (the "origin") + self.watch(ref) + AwaitingSubscription(ref) + case OptionVal.None => + // we are the "origin", and awaiting the other side to start when we'll receive their partherRef + AwaitingPartner + } private var expectingSeqNr: Long = 0L private var localCumulativeDemand: Long = 0L @@ -145,21 +171,16 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity) - private val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy( - highWatermark = receiveBuffer.capacity) + private val requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) // end of demand management --- - // initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin") - // null otherwise, in which case we allocated first -- we are the "origin", and awaiting the other side to start when we'll receive this ref - private var partnerRef: OptionVal[ActorRef] = OptionVal.None - private def getPartnerRef = partnerRef.get - override def preStart(): Unit = { - log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref) - if (initialPartnerRef.isDefined) // this will set the partnerRef - observeAndValidateSender( - initialPartnerRef.get, - "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.") + log.debug( + "[{}] Starting up with, self ref: {}, state: {}, subscription timeout: {}", + stageActorName, + self.ref, + state, + PrettyDuration.format(subscriptionTimeout.timeout)) // This timer will be cancelled if we receive the handshake from the remote SinkRef // either created in this method and provided as self.ref as initialPartnerRef @@ -172,103 +193,312 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio triggerCumulativeDemand() } - def triggerCumulativeDemand(): Unit = { - val i = receiveBuffer.remainingCapacity - localRemainingRequested - if (partnerRef.isDefined && i > 0) { - val addDemand = requestStrategy.requestDemand(receiveBuffer.used + localRemainingRequested) - // only if demand has increased we shoot it right away - // otherwise it's the same demand level, so it'd be triggered via redelivery anyway - if (addDemand > 0) { - localCumulativeDemand += addDemand - localRemainingRequested += addDemand - val demand = StreamRefsProtocol.CumulativeDemand(localCumulativeDemand) - - log.debug("[{}] Demanding until [{}] (+{})", stageActorName, localCumulativeDemand, addDemand) - getPartnerRef ! demand - scheduleDemandRedelivery() - } - } - } - - def scheduleDemandRedelivery(): Unit = - scheduleOnce(DemandRedeliveryTimerKey, demandRedeliveryInterval) - - override protected def onTimer(timerKey: Any): Unit = timerKey match { - case SubscriptionTimeoutTimerKey => - val ex = StreamRefSubscriptionTimeoutException( - // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," + - s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") - - throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" - - case DemandRedeliveryTimerKey => - log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand) - getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand) - scheduleDemandRedelivery() - - case TerminationDeadlineTimerKey => - failStage(RemoteStreamRefActorTerminatedException( - s"Remote partner [$partnerRef] has terminated unexpectedly and no clean completion/failure message was received " + - "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down.")) - } - - def initialReceive: ((ActorRef, Any)) => Unit = { + def receiveRemoteMessage: ((ActorRef, Any)) => Unit = { case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) => - cancelTimer(SubscriptionTimeoutTimerKey) - observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext") - log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender) + state match { + case AwaitingPartner => + cancelTimer(SubscriptionTimeoutTimerKey) + log.debug( + "[{}] Received on subscribe handshake {} while awaiting partner from {}", + stageActorName, + msg, + remoteRef) + state = Running(remoteRef) + self.watch(remoteRef) + triggerCumulativeDemand() + case AwaitingSubscription(partner) => + verifyPartner(sender, partner) + cancelTimer(SubscriptionTimeoutTimerKey) + log.debug( + "[{}] Received on subscribe handshake {} while awaiting subscription from {}", + stageActorName, + msg, + remoteRef) + state = Running(remoteRef) + triggerCumulativeDemand() - triggerCumulativeDemand() + case other => + throw new IllegalStateException(s"[$stageActorName] Got unexpected $msg in state $other") + } case (sender, msg @ StreamRefsProtocol.SequencedOnNext(seqNr, payload: Out @unchecked)) => - observeAndValidateSender(sender, "Illegal sender in SequencedOnNext") observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in SequencedOnNext") - log.debug("[{}] Received seq {} from {}", stageActorName, msg, sender) + state match { + case AwaitingSubscription(partner) => + verifyPartner(sender, partner) - onReceiveElement(payload) - triggerCumulativeDemand() + log.debug("[{}] Received seq {} from {}", stageActorName, msg, sender) + state = Running(partner) + onReceiveElement(payload) + triggerCumulativeDemand() + + case Running(partner) => + verifyPartner(sender, partner) + onReceiveElement(payload) + triggerCumulativeDemand() + + case AwaitingPartner => + throw new IllegalStateException(s"[$stageActorName] Got $msg from $sender while AwaitingPartner") + + case WaitingForCancelAck(partner, _) => + // awaiting cancellation ack from remote + verifyPartner(sender, partner) + log.warning( + "[{}] Got element from remote but downstream cancelled, dropping element of type {}", + stageActorName, + payload.getClass) + + case UpstreamCompleted(partner) => + verifyPartner(sender, partner) + throw new IllegalStateException( + s"[$stageActorName] Got completion and then received more elements from $sender, this is not supposed to happen.") + + case UpstreamTerminated(partner) => + verifyPartner(sender, partner) + log.debug("[{}] Received element after partner terminated") + onReceiveElement(payload) + + } case (sender, StreamRefsProtocol.RemoteStreamCompleted(seqNr)) => - observeAndValidateSender(sender, "Illegal sender in RemoteSinkCompleted") observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in RemoteSinkCompleted") - log.debug("[{}] The remote stream has completed, completing as well...", stageActorName) - - self.unwatch(sender) - completed = true - tryPush() + state match { + case Running(partner) => + // upstream completed, continue running until we have emitted every element in buffer + // or downstream cancels + verifyPartner(sender, partner) + log.debug( + "[{}] The remote stream has completed, emitting {} elements left in buffer before completing", + stageActorName, + receiveBuffer.used) + self.unwatch(sender) + state = UpstreamCompleted(partner) + tryPush() + case WaitingForCancelAck(_, _) => + // upstream completed while we were waiting for it to receive cancellation and ack + // upstream may stop without seeing cancellation, but we may not see termination + // let the cancel timeout hit + log.debug("[{}] Upstream completed while waiting for cancel ack", stageActorName) + case other => + // UpstreamCompleted, AwaitingPartner or AwaitingSubscription(_) all means a bug here + throw new IllegalStateException( + s"[$stageActorName] Saw RemoteStreamCompleted($seqNr) while in state $other, should never happen") + } case (sender, StreamRefsProtocol.RemoteStreamFailure(reason)) => - observeAndValidateSender(sender, "Illegal sender in RemoteSinkFailure") - log.warning("[{}] The remote stream has failed, failing (reason: {})", stageActorName, reason) + state match { + case weKnoPartner: WeKnowPartner => + val partner = weKnoPartner.partner + verifyPartner(sender, partner) + log.debug("[{}] The remote stream has failed, failing (reason: {})", stageActorName, reason) + failStage( + RemoteStreamRefActorTerminatedException( + s"[$stageActorName] Remote stream (${sender.path}) failed, reason: $reason")) + case other => + throw new IllegalStateException( + s"[$stageActorName] got RemoteStreamFailure($reason) when in state $other, should never happen") + } - self.unwatch(sender) - failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason")) + case (sender, StreamRefsProtocol.Ack) => + state match { + case WaitingForCancelAck(partner, cause) => + verifyPartner(sender, partner) + log.debug(s"[$stageActorName] Got cancellation ack from remote, canceling", stageActorName) + cancelStage(cause) + case other => + throw new IllegalStateException(s"[$stageActorName] Got an Ack when in state $other") + } case (_, Terminated(p)) => - partnerRef match { - case OptionVal.Some(`p`) => + state match { + case weKnowPartner: WeKnowPartner => + if (weKnowPartner.partner != p) + throw RemoteStreamRefActorTerminatedException( + s"[$stageActorName] Received UNEXPECTED Terminated($p) message! " + + s"This actor was NOT our trusted remote partner, which was: ${weKnowPartner.partner}. Tearing down.") // we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail // will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref // the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there. scheduleOnce(TerminationDeadlineTimerKey, finalTerminationSignalDeadline) + log.debug( + "[{}] Partner terminated, starting delayed shutdown, deadline: [{}]", + stageActorName, + finalTerminationSignalDeadline) + state = UpstreamTerminated(weKnowPartner.partner) + case weDontKnowPartner => + throw new IllegalStateException( + s"[$stageActorName] Unexpected deathwatch message for $p before we knew partner ref, state $weDontKnowPartner") - case _ => - // this should not have happened! It should be impossible that we watched some other actor - failStage( - RemoteStreamRefActorTerminatedException( - s"Received UNEXPECTED Terminated($p) message! " + - s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down.")) } - - case (_, _) => // keep the compiler happy (stage actor receive is total) + case (sender, msg) => + // should never happen but keep the compiler happy (stage actor receive is total) + throw new IllegalStateException(s"[$stageActorName] Unexpected message in state $state: $msg from $sender") } - def tryPush(): Unit = + override protected def onTimer(timerKey: Any): Unit = timerKey match { + case SubscriptionTimeoutTimerKey => + state match { + case AwaitingPartner | AwaitingSubscription(_) => + val ex = StreamRefSubscriptionTimeoutException( + // we know the future has been competed by now, since it is in preStart + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," + + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") + + throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" + case other => + // this is fine + log.debug("[{}] Ignoring subscription timeout in state [{}]", stageActorName, other) + } + + case DemandRedeliveryTimerKey => + state match { + case Running(ref) => + log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand) + ref ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand) + scheduleDemandRedelivery() + + case other => + log.debug("[{}] Ignoring demand redelivery timeout in state [{}]", stageActorName, other) + } + + case TerminationDeadlineTimerKey => + state match { + case UpstreamTerminated(partner) => + log.debug( + "[{}] Remote partner [{}] has terminated unexpectedly and no clean completion/failure message was received", + stageActorName, + partner) + failStage(RemoteStreamRefActorTerminatedException( + s"[$stageActorName] Remote partner [$partner] has terminated unexpectedly and no clean completion/failure message was received " + + "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down.")) + + case AwaitingPartner => + log.debug("[{}] Downstream cancelled, but timeout hit before we saw a partner", stageActorName) + cancelStage(SubscriptionWithCancelException.NoMoreElementsNeeded) + + case other => + throw new IllegalStateException(s"TerminationDeadlineTimerKey can't happen in state $other") + } + + case CancellationDeadlineTimerKey => + state match { + case WaitingForCancelAck(partner, cause) => + log.debug( + "[{}] Waiting for remote ack from [{}] for downstream failure timed out, failing stage with original downstream failure", + stageActorName, + partner) + cancelStage(cause) + + case other => + throw new IllegalStateException( + s"[$stageActorName] CancellationDeadlineTimerKey can't happen in state $other") + } + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + state match { + case Running(ref) => + triggerCancellationExchange(ref, cause) + + case AwaitingPartner => + // we can't do a graceful cancellation dance in this case, wait for partner and then cancel + // or timeout if we never get a partner + scheduleOnce(TerminationDeadlineTimerKey, finalTerminationSignalDeadline) + + case AwaitingSubscription(ref) => + // we didn't get an a first demand yet but have access to the partner - try a cancellation dance + triggerCancellationExchange(ref, cause) + + case UpstreamCompleted(_) => + // we saw upstream complete so let's just complete + if (receiveBuffer.nonEmpty) + log.debug( + "[{}] Downstream cancelled with elements [{}] in buffer, dropping elements", + stageActorName, + receiveBuffer.used) + cause match { + case _: SubscriptionWithCancelException => completeStage() + case failure => failStage(failure) + } + + case WaitingForCancelAck(_, _) => + // downstream can't finish twice + throw new UnsupportedOperationException( + s"[$stageActorName] Didn't expect state $state when downstream finished with $cause") + + case UpstreamTerminated(_) => + log.debug("[{}] Downstream cancelled with elements [{}] in buffer", stageActorName, receiveBuffer.used) + if (receiveBuffer.isEmpty) + failStage(RemoteStreamRefActorTerminatedException(s"[$stageActorName] unexpectedly terminated")) + else + // if there are elements left in the buffer we try to emit those + tryPush() + } + } + + private def triggerCancellationExchange(partner: ActorRef, cause: Throwable): Unit = { + if (receiveBuffer.nonEmpty) + log.debug("Downstream cancelled with elements [{}] in buffer, dropping elements", receiveBuffer.used) + val message = cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + log.debug("[{}] Deferred stop on downstream cancel", stageActorName) + StreamRefsProtocol.RemoteStreamCompleted(expectingSeqNr) // seNr not really used in this case + + case streamFailure => + log.debug("[{}] Deferred stop on downstream failure: {}", stageActorName, streamFailure) + StreamRefsProtocol.RemoteStreamFailure("Downstream failed") + } + // sending the cancellation means it is ok for the partner to terminate + // we either get a response or hit a timeout and shutdown + self.unwatch(partner) + partner ! message + state = WaitingForCancelAck(partner, cause) + scheduleOnce(CancellationDeadlineTimerKey, subscriptionTimeout.timeout) + setKeepGoing(true) + } + + def triggerCumulativeDemand(): Unit = { + val i = receiveBuffer.remainingCapacity - localRemainingRequested + if (i > 0) { + val addDemand = requestStrategy.requestDemand(receiveBuffer.used + localRemainingRequested) + // only if demand has increased we shoot it right away + // otherwise it's the same demand level, so it'd be triggered via redelivery anyway + if (addDemand > 0) { + def sendDemand(partner: ActorRef): Unit = { + localCumulativeDemand += addDemand + localRemainingRequested += addDemand + val demand = StreamRefsProtocol.CumulativeDemand(localCumulativeDemand) + partner ! demand + scheduleDemandRedelivery() + } + state match { + case Running(partner) => + log.debug("[{}] Demanding until [{}] (+{})", stageActorName, localCumulativeDemand, addDemand) + sendDemand(partner) + + case AwaitingSubscription(partner) => + log.debug( + "[{}] Demanding, before subscription seen, until [{}] (+{})", + stageActorName, + localCumulativeDemand, + addDemand) + sendDemand(partner) + case other => + log.debug("[{}] Partner ref not set up in state {}, demanding elements deferred", stageActorName, other) + } + } + } + } + + private def tryPush(): Unit = if (receiveBuffer.nonEmpty && isAvailable(out)) { val element = receiveBuffer.dequeue() push(out, element) - } else if (receiveBuffer.isEmpty && completed) completeStage() + } else if (receiveBuffer.isEmpty) + state match { + case UpstreamCompleted(_) => completeStage() + case _ => // all other are ok + } private def onReceiveElement(payload: Out): Unit = { localRemainingRequested -= 1 @@ -284,32 +514,30 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio } } - /** @throws InvalidPartnerActorException when partner ref is invalid */ - def observeAndValidateSender(partner: ActorRef, msg: String): Unit = - partnerRef match { - case OptionVal.None => - log.debug("Received first message from {}, assuming it to be the remote partner for this stage", partner) - partnerRef = OptionVal(partner) - self.watch(partner) - - case OptionVal.Some(p) => - if (partner != p) { - val ex = InvalidPartnerActorException(partner, getPartnerRef, msg) - partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) - throw ex - } // else, ref is valid and we don't need to do anything with it - } + private def verifyPartner(sender: ActorRef, partner: ActorRef): Unit = { + if (sender != partner) + throw InvalidPartnerActorException( + partner, + sender, + s"[$stageActorName] Received message from UNEXPECTED sender [$sender]! " + + s"This actor is NOT our trusted remote partner, which is [$partner]. Tearing down.") + } /** @throws InvalidSequenceNumberException when sequence number is invalid */ - def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit = + private def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit = if (isInvalidSequenceNr(seqNr)) { + log.warning("[{}] {}, expected {} but was {}", stageActorName, msg, expectingSeqNr, seqNr) throw InvalidSequenceNumberException(expectingSeqNr, seqNr, msg) } else { expectingSeqNr += 1 } - def isInvalidSequenceNr(seqNr: Long): Boolean = + + private def isInvalidSequenceNr(seqNr: Long): Boolean = seqNr != expectingSeqNr + private def scheduleDemandRedelivery(): Unit = + scheduleOnce(DemandRedeliveryTimerKey, demandRedeliveryInterval) + setHandler(out, this) } (logic, SinkRefImpl(logic.ref)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsProtocol.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsProtocol.scala index c9e3af12b4..2a89a21caa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsProtocol.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsProtocol.scala @@ -39,13 +39,19 @@ private[akka] object StreamRefsProtocol { with DeadLetterSuppression /** - * INTERNAL API: Sent to a the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a Failure. + * INTERNAL API + * + * Sent to a the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a Failure. + * Sent to the sender of a stream if receiver downstream failed. */ @InternalApi private[akka] final case class RemoteStreamFailure(msg: String) extends StreamRefsProtocol /** - * INTERNAL API: Sent to a the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a completion. + * INTERNAL API + * + * Sent to a the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a completion. + * Sent to the sender of a stream ref if receiver downstream cancelled. */ @InternalApi private[akka] final case class RemoteStreamCompleted(seqNr: Long) extends StreamRefsProtocol @@ -60,4 +66,12 @@ private[akka] object StreamRefsProtocol { if (seqNr <= 0) throw ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException } + /** + * INTERNAL API + * + * Ack that failure or completion has been seen and the remote side can stop + */ + @InternalApi + private[akka] final case object Ack extends StreamRefsProtocol with DeadLetterSuppression + } diff --git a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala index a9f55b104f..5e21921451 100644 --- a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala +++ b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala @@ -26,6 +26,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) private[this] val SourceRefManifest = "E" private[this] val SinkRefManifest = "F" private[this] val OnSubscribeHandshakeManifest = "G" + private[this] val AckManifest = "H" override def manifest(o: AnyRef): String = o match { // protocol @@ -41,6 +42,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) // case _: MaterializedSourceRef[_] => SourceRefManifest case _: SinkRefImpl[_] => SinkRefManifest // case _: MaterializedSinkRef[_] => SinkRefManifest + case StreamRefsProtocol.Ack => AckManifest } override def toBinary(o: AnyRef): Array[Byte] = o match { @@ -57,6 +59,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) // case ref: MaterializedSinkRef[_] => ??? // serializeSinkRef(ref).toByteArray case ref: SourceRefImpl[_] => serializeSourceRef(ref).toByteArray // case ref: MaterializedSourceRef[_] => serializeSourceRef(ref.).toByteArray + case StreamRefsProtocol.Ack => Array.emptyByteArray } override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { @@ -69,6 +72,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) // refs case SinkRefManifest => deserializeSinkRef(bytes) case SourceRefManifest => deserializeSourceRef(bytes) + case AckManifest => StreamRefsProtocol.Ack } // -----