Support cancellation propagation in StreamRefs #28317

Author: Johan Andrén
Date: 2020-01-20 10:28:26 +01:00 (committed via GitHub)
parent 41ef4bb66e
commit fc0c98e17a
5 changed files with 663 additions and 212 deletions
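
Editorial summary: before this change, cancelling or failing the local end of a stream ref did not reach the remote origin, which could leave the origin stream running forever. The commit adds a cancellation/failure handshake (see the new `Ack` protocol message below) plus tests. A minimal sketch of what now works, modeled on the new "give-only-one-watch" test; `sourceRef` is assumed to have been obtained from the remote system:

// Sketch only, mirroring the new "pass cancellation upstream" tests below.
import akka.actor.ActorSystem
import akka.stream.{ KillSwitches, SourceRef }
import akka.stream.scaladsl.{ Keep, Sink }

def consumeThenCancel(sourceRef: SourceRef[String])(implicit system: ActorSystem): Unit = {
  val killSwitch = sourceRef
    .viaMat(KillSwitches.single)(Keep.right)
    .to(Sink.foreach(println))
    .run()
  // Shutting down locally now reaches the origin: a source materialized there
  // with watchTermination sees its Future[Done] complete instead of running forever.
  killSwitch.shutdown()
}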

File: StreamRefsSpec.scala

@@ -4,18 +4,21 @@
 package akka.stream.scaladsl
-import akka.NotUsed
+import akka.{ Done, NotUsed }
 import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props }
+import akka.actor.Status.Failure
 import akka.pattern._
 import akka.stream._
 import akka.stream.impl.streamref.{ SinkRefImpl, SourceRefImpl }
 import akka.stream.testkit.TestPublisher
+import akka.stream.testkit.Utils.TE
 import akka.stream.testkit.scaladsl._
-import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe }
+import akka.testkit.{ AkkaSpec, TestKit, TestProbe }
 import akka.util.ByteString
 import com.typesafe.config._
 import scala.collection.immutable
+import scala.concurrent.Promise
 import scala.concurrent.{ Await, Future }
 import scala.concurrent.duration._
 import scala.util.control.NoStackTrace
@@ -23,13 +26,15 @@ import scala.util.control.NoStackTrace
 object StreamRefsSpec {
   object DataSourceActor {
-    def props(probe: ActorRef): Props =
-      Props(new DataSourceActor(probe)).withDispatcher("akka.test.stream-dispatcher")
+    def props(): Props = Props(new DataSourceActor()).withDispatcher("akka.test.stream-dispatcher")
   }
-  class DataSourceActor(probe: ActorRef) extends Actor with ActorLogging {
+  case class Command(cmd: String, probe: ActorRef)
+
+  class DataSourceActor() extends Actor with ActorLogging {
     import context.system
-    import context.dispatcher
     def receive = {
       case "give" =>
@@ -43,6 +48,26 @@ object StreamRefsSpec {
         sender() ! ref
+      case "give-nothing-watch" =>
+        val source: Source[String, NotUsed] = Source.future(Future.never.mapTo[String])
+        val (done: Future[Done], ref: SourceRef[String]) =
+          source.watchTermination()(Keep.right).toMat(StreamRefs.sourceRef())(Keep.both).run()
+        sender() ! ref
+        import context.dispatcher
+        done.pipeTo(sender())
+
+      case "give-only-one-watch" =>
+        val source: Source[String, NotUsed] = Source.single("hello").concat(Source.future(Future.never))
+        val (done: Future[Done], ref: SourceRef[String]) =
+          source.watchTermination()(Keep.right).toMat(StreamRefs.sourceRef())(Keep.both).run()
+        sender() ! ref
+        import context.dispatcher
+        done.pipeTo(sender())
+
       case "give-infinite" =>
         val source: Source[String, NotUsed] = Source.fromIterator(() => Iterator.from(1)).map("ping-" + _)
         val (_: NotUsed, ref: SourceRef[String]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run()
@@ -57,6 +82,12 @@ object StreamRefsSpec {
         val ref = Source.empty.runWith(StreamRefs.sourceRef())
         sender() ! ref
+      case "give-maybe" =>
+        val ((maybe, termination), sourceRef) =
+          Source.maybe[String].watchTermination()(Keep.both).toMat(StreamRefs.sourceRef())(Keep.both).run()
+        sender() ! (maybe -> sourceRef)
+        import context.dispatcher // pipeTo needs an implicit ExecutionContext
+        termination.pipeTo(sender())
       case "give-subscribe-timeout" =>
         val ref = Source
           .repeat("is anyone there?")
@@ -65,18 +96,7 @@ object StreamRefsSpec {
           .run()
         sender() ! ref
-      // case "send-bulk" =>
-      //  /*
-      //   * Here we're able to send a source to a remote recipient
-      //   * The source is a "bulk transfer one, in which we're ready to send a lot of data"
-      //   *
-      //   * For them it's a Source; for us it is a Sink we run data "into"
-      //   */
-      //  val source: Source[ByteString, NotUsed] = Source.single(ByteString("huge-file-"))
-      //  val ref: SourceRef[ByteString] = source.runWith(SourceRef.bulkTransfer())
-      //  sender() ! BulkSourceMsg(ref)
-      case "receive" =>
+      case Command("receive", probe) =>
         /*
         * We write out code, knowing that the other side will stream the data into it.
         *
@@ -86,12 +106,31 @@ object StreamRefsSpec {
           StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "<COMPLETE>", f => "<FAILED>: " + f.getMessage)).run()
         sender() ! sink
+      case Command("receive-one-cancel", probe) =>
+        // will shutdown the stream after the first element using a kill switch
+        val (sink, done) =
+          StreamRefs
+            .sinkRef[String]()
+            .viaMat(KillSwitches.single)(Keep.both)
+            .alsoToMat(Sink.head)(Keep.both)
+            .mapMaterializedValue {
+              case ((sink, ks), firstF) =>
+                // shutdown the stream after first element
+                firstF.foreach(_ => ks.shutdown())(context.dispatcher)
+                sink
+            }
+            .watchTermination()(Keep.both)
+            .to(Sink.actorRef(probe, "<COMPLETE>", f => "<FAILED>: " + f.getMessage))
+            .run()
+        sender() ! sink
+        import context.dispatcher // pipeTo needs an implicit ExecutionContext
+        done.pipeTo(sender())
       case "receive-ignore" =>
         val sink =
           StreamRefs.sinkRef[String]().to(Sink.ignore).run()
         sender() ! sink
-      case "receive-subscribe-timeout" =>
+      case Command("receive-subscribe-timeout", probe) =>
         val sink = StreamRefs
           .sinkRef[String]()
           .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
@@ -99,7 +138,7 @@ object StreamRefsSpec {
           .run()
         sender() ! sink
-      case "receive-32" =>
+      case Command("receive-32", probe) =>
        val (sink, driver) = StreamRefs.sinkRef[String]().toMat(TestSink.probe(context.system))(Keep.both).run()
        import context.dispatcher
@@ -117,22 +156,7 @@ object StreamRefsSpec {
         sender() ! sink
-      // case "receive-bulk" =>
-      //  /*
-      //   * We write out code, knowing that the other side will stream the data into it.
-      //   * This will open a dedicated connection per transfer.
-      //   *
-      //   * For them it's a Sink; for us it's a Source.
-      //   */
-      //  val sink: SinkRef[ByteString] =
-      //    SinkRef.bulkTransferSource()
-      //      .to(Sink.actorRef(probe, "<COMPLETE>"))
-      //      .run()
-      //
-      //
-      //  sender() ! BulkSinkMsg(sink)
     }
   }
   // -------------------------
@@ -171,7 +195,7 @@
   }
 }
-class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSender {
+class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) {
   import StreamRefsSpec._
   val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config())
@@ -179,55 +203,61 @@
   override protected def beforeTermination(): Unit =
     TestKit.shutdownActorSystem(remoteSystem)
-  val p = TestProbe()
   // obtain the remoteActor ref via selection in order to use _real_ remoting in this test
   val remoteActor = {
-    val it = remoteSystem.actorOf(DataSourceActor.props(p.ref), "remoteActor")
+    val probe = TestProbe()(remoteSystem)
+    val it = remoteSystem.actorOf(DataSourceActor.props(), "remoteActor")
     val remoteAddress = remoteSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress
-    system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi")
-    expectMsgType[ActorIdentity].ref.get
+    system.actorSelection(it.path.toStringWithAddress(remoteAddress)).tell(Identify("hi"), probe.ref)
+    probe.expectMsgType[ActorIdentity].ref.get
   }
   "A SourceRef" must {
     "send messages via remoting" in {
-      remoteActor ! "give"
-      val sourceRef = expectMsgType[SourceRef[String]]
-      sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", _ => "<FAILED>"))
-      p.expectMsg("hello")
-      p.expectMsg("world")
-      p.expectMsg("<COMPLETE>")
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give", remoteProbe.ref)
+      val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
+      val localProbe = TestProbe()
+      sourceRef.runWith(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
+      localProbe.expectMsg(5.seconds, "hello")
+      localProbe.expectMsg("world")
+      localProbe.expectMsg("<COMPLETE>")
     }
     "fail when remote source failed" in {
-      remoteActor ! "give-fail"
-      val sourceRef = expectMsgType[SourceRef[String]]
-      sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", t => "<FAILED>: " + t.getMessage))
-      val f = p.expectMsgType[String]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give-fail", remoteProbe.ref)
+      val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
+      val localProbe = TestProbe()
+      sourceRef.runWith(Sink.actorRef(localProbe.ref, "<COMPLETE>", t => "<FAILED>: " + t.getMessage))
+      val f = localProbe.expectMsgType[String]
       f should include("Remote stream (")
       // actor name here, for easier identification
       f should include("failed, reason: Booooom!")
     }
     "complete properly when remote source is empty" in {
+      val remoteProbe = TestProbe()(remoteSystem)
       // this is a special case since it makes sure that the remote stage is still there when we connect to it
-      remoteActor ! "give-complete-asap"
-      val sourceRef = expectMsgType[SourceRef[String]]
-      sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", _ => "<FAILED>"))
-      p.expectMsg("<COMPLETE>")
+      remoteActor.tell("give-complete-asap", remoteProbe.ref)
+      val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
+      val localProbe = TestProbe()
+      sourceRef.runWith(Sink.actorRef(localProbe.ref, "<COMPLETE>", _ => "<FAILED>"))
+      localProbe.expectMsg("<COMPLETE>")
     }
     "respect back-pressure from (implied by target Sink)" in {
-      remoteActor ! "give-infinite"
-      val sourceRef = expectMsgType[SourceRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give-infinite", remoteProbe.ref)
+      val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
       val probe = sourceRef.runWith(TestSink.probe)
@@ -251,10 +281,12 @@
     }
     "receive timeout if subscribing too late to the source ref" in {
-      remoteActor ! "give-subscribe-timeout"
-      val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give-subscribe-timeout", remoteProbe.ref)
+      val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]]
       // not materializing it, awaiting the timeout...
       Thread.sleep(800) // the timeout is 500ms
       val probe = remoteSource.runWith(TestSink.probe[String](system))
@@ -270,8 +302,9 @@
     // bug #24626
     "not receive subscription timeout when got subscribed" in {
-      remoteActor ! "give-subscribe-timeout"
-      val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give-subscribe-timeout", remoteProbe.ref)
+      val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]]
       // materialize directly and start consuming, timeout is 500ms
       val eventualStrings: Future[immutable.Seq[String]] = remoteSource
         .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
@@ -283,8 +316,9 @@
     // bug #24934
     "not receive timeout while data is being sent" in {
-      remoteActor ! "give-infinite"
-      val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("give-infinite", remoteProbe.ref)
+      val remoteSource: SourceRef[String] = remoteProbe.expectMsgType[SourceRef[String]]
       val done =
         remoteSource
@@ -294,58 +328,168 @@
       Await.result(done, 8.seconds)
     }
"pass cancellation upstream across remoting after elements passed through" in {
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-only-one-watch", remoteProbe.ref)
val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
val localProbe = TestProbe()
val ks =
sourceRef
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
.run()
localProbe.expectMsg("hello")
ks.shutdown()
localProbe.expectMsg("<COMPLETE>")
remoteProbe.expectMsg(Done)
}
"pass cancellation upstream across remoting before elements has been emitted" in {
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-nothing-watch", remoteProbe.ref)
val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
val localProbe = TestProbe()
val ks =
sourceRef
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
.run()
ks.shutdown()
localProbe.expectMsg("<COMPLETE>")
remoteProbe.expectMsg(Done)
}
"pass failure upstream across remoting before elements has been emitted" in {
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-nothing-watch", remoteProbe.ref)
val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
val localProbe = TestProbe()
val ks =
sourceRef
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
.run()
ks.abort(TE("det gick åt skogen"))
localProbe.expectMsg("<FAILED> det gick åt skogen")
remoteProbe.expectMsgType[Failure].cause shouldBe a[RemoteStreamRefActorTerminatedException]
}
"pass failure upstream across remoting after elements passed through" in {
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-only-one-watch", remoteProbe.ref)
val sourceRef = remoteProbe.expectMsgType[SourceRef[String]]
val localProbe = TestProbe()
val ks =
sourceRef
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
.run()
localProbe.expectMsg("hello")
ks.abort(TE("det gick åt pipan"))
localProbe.expectMsg("<FAILED> det gick åt pipan")
remoteProbe.expectMsgType[Failure].cause shouldBe a[RemoteStreamRefActorTerminatedException]
}
"handle concurrent cancel and failure" in {
// this is not possible to deterministically trigger but what we try to
// do is have a cancel in the SourceRef and a complete on the SinkRef side happen
// concurrently before they have managed to tell each other about it
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-maybe", remoteProbe.ref)
// this is somewhat weird, but we are local to the remote system with the remoteProbe so promise
// is not sent across the wire
val (remoteControl, sourceRef) = remoteProbe.expectMsgType[(Promise[Option[String]], SourceRef[String])]
val localProbe = TestProbe()
val ks =
sourceRef
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.actorRef(localProbe.ref, "<COMPLETE>", ex => s"<FAILED> ${ex.getMessage}"))
.run()
// "concurrently"
ks.shutdown()
remoteControl.success(None)
// since it is a race we can only confirm that it either completes or fails both sides
// if it didn't work
val localComplete = localProbe.expectMsgType[String]
localComplete should startWith("<COMPLETE>").or(startWith("<FAILED>"))
val remoteCompleted = remoteProbe.expectMsgType[AnyRef]
remoteCompleted match {
case Done =>
case Failure(_) =>
case _ => fail()
}
}
   }
   "A SinkRef" must {
     "receive elements via remoting" in {
-      remoteActor ! "receive"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       Source("hello" :: "world" :: Nil).to(remoteSink).run()
-      p.expectMsg("hello")
-      p.expectMsg("world")
-      p.expectMsg("<COMPLETE>")
+      elementProbe.expectMsg("hello")
+      elementProbe.expectMsg("world")
+      elementProbe.expectMsg("<COMPLETE>")
     }
     "fail origin if remote Sink gets a failure" in {
-      remoteActor ! "receive"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       val remoteFailureMessage = "Booom!"
       Source.failed(new Exception(remoteFailureMessage)).to(remoteSink).run()
-      val f = p.expectMsgType[String]
+      val f = elementProbe.expectMsgType[String]
       f should include(s"Remote stream (")
       // actor name here, for easier identification
       f should include(s"failed, reason: $remoteFailureMessage")
     }
     "receive hundreds of elements via remoting" in {
-      remoteActor ! "receive"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       val msgs = (1 to 100).toList.map(i => s"payload-$i")
       Source(msgs).runWith(remoteSink)
-      msgs.foreach(t => p.expectMsg(t))
-      p.expectMsg("<COMPLETE>")
+      msgs.foreach(t => elementProbe.expectMsg(t))
+      elementProbe.expectMsg("<COMPLETE>")
     }
     "receive timeout if subscribing too late to the sink ref" in {
-      remoteActor ! "receive-subscribe-timeout"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive-subscribe-timeout", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       // not materializing it, awaiting the timeout...
       Thread.sleep(800) // the timeout is 500ms
       val probe = TestSource.probe[String](system).to(remoteSink).run()
-      val failure = p.expectMsgType[String]
+      val failure = elementProbe.expectMsgType[String]
       failure should include("Remote side did not subscribe (materialize) handed out Sink reference")
       // the local "remote sink" should cancel, since it should notice the origin target actor is dead
@@ -354,8 +498,10 @@
     // bug #24626
     "not receive timeout if subscribing is already done to the sink ref" in {
-      remoteActor ! "receive-subscribe-timeout"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive-subscribe-timeout", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       Source
         .repeat("whatever")
         .throttle(1, 100.millis)
@@ -363,15 +509,16 @@
         .runWith(remoteSink)
       (0 to 9).foreach { _ =>
-        p.expectMsg("whatever")
+        elementProbe.expectMsg("whatever")
       }
-      p.expectMsg("<COMPLETE>")
+      elementProbe.expectMsg("<COMPLETE>")
     }
     // bug #24934
     "not receive timeout while data is being sent" in {
-      remoteActor ! "receive-ignore"
-      val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      remoteActor.tell("receive-ignore", remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
       val done =
         Source
@@ -386,18 +533,37 @@
     }
     "respect back-pressure from (implied by origin Sink)" in {
-      remoteActor ! "receive-32"
-      val sinkRef = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive-32", elementProbe.ref), remoteProbe.ref)
+      val sinkRef = remoteProbe.expectMsgType[SinkRef[String]]
       Source.repeat("hello").runWith(sinkRef)
       // if we get this message, it means no checks in the request/expect semantics were broken, good!
-      p.expectMsg("<COMPLETED>")
+      elementProbe.expectMsg("<COMPLETED>")
+    }
+
+    "trigger local shutdown on remote shutdown" in {
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive-one-cancel", elementProbe.ref), remoteProbe.ref)
+      val remoteSink: SinkRef[String] = remoteProbe.expectMsgType[SinkRef[String]]
+      val done =
+        Source.single("hello").concat(Source.future(Future.never)).watchTermination()(Keep.right).to(remoteSink).run()
+      elementProbe.expectMsg("hello")
+      elementProbe.expectMsg("<COMPLETE>")
+      remoteProbe.expectMsg(Done)
+      Await.result(done, 5.seconds) shouldBe Done
     }
     "not allow materializing multiple times" in {
-      remoteActor ! "receive"
-      val sinkRef = expectMsgType[SinkRef[String]]
+      val remoteProbe = TestProbe()(remoteSystem)
+      val elementProbe = TestProbe()(remoteSystem)
+      remoteActor.tell(Command("receive", elementProbe.ref), remoteProbe.ref)
+      val sinkRef = remoteProbe.expectMsgType[SinkRef[String]]
       val p1: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run()
       val p2: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run()

File: SinkRefStageImpl.scala

@@ -95,7 +95,7 @@
   private var completedBeforeRemoteConnected: OptionVal[Try[Done]] = OptionVal.None
-  // Some when this side of the stream has completed/failed, and we await the Terminated() signal back from the partner
+  // When this side of the stream has completed/failed, and we await the Terminated() signal back from the partner
   // so we can safely shut down completely; This is to avoid *our* Terminated() signal to reach the partner before the
   // Complete/Fail message does, which can happen on transports such as Artery which use a dedicated lane for system messages (Terminated)
   private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None
@@ -103,6 +103,11 @@
   override def preStart(): Unit = {
     initialPartnerRef match {
       case OptionVal.Some(ref) =>
+        log.debug(
+          "[{}] Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}",
+          stageActorName,
+          initialPartnerRef,
+          self.ref)
         // this will set the `partnerRef`
         observeAndValidateSender(
           ref,
@@ -110,19 +115,26 @@
           "usage and complete stack trace on the issue tracker: https://github.com/akka/akka")
         tryPull()
       case OptionVal.None =>
+        log.debug(
+          "[{}] Created SinkRef without initial partner, local worker: {}, subscription timeout: {}",
+          stageActorName,
+          self.ref,
+          PrettyDuration.format(subscriptionTimeout.timeout))
         // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor
         // has not been provided with a valid initialPartnerRef)
         scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
     }
-    log.debug(
-      "Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}",
-      initialPartnerRef,
-      self.ref)
   }
   def initialReceive: ((ActorRef, Any)) => Unit = {
     case (_, Terminated(ref)) =>
+      log.debug(
+        "[{}] remote terminated [{}], partnerRef: [{}], finishedWithAwaitingPartnerTermination: [{}]",
+        stageActorName,
+        ref,
+        partnerRef,
+        finishedWithAwaitingPartnerTermination)
       if (ref == getPartnerRef)
         finishedWithAwaitingPartnerTermination match {
           case OptionVal.Some(Failure(ex)) =>
@@ -143,20 +155,42 @@
       if (remoteCumulativeDemandReceived < d) {
         remoteCumulativeDemandReceived = d
         log.debug(
-          "Received cumulative demand [{}], consumable demand: [{}]",
+          "[{}] Received cumulative demand [{}], consumable demand: [{}]",
+          stageActorName,
           StreamRefsProtocol.CumulativeDemand(d),
           remoteCumulativeDemandReceived - remoteCumulativeDemandConsumed)
       }
       tryPull()
-    case (_, _) => // keep the compiler happy (stage actor receive is total)
+    case (sender, StreamRefsProtocol.RemoteStreamCompleted(_)) =>
+      // unless we already sent a completed/failed downstream and are awaiting Terminated as ack for that
+      if (finishedWithAwaitingPartnerTermination.isEmpty) {
+        log.debug("[{}] Remote downstream cancelled", stageActorName)
+        self.unwatch(sender)
+        // remote only sent this after unwatching so cancelling is ok
+        cancelStage(SubscriptionWithCancelException.NoMoreElementsNeeded)
+        sender ! StreamRefsProtocol.Ack
+      }
+    case (sender, StreamRefsProtocol.RemoteStreamFailure(msg)) =>
+      // unless we already sent a completed/failed downstream and are awaiting Terminated as ack for that
+      if (finishedWithAwaitingPartnerTermination.isEmpty) {
+        log.debug("[{}] Remote downstream failed: {}", stageActorName, msg)
+        self.unwatch(sender)
+        // remote only sent this after unwatching so cancelling is ok
+        cancelStage(RemoteStreamRefActorTerminatedException(s"Remote downstream failed: $msg"))
+        sender ! StreamRefsProtocol.Ack
+      }
+    case (sender, msg) => // keep the compiler happy (stage actor receive is total)
+      log.debug("[{}] Unexpected message {} from {}", stageActorName, msg, sender)
   }
   override def onPush(): Unit = {
     val elem = grabSequenced(in)
     getPartnerRef ! elem
-    log.debug("Sending sequenced: {} to {}", elem, getPartnerRef)
+    log.debug("[{}] Sending sequenced: {} to {}", stageActorName, elem, getPartnerRef)
     tryPull()
   }
@@ -167,6 +201,7 @@
   override protected def onTimer(timerKey: Any): Unit = timerKey match {
     case SubscriptionTimeoutTimerKey =>
+      log.debug("[{}] Subscription timed out", stageActorName)
       val ex = StreamRefSubscriptionTimeoutException(
         // we know the future has been completed by now, since it is in preStart
         s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [$ref], " +
@@ -182,6 +217,7 @@
     }
   override def onUpstreamFailure(ex: Throwable): Unit = {
+    log.debug("[{}] Upstream failure, partnerRef [{}]", stageActorName, partnerRef)
     partnerRef match {
       case OptionVal.Some(ref) =>
         ref ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
@@ -196,7 +232,8 @@
     }
   }
-  override def onUpstreamFinish(): Unit =
+  override def onUpstreamFinish(): Unit = {
+    log.debug("[{}] Upstream finish, partnerRef [{}]", stageActorName, partnerRef)
     partnerRef match {
       case OptionVal.Some(ref) =>
         ref ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
@@ -207,6 +244,7 @@
         // not terminating on purpose, since other side may subscribe still and then we want to complete it
         setKeepGoing(true)
     }
+  }
   @throws[InvalidPartnerActorException]
   def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = {
@@ -219,14 +257,15 @@
     completedBeforeRemoteConnected match {
       case OptionVal.Some(scala.util.Failure(ex)) =>
         log.warning(
-          "Stream already terminated with exception before remote side materialized, sending failure: {}",
+          "[{}] Stream already terminated with exception before remote side materialized, sending failure: {}",
+          stageActorName,
           ex)
         partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
         finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex))
         setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
       case OptionVal.Some(scala.util.Success(Done)) =>
-        log.warning("Stream already completed before remote side materialized, failing now.")
+        log.warning("[{}] Stream already completed before remote side materialized, failing now.", stageActorName)
         partner ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
         finishedWithAwaitingPartnerTermination = OptionVal(Success(Done))
        setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
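
With these cases in place, the SinkRef origin now learns when the remote downstream stops instead of blindly pushing into a dead partner. A sketch of the observable effect, modeled on the new "trigger local shutdown on remote shutdown" test above; `remoteSink` is assumed to come from the remote system:

// Sketch only: the origin stream now terminates when the remote side cancels.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.SinkRef
import akka.stream.scaladsl.{ Keep, Source }
import scala.concurrent.Future

def feedUntilRemoteCancels(remoteSink: SinkRef[String])(implicit system: ActorSystem): Future[Done] =
  Source
    .single("hello")
    .concat(Source.future(Future.never)) // never completes on its own
    .watchTermination()(Keep.right)      // completes once the remote downstream cancels
    .to(remoteSink)
    .run()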

File: SourceRefStageImpl.scala

@@ -64,6 +64,29 @@
       highWatermark - remainingRequested
     else 0
   }
+  private sealed trait State
+  private sealed trait WeKnowPartner extends State {
+    def partner: ActorRef
+  }
+  // we are the "origin", and awaiting the other side to start when we'll receive this ref
+  private case object AwaitingPartner extends State
+  // we're the "remote" for an already active Source on the other side (the "origin")
+  private case class AwaitingSubscription(partner: ActorRef) extends WeKnowPartner
+  // subscription acquired and up and running
+  private final case class Running(partner: ActorRef) extends WeKnowPartner
+  // downstream cancelled or failed, waiting for remote upstream to ack
+  private final case class WaitingForCancelAck(partner: ActorRef, cause: Throwable) extends WeKnowPartner
+  // upstream completed, we are waiting to emit the remaining buffered elements before completing
+  private final case class UpstreamCompleted(partner: ActorRef) extends WeKnowPartner
+  private final case class UpstreamTerminated(partner: ActorRef) extends State
+
+  val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
+  val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey"
+  val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey"
+  val CancellationDeadlineTimerKey = "CancellationDeadlineTimerKey"
 }
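
The companion object above replaces the old ad-hoc fields (`partnerRef: OptionVal[ActorRef]`, `completed: Boolean`) with an explicit state machine. A rough summary of the transitions, inferred from the handlers later in this file (editorial sketch, not part of the commit):

// AwaitingPartner        --OnSubscribeHandshake----------------> Running
// AwaitingSubscription   --OnSubscribeHandshake/SequencedOnNext-> Running
// Running                --RemoteStreamCompleted----------------> UpstreamCompleted (drain buffer, then complete)
// Running / AwaitingSubscription --downstream cancel or failure-> WaitingForCancelAck --Ack or deadline--> stopped
// any WeKnowPartner      --Terminated(partner)------------------> UpstreamTerminated --termination deadline--> failed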
 /**
@@ -75,8 +98,7 @@
 @InternalApi
 private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef])
     extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage =>
-  import SourceRefStageImpl.ActorRefStage
-  import SourceRefStageImpl.WatermarkRequestStrategy
+  import SourceRefStageImpl._
   val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out")
   override def shape = SourceShape.of(out)
@@ -128,16 +150,20 @@
   override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName()
   private[this] val self: GraphStageLogic.StageActor =
-    getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive)
+    getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(receiveRemoteMessage)
   override val ref: ActorRef = self.ref
   private[this] implicit def selfSender: ActorRef = ref
-  val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
-  val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey"
-  val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey"
   // demand management ---
-  private var completed = false
+  private var state: State = initialPartnerRef match {
+    case OptionVal.Some(ref) =>
+      // this means we're the "remote" for an already active Source on the other side (the "origin")
+      self.watch(ref)
+      AwaitingSubscription(ref)
+    case OptionVal.None =>
+      // we are the "origin", and awaiting the other side to start when we'll receive their partnerRef
+      AwaitingPartner
+  }
   private var expectingSeqNr: Long = 0L
   private var localCumulativeDemand: Long = 0L
@@ -145,21 +171,16 @@
   private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity)
-  private val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy(
-    highWatermark = receiveBuffer.capacity)
+  private val requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity)
   // end of demand management ---
-  // initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin")
-  // null otherwise, in which case we allocated first -- we are the "origin", and awaiting the other side to start when we'll receive this ref
-  private var partnerRef: OptionVal[ActorRef] = OptionVal.None
-  private def getPartnerRef = partnerRef.get
   override def preStart(): Unit = {
-    log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
-    if (initialPartnerRef.isDefined) // this will set the partnerRef
-      observeAndValidateSender(
-        initialPartnerRef.get,
-        "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.")
+    log.debug(
+      "[{}] Starting up with self ref: {}, state: {}, subscription timeout: {}",
+      stageActorName,
+      self.ref,
+      state,
+      PrettyDuration.format(subscriptionTimeout.timeout))
     // This timer will be cancelled if we receive the handshake from the remote SinkRef
     // either created in this method and provided as self.ref as initialPartnerRef
@@ -172,103 +193,312 @@
     triggerCumulativeDemand()
   }
-  def triggerCumulativeDemand(): Unit = {
-    val i = receiveBuffer.remainingCapacity - localRemainingRequested
-    if (partnerRef.isDefined && i > 0) {
-      val addDemand = requestStrategy.requestDemand(receiveBuffer.used + localRemainingRequested)
-      // only if demand has increased we shoot it right away
-      // otherwise it's the same demand level, so it'd be triggered via redelivery anyway
-      if (addDemand > 0) {
-        localCumulativeDemand += addDemand
-        localRemainingRequested += addDemand
-        val demand = StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
-        log.debug("[{}] Demanding until [{}] (+{})", stageActorName, localCumulativeDemand, addDemand)
-        getPartnerRef ! demand
-        scheduleDemandRedelivery()
-      }
-    }
-  }
-  def scheduleDemandRedelivery(): Unit =
-    scheduleOnce(DemandRedeliveryTimerKey, demandRedeliveryInterval)
-  override protected def onTimer(timerKey: Any): Unit = timerKey match {
-    case SubscriptionTimeoutTimerKey =>
-      val ex = StreamRefSubscriptionTimeoutException(
-        // we know the future has been completed by now, since it is in preStart
-        s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," +
-        s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
-      throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
-    case DemandRedeliveryTimerKey =>
-      log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand)
-      getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
-      scheduleDemandRedelivery()
-    case TerminationDeadlineTimerKey =>
-      failStage(RemoteStreamRefActorTerminatedException(
-        s"Remote partner [$partnerRef] has terminated unexpectedly and no clean completion/failure message was received " +
-        "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
-  }
-  def initialReceive: ((ActorRef, Any)) => Unit = {
+  def receiveRemoteMessage: ((ActorRef, Any)) => Unit = {
     case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) =>
-      cancelTimer(SubscriptionTimeoutTimerKey)
-      observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext")
-      log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender)
-      triggerCumulativeDemand()
+      state match {
+        case AwaitingPartner =>
+          cancelTimer(SubscriptionTimeoutTimerKey)
+          log.debug(
+            "[{}] Received on subscribe handshake {} while awaiting partner from {}",
+            stageActorName,
+            msg,
+            remoteRef)
+          state = Running(remoteRef)
+          self.watch(remoteRef)
+          triggerCumulativeDemand()
+        case AwaitingSubscription(partner) =>
+          verifyPartner(sender, partner)
+          cancelTimer(SubscriptionTimeoutTimerKey)
+          log.debug(
+            "[{}] Received on subscribe handshake {} while awaiting subscription from {}",
+            stageActorName,
+            msg,
+            remoteRef)
+          state = Running(remoteRef)
+          triggerCumulativeDemand()
+        case other =>
+          throw new IllegalStateException(s"[$stageActorName] Got unexpected $msg in state $other")
+      }
     case (sender, msg @ StreamRefsProtocol.SequencedOnNext(seqNr, payload: Out @unchecked)) =>
-      observeAndValidateSender(sender, "Illegal sender in SequencedOnNext")
       observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in SequencedOnNext")
-      log.debug("[{}] Received seq {} from {}", stageActorName, msg, sender)
-      onReceiveElement(payload)
-      triggerCumulativeDemand()
+      state match {
+        case AwaitingSubscription(partner) =>
+          verifyPartner(sender, partner)
+          log.debug("[{}] Received seq {} from {}", stageActorName, msg, sender)
+          state = Running(partner)
+          onReceiveElement(payload)
+          triggerCumulativeDemand()
+        case Running(partner) =>
+          verifyPartner(sender, partner)
+          onReceiveElement(payload)
+          triggerCumulativeDemand()
+        case AwaitingPartner =>
+          throw new IllegalStateException(s"[$stageActorName] Got $msg from $sender while AwaitingPartner")
+        case WaitingForCancelAck(partner, _) =>
+          // awaiting cancellation ack from remote
+          verifyPartner(sender, partner)
+          log.warning(
+            "[{}] Got element from remote but downstream cancelled, dropping element of type {}",
+            stageActorName,
+            payload.getClass)
+        case UpstreamCompleted(partner) =>
+          verifyPartner(sender, partner)
+          throw new IllegalStateException(
+            s"[$stageActorName] Got completion and then received more elements from $sender, this is not supposed to happen.")
+        case UpstreamTerminated(partner) =>
+          verifyPartner(sender, partner)
+          log.debug("[{}] Received element after partner terminated", stageActorName)
+          onReceiveElement(payload)
+      }
     case (sender, StreamRefsProtocol.RemoteStreamCompleted(seqNr)) =>
-      observeAndValidateSender(sender, "Illegal sender in RemoteSinkCompleted")
       observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in RemoteSinkCompleted")
-      log.debug("[{}] The remote stream has completed, completing as well...", stageActorName)
-      self.unwatch(sender)
-      completed = true
-      tryPush()
+      state match {
+        case Running(partner) =>
+          // upstream completed, continue running until we have emitted every element in buffer
+          // or downstream cancels
+          verifyPartner(sender, partner)
+          log.debug(
+            "[{}] The remote stream has completed, emitting {} elements left in buffer before completing",
+            stageActorName,
+            receiveBuffer.used)
+          self.unwatch(sender)
+          state = UpstreamCompleted(partner)
+          tryPush()
+        case WaitingForCancelAck(_, _) =>
+          // upstream completed while we were waiting for it to receive cancellation and ack
+          // upstream may stop without seeing cancellation, but we may not see termination
+          // let the cancel timeout hit
+          log.debug("[{}] Upstream completed while waiting for cancel ack", stageActorName)
+        case other =>
+          // UpstreamCompleted, AwaitingPartner or AwaitingSubscription(_) all means a bug here
+          throw new IllegalStateException(
+            s"[$stageActorName] Saw RemoteStreamCompleted($seqNr) while in state $other, should never happen")
+      }
     case (sender, StreamRefsProtocol.RemoteStreamFailure(reason)) =>
-      observeAndValidateSender(sender, "Illegal sender in RemoteSinkFailure")
-      log.warning("[{}] The remote stream has failed, failing (reason: {})", stageActorName, reason)
-      self.unwatch(sender)
-      failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason"))
+      state match {
+        case weKnowPartner: WeKnowPartner =>
+          val partner = weKnowPartner.partner
+          verifyPartner(sender, partner)
+          log.debug("[{}] The remote stream has failed, failing (reason: {})", stageActorName, reason)
+          failStage(
+            RemoteStreamRefActorTerminatedException(
+              s"[$stageActorName] Remote stream (${sender.path}) failed, reason: $reason"))
+        case other =>
+          throw new IllegalStateException(
+            s"[$stageActorName] got RemoteStreamFailure($reason) when in state $other, should never happen")
+      }
+    case (sender, StreamRefsProtocol.Ack) =>
+      state match {
+        case WaitingForCancelAck(partner, cause) =>
+          verifyPartner(sender, partner)
+          log.debug("[{}] Got cancellation ack from remote, canceling", stageActorName)
+          cancelStage(cause)
+        case other =>
+          throw new IllegalStateException(s"[$stageActorName] Got an Ack when in state $other")
+      }
     case (_, Terminated(p)) =>
-      partnerRef match {
-        case OptionVal.Some(`p`) =>
+      state match {
+        case weKnowPartner: WeKnowPartner =>
+          if (weKnowPartner.partner != p)
+            throw RemoteStreamRefActorTerminatedException(
+              s"[$stageActorName] Received UNEXPECTED Terminated($p) message! " +
+              s"This actor was NOT our trusted remote partner, which was: ${weKnowPartner.partner}. Tearing down.")
           // we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail
           // will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref
           // the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there.
           scheduleOnce(TerminationDeadlineTimerKey, finalTerminationSignalDeadline)
+          log.debug(
+            "[{}] Partner terminated, starting delayed shutdown, deadline: [{}]",
+            stageActorName,
+            finalTerminationSignalDeadline)
+          state = UpstreamTerminated(weKnowPartner.partner)
+        case weDontKnowPartner =>
+          throw new IllegalStateException(
+            s"[$stageActorName] Unexpected deathwatch message for $p before we knew partner ref, state $weDontKnowPartner")
-        case _ =>
-          // this should not have happened! It should be impossible that we watched some other actor
-          failStage(
-            RemoteStreamRefActorTerminatedException(
-              s"Received UNEXPECTED Terminated($p) message! " +
-              s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down."))
       }
-    case (_, _) => // keep the compiler happy (stage actor receive is total)
+    case (sender, msg) =>
+      // should never happen but keep the compiler happy (stage actor receive is total)
+      throw new IllegalStateException(s"[$stageActorName] Unexpected message in state $state: $msg from $sender")
   }
-  def tryPush(): Unit =
+  override protected def onTimer(timerKey: Any): Unit = timerKey match {
+    case SubscriptionTimeoutTimerKey =>
+      state match {
+        case AwaitingPartner | AwaitingSubscription(_) =>
+          val ex = StreamRefSubscriptionTimeoutException(
+            // we know the future has been completed by now, since it is in preStart
+            s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," +
+            s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
+          throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
+        case other =>
+          // this is fine
+          log.debug("[{}] Ignoring subscription timeout in state [{}]", stageActorName, other)
+      }
+    case DemandRedeliveryTimerKey =>
+      state match {
+        case Running(ref) =>
+          log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand)
+          ref ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
+          scheduleDemandRedelivery()
+        case other =>
+          log.debug("[{}] Ignoring demand redelivery timeout in state [{}]", stageActorName, other)
+      }
+    case TerminationDeadlineTimerKey =>
+      state match {
+        case UpstreamTerminated(partner) =>
+          log.debug(
+            "[{}] Remote partner [{}] has terminated unexpectedly and no clean completion/failure message was received",
+            stageActorName,
+            partner)
+          failStage(RemoteStreamRefActorTerminatedException(
+            s"[$stageActorName] Remote partner [$partner] has terminated unexpectedly and no clean completion/failure message was received " +
+            "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
+        case AwaitingPartner =>
+          log.debug("[{}] Downstream cancelled, but timeout hit before we saw a partner", stageActorName)
+          cancelStage(SubscriptionWithCancelException.NoMoreElementsNeeded)
+        case other =>
+          throw new IllegalStateException(s"TerminationDeadlineTimerKey can't happen in state $other")
+      }
+    case CancellationDeadlineTimerKey =>
+      state match {
+        case WaitingForCancelAck(partner, cause) =>
+          log.debug(
+            "[{}] Waiting for remote ack from [{}] for downstream failure timed out, failing stage with original downstream failure",
+            stageActorName,
+            partner)
+          cancelStage(cause)
+        case other =>
+          throw new IllegalStateException(
+            s"[$stageActorName] CancellationDeadlineTimerKey can't happen in state $other")
+      }
+  }
+
+  override def onDownstreamFinish(cause: Throwable): Unit = {
+    state match {
+      case Running(ref) =>
+        triggerCancellationExchange(ref, cause)
+      case AwaitingPartner =>
+        // we can't do a graceful cancellation dance in this case, wait for partner and then cancel
+        // or timeout if we never get a partner
+        scheduleOnce(TerminationDeadlineTimerKey, finalTerminationSignalDeadline)
+      case AwaitingSubscription(ref) =>
+        // we didn't get a first demand yet but have access to the partner - try a cancellation dance
+        triggerCancellationExchange(ref, cause)
+      case UpstreamCompleted(_) =>
+        // we saw upstream complete so let's just complete
+        if (receiveBuffer.nonEmpty)
+          log.debug(
+            "[{}] Downstream cancelled with elements [{}] in buffer, dropping elements",
+            stageActorName,
+            receiveBuffer.used)
+        cause match {
+          case _: SubscriptionWithCancelException => completeStage()
+          case failure => failStage(failure)
+        }
+      case WaitingForCancelAck(_, _) =>
+        // downstream can't finish twice
+        throw new UnsupportedOperationException(
+          s"[$stageActorName] Didn't expect state $state when downstream finished with $cause")
+      case UpstreamTerminated(_) =>
+        log.debug("[{}] Downstream cancelled with elements [{}] in buffer", stageActorName, receiveBuffer.used)
+        if (receiveBuffer.isEmpty)
+          failStage(RemoteStreamRefActorTerminatedException(s"[$stageActorName] unexpectedly terminated"))
+        else
+          // if there are elements left in the buffer we try to emit those
+          tryPush()
+    }
+  }
+
+  private def triggerCancellationExchange(partner: ActorRef, cause: Throwable): Unit = {
+    if (receiveBuffer.nonEmpty)
+      log.debug("Downstream cancelled with elements [{}] in buffer, dropping elements", receiveBuffer.used)
+    val message = cause match {
+      case _: SubscriptionWithCancelException.NonFailureCancellation =>
+        log.debug("[{}] Deferred stop on downstream cancel", stageActorName)
+        StreamRefsProtocol.RemoteStreamCompleted(expectingSeqNr) // seqNr not really used in this case
+      case streamFailure =>
+        log.debug("[{}] Deferred stop on downstream failure: {}", stageActorName, streamFailure)
+        StreamRefsProtocol.RemoteStreamFailure("Downstream failed")
+    }
+    // sending the cancellation means it is ok for the partner to terminate
+    // we either get a response or hit a timeout and shutdown
+    self.unwatch(partner)
+    partner ! message
+    state = WaitingForCancelAck(partner, cause)
+    scheduleOnce(CancellationDeadlineTimerKey, subscriptionTimeout.timeout)
+    setKeepGoing(true)
+  }
+  def triggerCumulativeDemand(): Unit = {
+    val i = receiveBuffer.remainingCapacity - localRemainingRequested
+    if (i > 0) {
+      val addDemand = requestStrategy.requestDemand(receiveBuffer.used + localRemainingRequested)
+      // only if demand has increased we shoot it right away
+      // otherwise it's the same demand level, so it'd be triggered via redelivery anyway
+      if (addDemand > 0) {
+        def sendDemand(partner: ActorRef): Unit = {
+          localCumulativeDemand += addDemand
+          localRemainingRequested += addDemand
+          val demand = StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
+          partner ! demand
+          scheduleDemandRedelivery()
+        }
+        state match {
+          case Running(partner) =>
+            log.debug("[{}] Demanding until [{}] (+{})", stageActorName, localCumulativeDemand, addDemand)
+            sendDemand(partner)
+          case AwaitingSubscription(partner) =>
+            log.debug(
+              "[{}] Demanding, before subscription seen, until [{}] (+{})",
+              stageActorName,
+              localCumulativeDemand,
+              addDemand)
+            sendDemand(partner)
+          case other =>
+            log.debug("[{}] Partner ref not set up in state {}, demanding elements deferred", stageActorName, other)
+        }
+      }
+    }
+  }
+
+  private def tryPush(): Unit =
     if (receiveBuffer.nonEmpty && isAvailable(out)) {
       val element = receiveBuffer.dequeue()
       push(out, element)
-    } else if (receiveBuffer.isEmpty && completed) completeStage()
+    } else if (receiveBuffer.isEmpty)
+      state match {
+        case UpstreamCompleted(_) => completeStage()
+        case _ => // all other are ok
+      }
   private def onReceiveElement(payload: Out): Unit = {
     localRemainingRequested -= 1
@@ -284,32 +514,30 @@
     }
   }
-  /** @throws InvalidPartnerActorException when partner ref is invalid */
-  def observeAndValidateSender(partner: ActorRef, msg: String): Unit =
-    partnerRef match {
-      case OptionVal.None =>
-        log.debug("Received first message from {}, assuming it to be the remote partner for this stage", partner)
-        partnerRef = OptionVal(partner)
-        self.watch(partner)
-      case OptionVal.Some(p) =>
-        if (partner != p) {
-          val ex = InvalidPartnerActorException(partner, getPartnerRef, msg)
-          partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
-          throw ex
-        } // else, ref is valid and we don't need to do anything with it
-    }
+  private def verifyPartner(sender: ActorRef, partner: ActorRef): Unit = {
+    if (sender != partner)
+      throw InvalidPartnerActorException(
+        partner,
+        sender,
+        s"[$stageActorName] Received message from UNEXPECTED sender [$sender]! " +
+        s"This actor is NOT our trusted remote partner, which is [$partner]. Tearing down.")
+  }
   /** @throws InvalidSequenceNumberException when sequence number is invalid */
-  def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit =
+  private def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit =
     if (isInvalidSequenceNr(seqNr)) {
+      log.warning("[{}] {}, expected {} but was {}", stageActorName, msg, expectingSeqNr, seqNr)
       throw InvalidSequenceNumberException(expectingSeqNr, seqNr, msg)
     } else {
       expectingSeqNr += 1
     }
-  def isInvalidSequenceNr(seqNr: Long): Boolean =
+  private def isInvalidSequenceNr(seqNr: Long): Boolean =
     seqNr != expectingSeqNr
+  private def scheduleDemandRedelivery(): Unit =
+    scheduleOnce(DemandRedeliveryTimerKey, demandRedeliveryInterval)
   setHandler(out, this)
 }
 (logic, SinkRefImpl(logic.ref))

File: StreamRefsProtocol.scala

@@ -39,13 +39,19 @@ private[akka] object StreamRefsProtocol {
       with DeadLetterSuppression
   /**
-   * INTERNAL API: Sent to the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a Failure.
+   * INTERNAL API
+   *
+   * Sent to the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a Failure.
+   * Sent to the sender of a stream ref if the receiver's downstream failed.
   */
   @InternalApi
   private[akka] final case class RemoteStreamFailure(msg: String) extends StreamRefsProtocol
   /**
-   * INTERNAL API: Sent to the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a completion.
+   * INTERNAL API
+   *
+   * Sent to the receiver side of a stream ref, once the sending side of the SinkRef gets signalled a completion.
+   * Sent to the sender of a stream ref if the receiver's downstream cancelled.
   */
   @InternalApi
   private[akka] final case class RemoteStreamCompleted(seqNr: Long) extends StreamRefsProtocol
@@ -60,4 +66,12 @@ private[akka] object StreamRefsProtocol {
     if (seqNr <= 0) throw ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException
   }
+  /**
+   * INTERNAL API
+   *
+   * Ack that failure or completion has been seen and the remote side can stop
+   */
+  @InternalApi
+  private[akka] final case object Ack extends StreamRefsProtocol with DeadLetterSuppression
 }
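
The new `Ack` closes the cancellation handshake: the cancelling SourceRef side keeps running (`setKeepGoing(true)`) until the origin confirms it has seen the cancellation, or the cancellation deadline fires. Roughly, as pieced together from the two stage implementations above (editorial sketch, not a literal excerpt):

// SourceRef side (receiver), on downstream cancel or failure:
//   self.unwatch(partner)                        // avoid racing our Terminated signal
//   partner ! RemoteStreamCompleted(seqNr)       // or RemoteStreamFailure(msg)
//   state = WaitingForCancelAck(partner, cause)  // plus CancellationDeadlineTimerKey timer
//
// SinkRef side (origin), on receiving that message:
//   self.unwatch(sender)
//   cancelStage(...)                             // stop the origin stream
//   sender ! StreamRefsProtocol.Ack              // receiver may now stop too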

File: StreamRefSerializer.scala

@@ -26,6 +26,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem)
   private[this] val SourceRefManifest = "E"
   private[this] val SinkRefManifest = "F"
   private[this] val OnSubscribeHandshakeManifest = "G"
+  private[this] val AckManifest = "H"
   override def manifest(o: AnyRef): String = o match {
     // protocol
@@ -41,6 +42,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem)
     // case _: MaterializedSourceRef[_] => SourceRefManifest
     case _: SinkRefImpl[_] => SinkRefManifest
     // case _: MaterializedSinkRef[_] => SinkRefManifest
+    case StreamRefsProtocol.Ack => AckManifest
   }
   override def toBinary(o: AnyRef): Array[Byte] = o match {
@@ -57,6 +59,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem)
     // case ref: MaterializedSinkRef[_] => ??? // serializeSinkRef(ref).toByteArray
     case ref: SourceRefImpl[_] => serializeSourceRef(ref).toByteArray
     // case ref: MaterializedSourceRef[_] => serializeSourceRef(ref.).toByteArray
+    case StreamRefsProtocol.Ack => Array.emptyByteArray
   }
   override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
@@ -69,6 +72,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem)
     // refs
     case SinkRefManifest => deserializeSinkRef(bytes)
     case SourceRefManifest => deserializeSourceRef(bytes)
+    case AckManifest => StreamRefsProtocol.Ack
   }
   // -----