From 82c761f026ea54632b5af18d61b995755fc347ef Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 2 May 2019 16:54:37 +0200 Subject: [PATCH] remove Future from StreamRefs mat val, #24372 (#26847) --- .../akka/stream/SourceRefBenchmark.scala | 2 +- .../scala/akka/cluster/StreamRefSpec.scala | 13 ++--- .../test/scala/akka/cluster/ClusterSpec.scala | 2 +- .../project/migration-guide-2.5.x-2.6.x.md | 7 +++ .../src/main/paradox/stream/stream-refs.md | 3 +- .../jdocs/stream/FlowStreamRefsDocTest.java | 13 ++--- .../docs/stream/FlowStreamRefsDocSpec.scala | 14 ++--- .../akka/stream/scaladsl/StreamRefsSpec.scala | 27 ++++----- .../mima-filters/2.5.x.backwards.excludes | 5 +- .../akka/stream/impl/ActorRefSource.scala | 2 +- .../stream/impl/streamref/SinkRefImpl.scala | 45 +++++++++------ .../stream/impl/streamref/SourceRefImpl.scala | 56 +++++++++++-------- .../akka/stream/javadsl/StreamRefs.scala | 19 ++----- .../akka/stream/scaladsl/StreamRefs.scala | 18 ++---- 14 files changed, 109 insertions(+), 117 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala index cb00eebcf3..f4c0099f39 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala @@ -47,7 +47,7 @@ class SourceRefBenchmark { @Setup(Level.Invocation) def setup(): Unit = { - sourceRef = Await.result(Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef()), 10.seconds) + sourceRef = Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef()) } @TearDown diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala index 7496db4c18..7adfaeb297 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -15,7 +15,6 @@ import akka.actor.ActorIdentity import akka.actor.ActorRef import akka.actor.Identify import akka.actor.Props -import akka.pattern.pipe import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction @@ -59,7 +58,7 @@ object StreamRefSpec extends MultiNodeConfig { def receive = { case RequestLogs(streamId) => // materialize the SourceRef: - val (done: Future[Done], ref: Future[SourceRef[String]]) = + val (done: Future[Done], ref: SourceRef[String]) = Source .fromIterator(() => Iterator.from(1)) .map(n => s"elem-$n") @@ -77,10 +76,10 @@ object StreamRefSpec extends MultiNodeConfig { } // wrap the SourceRef in some domain message, such that the sender knows what source it is - val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) + val reply = LogsOffer(streamId, ref) // reply to sender - reply.pipeTo(sender()) + sender() ! reply } } @@ -101,7 +100,7 @@ object StreamRefSpec extends MultiNodeConfig { def receive = { case PrepareUpload(nodeId) => // materialize the SinkRef (the remote is like a source of data for us): - val (ref: Future[SinkRef[String]], done: Future[Done]) = + val (ref: SinkRef[String], done: Future[Done]) = StreamRefs .sinkRef[String]() .throttle(1, 1.second) @@ -118,10 +117,10 @@ object StreamRefSpec extends MultiNodeConfig { } // wrap the SinkRef in some domain message, such that the sender knows what source it is - val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) + val reply = MeasurementsSinkReady(nodeId, ref) // reply to sender - reply.pipeTo(sender()) + sender() ! reply } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 7d92e7fc24..0bf336e94d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -193,7 +193,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { Cluster(sys2).join(Cluster(sys2).selfAddress) probe.expectMsgType[MemberUp] val mat = ActorMaterializer()(sys2) - val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds) + val sink = StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat) Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat) CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 71c2006a78..cf6186dcc0 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -103,6 +103,13 @@ Classic remoting over UDP has been deprecated since `2.5.0` and now has been rem To continue to use UDP configure @ref[Artery UDP](../remoting-artery.md#configuring-ssl-tls-for-akka-remoting) or migrate to Artery TCP. A full cluster restart is required to change to Artery. +## Streams + +### StreamRefs + +The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in +`Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern. + ## Cluster Sharding ### Passivate idle entity diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md index e80ac06f8b..db5f6236fb 100644 --- a/akka-docs/src/main/paradox/stream/stream-refs.md +++ b/akka-docs/src/main/paradox/stream/stream-refs.md @@ -77,8 +77,7 @@ can be offered to a remote actor system in order for it to consume some source o locally. In order to share a `Source` with a remote endpoint you need to materialize it by running it into the `Sink.sourceRef`. -That `Sink` materializes the `SourceRef` that you can then send to other nodes. Please note that it materializes into a -`Future` so you will have to use `pipeTo`. +That `Sink` materializes the `SourceRef` that you can then send to other nodes. Scala : @@snip [FlowStreamRefsDocSpec.scala](/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source } diff --git a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java index 9ff432412b..a7603fdb33 100644 --- a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java @@ -55,10 +55,9 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { private void handleRequestLogs(RequestLogs requestLogs) { Source logs = streamLogs(requestLogs.streamId); - CompletionStage> logsRef = logs.runWith(StreamRefs.sourceRef(), mat); + SourceRef logsRef = logs.runWith(StreamRefs.sourceRef(), mat); - Patterns.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) - .to(sender()); + getSender().tell(new LogsOffer(logsRef), getSelf()); } private Source streamLogs(long streamId) { @@ -111,13 +110,9 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { PrepareUpload.class, prepare -> { Sink sink = logsSinkFor(prepare.id); - CompletionStage> sinkRef = - StreamRefs.sinkRef().to(sink).run(mat); + SinkRef sinkRef = StreamRefs.sinkRef().to(sink).run(mat); - Patterns.pipe( - sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), - context().dispatcher()) - .to(sender()); + getSender().tell(new MeasurementsSinkReady(prepare.id, sinkRef), getSelf()); }) .build(); } diff --git a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala index 2988a38e76..8bc2709e4d 100644 --- a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala @@ -23,7 +23,6 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) class DataSource extends Actor { - import context.dispatcher implicit val mat = ActorMaterializer()(context) def receive = { @@ -32,13 +31,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { val source: Source[String, NotUsed] = streamLogs(streamId) // materialize the SourceRef: - val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) + val ref: SourceRef[String] = source.runWith(StreamRefs.sourceRef()) // wrap the SourceRef in some domain message, such that the sender knows what source it is - val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) + val reply = LogsOffer(streamId, ref) // reply to sender - reply.pipeTo(sender()) + sender() ! reply } def streamLogs(streamId: Long): Source[String, NotUsed] = ??? @@ -70,7 +69,6 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { class DataReceiver extends Actor { - import context.dispatcher implicit val mat = ActorMaterializer()(context) def receive = { @@ -79,13 +77,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { val sink: Sink[String, NotUsed] = logsSinkFor(nodeId) // materialize the SinkRef (the remote is like a source of data for us): - val ref: Future[SinkRef[String]] = StreamRefs.sinkRef[String]().to(sink).run() + val ref: SinkRef[String] = StreamRefs.sinkRef[String]().to(sink).run() // wrap the SinkRef in some domain message, such that the sender knows what source it is - val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) + val reply = MeasurementsSinkReady(nodeId, ref) // reply to sender - reply.pipeTo(sender()) + sender() ! reply } def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ??? diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 2697499b9f..b4705bd297 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -39,25 +39,23 @@ object StreamRefsSpec { * For them it's a Source; for us it is a Sink we run data "into" */ val source: Source[String, NotUsed] = Source(List("hello", "world")) - val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) + val ref: SourceRef[String] = source.runWith(StreamRefs.sourceRef()) - ref.pipeTo(sender()) + sender() ! ref case "give-infinite" => val source: Source[String, NotUsed] = Source.fromIterator(() => Iterator.from(1)).map("ping-" + _) - val (r: NotUsed, ref: Future[SourceRef[String]]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run() + val (r: NotUsed, ref: SourceRef[String]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run() - ref.pipeTo(sender()) + sender() ! ref case "give-fail" => val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace).runWith(StreamRefs.sourceRef()) - - ref.pipeTo(sender()) + sender() ! ref case "give-complete-asap" => val ref = Source.empty.runWith(StreamRefs.sourceRef()) - - ref.pipeTo(sender()) + sender() ! ref case "give-subscribe-timeout" => val ref = Source @@ -65,8 +63,8 @@ object StreamRefsSpec { .toMat(StreamRefs.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .run() + sender() ! ref - ref.pipeTo(sender()) // case "send-bulk" => // /* // * Here we're able to send a source to a remote recipient @@ -86,14 +84,12 @@ object StreamRefsSpec { */ val sink = StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "")).run() - - sink.pipeTo(sender()) + sender() ! sink case "receive-ignore" => val sink = StreamRefs.sinkRef[String]().to(Sink.ignore).run() - - sink.pipeTo(sender()) + sender() ! sink case "receive-subscribe-timeout" => val sink = StreamRefs @@ -101,8 +97,7 @@ object StreamRefsSpec { .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .to(Sink.actorRef(probe, "")) .run() - - sink.pipeTo(sender()) + sender() ! sink case "receive-32" => val (sink, driver) = StreamRefs.sinkRef[String]().toMat(TestSink.probe(context.system))(Keep.both).run() @@ -120,7 +115,7 @@ object StreamRefsSpec { "" }.pipeTo(probe) - sink.pipeTo(sender()) + sender() ! sink // case "receive-bulk" => // /* diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 5698fa3a3d..c8214c2b69 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -60,4 +60,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestGraph") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatest") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") + +# #24372 No Future/CompletionStage in StreamRefs +# FIXME why was change not detected? diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index f9507c33f9..b0330e09c6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -51,7 +51,7 @@ private object ActorRefSource { override protected def stageActorName: String = inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) - val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { + override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { case (_, PoisonPill) => log.warning("for backwards compatibility: PoisonPill will not be supported in the future") completeStage() diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 2128264a10..6822cd4583 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -14,7 +14,6 @@ import akka.stream.scaladsl.Sink import akka.stream.stage._ import akka.util.{ OptionVal, PrettyDuration } -import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */ @@ -24,6 +23,13 @@ private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) ex Sink.fromGraph(new SinkRefStageImpl[In](OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed) } +/** + * INTERNAL API + */ +@InternalApi private[stream] object SinkRefStageImpl { + private sealed trait ActorRefStage { def ref: ActorRef } +} + /** * INTERNAL API: Actual operator implementation backing [[SinkRef]]s. * @@ -32,7 +38,8 @@ private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) ex */ @InternalApi private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartnerRef: OptionVal[ActorRef]) - extends GraphStageWithMaterializedValue[SinkShape[In], Future[SourceRef[In]]] { + extends GraphStageWithMaterializedValue[SinkShape[In], SourceRef[In]] { + import SinkRefStageImpl.ActorRefStage val in: Inlet[In] = Inlet[In](s"${Logging.simpleName(getClass)}($initialRefName).in") override def shape: SinkShape[In] = SinkShape.of(in) @@ -43,24 +50,30 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn case OptionVal.None => "" } - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val promise = Promise[SourceRefImpl[In]] + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SourceRef[In]) = + throw new IllegalStateException("Not supported") - val logic = new TimerGraphStageLogic(shape) with StageLogging with InHandler { + private[akka] override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes, + eagerMaterializer: Materializer): (GraphStageLogic, SourceRef[In]) = { - private[this] lazy val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(materializer).system) + val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with InHandler { + + private[this] val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(eagerMaterializer).system) // settings --- import StreamRefAttributes._ - private[this] lazy val settings = ActorMaterializerHelper.downcast(materializer).settings.streamRefSettings + private[this] val settings = ActorMaterializerHelper.downcast(eagerMaterializer).settings.streamRefSettings - private[this] lazy val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( + private[this] val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( SubscriptionTimeout(settings.subscriptionTimeout)) // end of settings --- - override protected lazy val stageActorName: String = streamRefsMaster.nextSinkRefStageName() - private[this] var self: GraphStageLogic.StageActor = _ - implicit def selfSender: ActorRef = self.ref + override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName() + private[this] val self: GraphStageLogic.StageActor = + getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive) + override val ref: ActorRef = self.ref + implicit def selfSender: ActorRef = ref private var partnerRef: OptionVal[ActorRef] = OptionVal.None private def getPartnerRef: ActorRef = @@ -84,8 +97,6 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None override def preStart(): Unit = { - self = getStageActor(initialReceive) - initialPartnerRef match { case OptionVal.Some(ref) => // this will set the `partnerRef` @@ -104,11 +115,9 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn "Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", initialPartnerRef, self.ref) - - promise.success(SourceRefImpl(self.ref)) } - lazy val initialReceive: ((ActorRef, Any)) => Unit = { + def initialReceive: ((ActorRef, Any)) => Unit = { case (_, Terminated(ref)) => if (ref == getPartnerRef) finishedWithAwaitingPartnerTermination match { @@ -156,7 +165,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn case SubscriptionTimeoutTimerKey => val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " + + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [$ref], " + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") throw ex @@ -231,7 +240,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn setHandler(in, this) } - (logic, promise.future) + (logic, SourceRefImpl(logic.ref)) } override def toString = s"${Logging.simpleName(getClass)}($initialRefName)" diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 00be15872a..7187fc1c14 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -15,8 +15,6 @@ import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.util.{ OptionVal, PrettyDuration } -import scala.concurrent.{ Future, Promise } - /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */ @InternalApi private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) extends SourceRef[T] { @@ -24,6 +22,13 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed) } +/** + * INTERNAL API + */ +@InternalApi private[stream] object SourceRefStageImpl { + private sealed trait ActorRefStage { def ref: ActorRef } +} + /** * INTERNAL API: Actual operator implementation backing [[SourceRef]]s. * @@ -32,7 +37,8 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e */ @InternalApi private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef]) - extends GraphStageWithMaterializedValue[SourceShape[Out], Future[SinkRef[Out]]] { stage => + extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage => + import SourceRefStageImpl.ActorRefStage val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out") override def shape = SourceShape.of(out) @@ -43,24 +49,29 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio case _ => "" } - override def createLogicAndMaterializedValue( - inheritedAttributes: Attributes): (GraphStageLogic, Future[SinkRef[Out]]) = { - val promise = Promise[SinkRefImpl[Out]]() + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SinkRef[Out]) = + throw new IllegalStateException("Not supported") - val logic = new TimerGraphStageLogic(shape) with StageLogging with OutHandler { - private[this] lazy val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(materializer).system) + private[akka] override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes, + eagerMaterializer: Materializer): (GraphStageLogic, SinkRef[Out]) = { + + val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with OutHandler { + private[this] val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(eagerMaterializer).system) // settings --- import StreamRefAttributes._ - private[this] lazy val settings = ActorMaterializerHelper.downcast(materializer).settings.streamRefSettings + private[this] val settings = ActorMaterializerHelper.downcast(eagerMaterializer).settings.streamRefSettings - private[this] lazy val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( + private[this] val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( SubscriptionTimeout(settings.subscriptionTimeout)) // end of settings --- - override protected lazy val stageActorName: String = streamRefsMaster.nextSourceRefStageName() - private[this] var self: GraphStageLogic.StageActor = _ - private[this] implicit def selfSender: ActorRef = self.ref + override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName() + private[this] val self: GraphStageLogic.StageActor = + getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive) + override val ref: ActorRef = self.ref + private[this] implicit def selfSender: ActorRef = ref val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey" val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey" @@ -88,15 +99,12 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio receiveBuffer = FixedSizeBuffer[Out](settings.bufferCapacity) requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) - self = getStageActor(initialReceive) log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref) if (initialPartnerRef.isDefined) // this will set the partnerRef observeAndValidateSender( initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.") - promise.success(SinkRefImpl(self.ref)) - //this timer will be cancelled if we receive the handshake from the remote SinkRef // either created in this method and provided as self.ref as initialPartnerRef // or as the response to first CumulativeDemand request sent to remote SinkRef @@ -133,7 +141,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio case SubscriptionTimeoutTimerKey => val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," + + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" @@ -149,7 +157,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down.")) } - lazy val initialReceive: ((ActorRef, Any)) => Unit = { + def initialReceive: ((ActorRef, Any)) => Unit = { case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) => cancelTimer(SubscriptionTimeoutTimerKey) observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext") @@ -181,9 +189,9 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio self.unwatch(sender) failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason")) - case (_, Terminated(ref)) => + case (_, Terminated(p)) => partnerRef match { - case OptionVal.Some(`ref`) => + case OptionVal.Some(`p`) => // we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail // will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref // the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there. @@ -193,7 +201,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio // this should not have happened! It should be impossible that we watched some other actor failStage( RemoteStreamRefActorTerminatedException( - s"Received UNEXPECTED Terminated($ref) message! " + + s"Received UNEXPECTED Terminated($p) message! " + s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down.")) } @@ -228,8 +236,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio partnerRef = OptionVal(partner) self.watch(partner) - case OptionVal.Some(ref) => - if (partner != ref) { + case OptionVal.Some(p) => + if (partner != p) { val ex = InvalidPartnerActorException(partner, getPartnerRef, msg) partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) throw ex @@ -248,7 +256,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio setHandler(out, this) } - (logic, promise.future) + (logic, SinkRefImpl(logic.ref)) } override def toString: String = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala index 596f384f56..147ab60104 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala @@ -4,21 +4,12 @@ package akka.stream.javadsl -import java.util.concurrent.CompletionStage - -import akka.annotation.ApiMayChange import akka.stream._ /** - * API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value - * will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked - * as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details. - * * Factories for creating stream refs. */ -@ApiMayChange object StreamRefs { - import scala.compat.java8.FutureConverters._ /** * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), @@ -28,9 +19,8 @@ object StreamRefs { * * See more detailed documentation on [[SourceRef]]. */ - @ApiMayChange - def sourceRef[T](): javadsl.Sink[T, CompletionStage[SourceRef[T]]] = - scaladsl.StreamRefs.sourceRef[T]().mapMaterializedValue(_.toJava).asJava + def sourceRef[T](): javadsl.Sink[T, SourceRef[T]] = + scaladsl.StreamRefs.sourceRef[T]().asJava /** * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), @@ -40,8 +30,7 @@ object StreamRefs { * * See more detailed documentation on [[SinkRef]]. */ - @ApiMayChange - def sinkRef[T](): javadsl.Source[T, CompletionStage[SinkRef[T]]] = - scaladsl.StreamRefs.sinkRef[T]().mapMaterializedValue(_.toJava).asJava + def sinkRef[T](): javadsl.Source[T, SinkRef[T]] = + scaladsl.StreamRefs.sinkRef[T]().asJava } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala index 81a2d3f159..70caec9d83 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala @@ -4,44 +4,34 @@ package akka.stream.scaladsl -import akka.annotation.ApiMayChange import akka.stream.{ SinkRef, SourceRef } import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl } import akka.util.OptionVal -import scala.concurrent.Future - /** - * API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value - * will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked - * as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details. - * * Factories for creating stream refs. */ -@ApiMayChange object StreamRefs { /** * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), * to consume data from this local stream, as if they were attached directly in place of the local Sink. * - * Adheres to [[StreamRefAttributes]]. + * Adheres to [[akka.stream.StreamRefAttributes]]. * * See more detailed documentation on [[SourceRef]]. */ - @ApiMayChange - def sourceRef[T](): Sink[T, Future[SourceRef[T]]] = + def sourceRef[T](): Sink[T, SourceRef[T]] = Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None)) /** * A local [[Source]] which materializes a [[SinkRef]] which can be used by other streams (including remote ones), * to publish data to this local stream, as if they were attached directly in place of the local Source. * - * Adheres to [[StreamRefAttributes]]. + * Adheres to [[akka.stream.StreamRefAttributes]]. * * See more detailed documentation on [[SinkRef]]. */ - @ApiMayChange - def sinkRef[T](): Source[T, Future[SinkRef[T]]] = + def sinkRef[T](): Source[T, SinkRef[T]] = Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None)) }