remove Future from StreamRefs mat val, #24372 (#26847)

This commit is contained in:
Patrik Nordwall 2019-05-02 16:54:37 +02:00 committed by GitHub
parent 1128024797
commit 82c761f026
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 109 additions and 117 deletions

View file

@ -47,7 +47,7 @@ class SourceRefBenchmark {
@Setup(Level.Invocation) @Setup(Level.Invocation)
def setup(): Unit = { def setup(): Unit = {
sourceRef = Await.result(Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef()), 10.seconds) sourceRef = Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef())
} }
@TearDown @TearDown

View file

@ -15,7 +15,6 @@ import akka.actor.ActorIdentity
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Identify import akka.actor.Identify
import akka.actor.Props import akka.actor.Props
import akka.pattern.pipe
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.remote.transport.ThrottlerTransportAdapter.Direction
@ -59,7 +58,7 @@ object StreamRefSpec extends MultiNodeConfig {
def receive = { def receive = {
case RequestLogs(streamId) => case RequestLogs(streamId) =>
// materialize the SourceRef: // materialize the SourceRef:
val (done: Future[Done], ref: Future[SourceRef[String]]) = val (done: Future[Done], ref: SourceRef[String]) =
Source Source
.fromIterator(() => Iterator.from(1)) .fromIterator(() => Iterator.from(1))
.map(n => s"elem-$n") .map(n => s"elem-$n")
@ -77,10 +76,10 @@ object StreamRefSpec extends MultiNodeConfig {
} }
// wrap the SourceRef in some domain message, such that the sender knows what source it is // wrap the SourceRef in some domain message, such that the sender knows what source it is
val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) val reply = LogsOffer(streamId, ref)
// reply to sender // reply to sender
reply.pipeTo(sender()) sender() ! reply
} }
} }
@ -101,7 +100,7 @@ object StreamRefSpec extends MultiNodeConfig {
def receive = { def receive = {
case PrepareUpload(nodeId) => case PrepareUpload(nodeId) =>
// materialize the SinkRef (the remote is like a source of data for us): // materialize the SinkRef (the remote is like a source of data for us):
val (ref: Future[SinkRef[String]], done: Future[Done]) = val (ref: SinkRef[String], done: Future[Done]) =
StreamRefs StreamRefs
.sinkRef[String]() .sinkRef[String]()
.throttle(1, 1.second) .throttle(1, 1.second)
@ -118,10 +117,10 @@ object StreamRefSpec extends MultiNodeConfig {
} }
// wrap the SinkRef in some domain message, such that the sender knows what source it is // wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) val reply = MeasurementsSinkReady(nodeId, ref)
// reply to sender // reply to sender
reply.pipeTo(sender()) sender() ! reply
} }
} }

View file

@ -193,7 +193,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
Cluster(sys2).join(Cluster(sys2).selfAddress) Cluster(sys2).join(Cluster(sys2).selfAddress)
probe.expectMsgType[MemberUp] probe.expectMsgType[MemberUp]
val mat = ActorMaterializer()(sys2) val mat = ActorMaterializer()(sys2)
val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds) val sink = StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat)
Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat) Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat)
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)

View file

@ -103,6 +103,13 @@ Classic remoting over UDP has been deprecated since `2.5.0` and now has been rem
To continue to use UDP configure @ref[Artery UDP](../remoting-artery.md#configuring-ssl-tls-for-akka-remoting) or migrate to Artery TCP. To continue to use UDP configure @ref[Artery UDP](../remoting-artery.md#configuring-ssl-tls-for-akka-remoting) or migrate to Artery TCP.
A full cluster restart is required to change to Artery. A full cluster restart is required to change to Artery.
## Streams
### StreamRefs
The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in
`Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern.
## Cluster Sharding ## Cluster Sharding
### Passivate idle entity ### Passivate idle entity

View file

@ -77,8 +77,7 @@ can be offered to a remote actor system in order for it to consume some source o
locally. locally.
In order to share a `Source` with a remote endpoint you need to materialize it by running it into the `Sink.sourceRef`. In order to share a `Source` with a remote endpoint you need to materialize it by running it into the `Sink.sourceRef`.
That `Sink` materializes the `SourceRef` that you can then send to other nodes. Please note that it materializes into a That `Sink` materializes the `SourceRef` that you can then send to other nodes.
`Future` so you will have to use `pipeTo`.
Scala Scala
: @@snip [FlowStreamRefsDocSpec.scala](/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source } : @@snip [FlowStreamRefsDocSpec.scala](/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source }

View file

@ -55,10 +55,9 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
private void handleRequestLogs(RequestLogs requestLogs) { private void handleRequestLogs(RequestLogs requestLogs) {
Source<String, NotUsed> logs = streamLogs(requestLogs.streamId); Source<String, NotUsed> logs = streamLogs(requestLogs.streamId);
CompletionStage<SourceRef<String>> logsRef = logs.runWith(StreamRefs.sourceRef(), mat); SourceRef<String> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
Patterns.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) getSender().tell(new LogsOffer(logsRef), getSelf());
.to(sender());
} }
private Source<String, NotUsed> streamLogs(long streamId) { private Source<String, NotUsed> streamLogs(long streamId) {
@ -111,13 +110,9 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
PrepareUpload.class, PrepareUpload.class,
prepare -> { prepare -> {
Sink<String, NotUsed> sink = logsSinkFor(prepare.id); Sink<String, NotUsed> sink = logsSinkFor(prepare.id);
CompletionStage<SinkRef<String>> sinkRef = SinkRef<String> sinkRef = StreamRefs.<String>sinkRef().to(sink).run(mat);
StreamRefs.<String>sinkRef().to(sink).run(mat);
Patterns.pipe( getSender().tell(new MeasurementsSinkReady(prepare.id, sinkRef), getSelf());
sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)),
context().dispatcher())
.to(sender());
}) })
.build(); .build();
} }

View file

@ -23,7 +23,6 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) case class LogsOffer(streamId: Int, sourceRef: SourceRef[String])
class DataSource extends Actor { class DataSource extends Actor {
import context.dispatcher
implicit val mat = ActorMaterializer()(context) implicit val mat = ActorMaterializer()(context)
def receive = { def receive = {
@ -32,13 +31,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val source: Source[String, NotUsed] = streamLogs(streamId) val source: Source[String, NotUsed] = streamLogs(streamId)
// materialize the SourceRef: // materialize the SourceRef:
val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) val ref: SourceRef[String] = source.runWith(StreamRefs.sourceRef())
// wrap the SourceRef in some domain message, such that the sender knows what source it is // wrap the SourceRef in some domain message, such that the sender knows what source it is
val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) val reply = LogsOffer(streamId, ref)
// reply to sender // reply to sender
reply.pipeTo(sender()) sender() ! reply
} }
def streamLogs(streamId: Long): Source[String, NotUsed] = ??? def streamLogs(streamId: Long): Source[String, NotUsed] = ???
@ -70,7 +69,6 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
class DataReceiver extends Actor { class DataReceiver extends Actor {
import context.dispatcher
implicit val mat = ActorMaterializer()(context) implicit val mat = ActorMaterializer()(context)
def receive = { def receive = {
@ -79,13 +77,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val sink: Sink[String, NotUsed] = logsSinkFor(nodeId) val sink: Sink[String, NotUsed] = logsSinkFor(nodeId)
// materialize the SinkRef (the remote is like a source of data for us): // materialize the SinkRef (the remote is like a source of data for us):
val ref: Future[SinkRef[String]] = StreamRefs.sinkRef[String]().to(sink).run() val ref: SinkRef[String] = StreamRefs.sinkRef[String]().to(sink).run()
// wrap the SinkRef in some domain message, such that the sender knows what source it is // wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) val reply = MeasurementsSinkReady(nodeId, ref)
// reply to sender // reply to sender
reply.pipeTo(sender()) sender() ! reply
} }
def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ??? def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ???

View file

@ -39,25 +39,23 @@ object StreamRefsSpec {
* For them it's a Source; for us it is a Sink we run data "into" * For them it's a Source; for us it is a Sink we run data "into"
*/ */
val source: Source[String, NotUsed] = Source(List("hello", "world")) val source: Source[String, NotUsed] = Source(List("hello", "world"))
val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) val ref: SourceRef[String] = source.runWith(StreamRefs.sourceRef())
ref.pipeTo(sender()) sender() ! ref
case "give-infinite" => case "give-infinite" =>
val source: Source[String, NotUsed] = Source.fromIterator(() => Iterator.from(1)).map("ping-" + _) val source: Source[String, NotUsed] = Source.fromIterator(() => Iterator.from(1)).map("ping-" + _)
val (r: NotUsed, ref: Future[SourceRef[String]]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run() val (r: NotUsed, ref: SourceRef[String]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run()
ref.pipeTo(sender()) sender() ! ref
case "give-fail" => case "give-fail" =>
val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace).runWith(StreamRefs.sourceRef()) val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace).runWith(StreamRefs.sourceRef())
sender() ! ref
ref.pipeTo(sender())
case "give-complete-asap" => case "give-complete-asap" =>
val ref = Source.empty.runWith(StreamRefs.sourceRef()) val ref = Source.empty.runWith(StreamRefs.sourceRef())
sender() ! ref
ref.pipeTo(sender())
case "give-subscribe-timeout" => case "give-subscribe-timeout" =>
val ref = Source val ref = Source
@ -65,8 +63,8 @@ object StreamRefsSpec {
.toMat(StreamRefs.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef .toMat(StreamRefs.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.run() .run()
sender() ! ref
ref.pipeTo(sender())
// case "send-bulk" => // case "send-bulk" =>
// /* // /*
// * Here we're able to send a source to a remote recipient // * Here we're able to send a source to a remote recipient
@ -86,14 +84,12 @@ object StreamRefsSpec {
*/ */
val sink = val sink =
StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "<COMPLETE>")).run() StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "<COMPLETE>")).run()
sender() ! sink
sink.pipeTo(sender())
case "receive-ignore" => case "receive-ignore" =>
val sink = val sink =
StreamRefs.sinkRef[String]().to(Sink.ignore).run() StreamRefs.sinkRef[String]().to(Sink.ignore).run()
sender() ! sink
sink.pipeTo(sender())
case "receive-subscribe-timeout" => case "receive-subscribe-timeout" =>
val sink = StreamRefs val sink = StreamRefs
@ -101,8 +97,7 @@ object StreamRefsSpec {
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.to(Sink.actorRef(probe, "<COMPLETE>")) .to(Sink.actorRef(probe, "<COMPLETE>"))
.run() .run()
sender() ! sink
sink.pipeTo(sender())
case "receive-32" => case "receive-32" =>
val (sink, driver) = StreamRefs.sinkRef[String]().toMat(TestSink.probe(context.system))(Keep.both).run() val (sink, driver) = StreamRefs.sinkRef[String]().toMat(TestSink.probe(context.system))(Keep.both).run()
@ -120,7 +115,7 @@ object StreamRefsSpec {
"<COMPLETED>" "<COMPLETED>"
}.pipeTo(probe) }.pipeTo(probe)
sink.pipeTo(sender()) sender() ! sink
// case "receive-bulk" => // case "receive-bulk" =>
// /* // /*

View file

@ -60,4 +60,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestGraph") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatest") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatest")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith")
# #24372 No Future/CompletionStage in StreamRefs
# FIXME why was change not detected?

View file

@ -51,7 +51,7 @@ private object ActorRefSource {
override protected def stageActorName: String = override protected def stageActorName: String =
inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName)
val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) {
case (_, PoisonPill) => case (_, PoisonPill) =>
log.warning("for backwards compatibility: PoisonPill will not be supported in the future") log.warning("for backwards compatibility: PoisonPill will not be supported in the future")
completeStage() completeStage()

View file

@ -14,7 +14,6 @@ import akka.stream.scaladsl.Sink
import akka.stream.stage._ import akka.stream.stage._
import akka.util.{ OptionVal, PrettyDuration } import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
/** INTERNAL API: Implementation class, not intended to be touched directly by end-users */ /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
@ -24,6 +23,13 @@ private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) ex
Sink.fromGraph(new SinkRefStageImpl[In](OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed) Sink.fromGraph(new SinkRefStageImpl[In](OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed)
} }
/**
* INTERNAL API
*/
@InternalApi private[stream] object SinkRefStageImpl {
private sealed trait ActorRefStage { def ref: ActorRef }
}
/** /**
* INTERNAL API: Actual operator implementation backing [[SinkRef]]s. * INTERNAL API: Actual operator implementation backing [[SinkRef]]s.
* *
@ -32,7 +38,8 @@ private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) ex
*/ */
@InternalApi @InternalApi
private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartnerRef: OptionVal[ActorRef]) private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartnerRef: OptionVal[ActorRef])
extends GraphStageWithMaterializedValue[SinkShape[In], Future[SourceRef[In]]] { extends GraphStageWithMaterializedValue[SinkShape[In], SourceRef[In]] {
import SinkRefStageImpl.ActorRefStage
val in: Inlet[In] = Inlet[In](s"${Logging.simpleName(getClass)}($initialRefName).in") val in: Inlet[In] = Inlet[In](s"${Logging.simpleName(getClass)}($initialRefName).in")
override def shape: SinkShape[In] = SinkShape.of(in) override def shape: SinkShape[In] = SinkShape.of(in)
@ -43,24 +50,30 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
case OptionVal.None => "<no-initial-ref>" case OptionVal.None => "<no-initial-ref>"
} }
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SourceRef[In]) =
val promise = Promise[SourceRefImpl[In]] throw new IllegalStateException("Not supported")
val logic = new TimerGraphStageLogic(shape) with StageLogging with InHandler { private[akka] override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes,
eagerMaterializer: Materializer): (GraphStageLogic, SourceRef[In]) = {
private[this] lazy val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(materializer).system) val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with InHandler {
private[this] val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(eagerMaterializer).system)
// settings --- // settings ---
import StreamRefAttributes._ import StreamRefAttributes._
private[this] lazy val settings = ActorMaterializerHelper.downcast(materializer).settings.streamRefSettings private[this] val settings = ActorMaterializerHelper.downcast(eagerMaterializer).settings.streamRefSettings
private[this] lazy val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( private[this] val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout](
SubscriptionTimeout(settings.subscriptionTimeout)) SubscriptionTimeout(settings.subscriptionTimeout))
// end of settings --- // end of settings ---
override protected lazy val stageActorName: String = streamRefsMaster.nextSinkRefStageName() override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName()
private[this] var self: GraphStageLogic.StageActor = _ private[this] val self: GraphStageLogic.StageActor =
implicit def selfSender: ActorRef = self.ref getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive)
override val ref: ActorRef = self.ref
implicit def selfSender: ActorRef = ref
private var partnerRef: OptionVal[ActorRef] = OptionVal.None private var partnerRef: OptionVal[ActorRef] = OptionVal.None
private def getPartnerRef: ActorRef = private def getPartnerRef: ActorRef =
@ -84,8 +97,6 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None
override def preStart(): Unit = { override def preStart(): Unit = {
self = getStageActor(initialReceive)
initialPartnerRef match { initialPartnerRef match {
case OptionVal.Some(ref) => case OptionVal.Some(ref) =>
// this will set the `partnerRef` // this will set the `partnerRef`
@ -104,11 +115,9 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
"Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", "Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}",
initialPartnerRef, initialPartnerRef,
self.ref) self.ref)
promise.success(SourceRefImpl(self.ref))
} }
lazy val initialReceive: ((ActorRef, Any)) => Unit = { def initialReceive: ((ActorRef, Any)) => Unit = {
case (_, Terminated(ref)) => case (_, Terminated(ref)) =>
if (ref == getPartnerRef) if (ref == getPartnerRef)
finishedWithAwaitingPartnerTermination match { finishedWithAwaitingPartnerTermination match {
@ -156,7 +165,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
case SubscriptionTimeoutTimerKey => case SubscriptionTimeoutTimerKey =>
val ex = StreamRefSubscriptionTimeoutException( val ex = StreamRefSubscriptionTimeoutException(
// we know the future has been competed by now, since it is in preStart // we know the future has been competed by now, since it is in preStart
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [$ref], " +
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
throw ex throw ex
@ -231,7 +240,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
setHandler(in, this) setHandler(in, this)
} }
(logic, promise.future) (logic, SourceRefImpl(logic.ref))
} }
override def toString = s"${Logging.simpleName(getClass)}($initialRefName)" override def toString = s"${Logging.simpleName(getClass)}($initialRefName)"

View file

@ -15,8 +15,6 @@ import akka.stream.scaladsl.Source
import akka.stream.stage._ import akka.stream.stage._
import akka.util.{ OptionVal, PrettyDuration } import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.{ Future, Promise }
/** INTERNAL API: Implementation class, not intended to be touched directly by end-users */ /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
@InternalApi @InternalApi
private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) extends SourceRef[T] { private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) extends SourceRef[T] {
@ -24,6 +22,13 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed) Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed)
} }
/**
* INTERNAL API
*/
@InternalApi private[stream] object SourceRefStageImpl {
private sealed trait ActorRefStage { def ref: ActorRef }
}
/** /**
* INTERNAL API: Actual operator implementation backing [[SourceRef]]s. * INTERNAL API: Actual operator implementation backing [[SourceRef]]s.
* *
@ -32,7 +37,8 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
*/ */
@InternalApi @InternalApi
private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef]) private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef])
extends GraphStageWithMaterializedValue[SourceShape[Out], Future[SinkRef[Out]]] { stage => extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage =>
import SourceRefStageImpl.ActorRefStage
val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out") val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out")
override def shape = SourceShape.of(out) override def shape = SourceShape.of(out)
@ -43,24 +49,29 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
case _ => "<no-initial-ref>" case _ => "<no-initial-ref>"
} }
override def createLogicAndMaterializedValue( override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SinkRef[Out]) =
inheritedAttributes: Attributes): (GraphStageLogic, Future[SinkRef[Out]]) = { throw new IllegalStateException("Not supported")
val promise = Promise[SinkRefImpl[Out]]()
val logic = new TimerGraphStageLogic(shape) with StageLogging with OutHandler { private[akka] override def createLogicAndMaterializedValue(
private[this] lazy val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(materializer).system) inheritedAttributes: Attributes,
eagerMaterializer: Materializer): (GraphStageLogic, SinkRef[Out]) = {
val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with OutHandler {
private[this] val streamRefsMaster = StreamRefsMaster(ActorMaterializerHelper.downcast(eagerMaterializer).system)
// settings --- // settings ---
import StreamRefAttributes._ import StreamRefAttributes._
private[this] lazy val settings = ActorMaterializerHelper.downcast(materializer).settings.streamRefSettings private[this] val settings = ActorMaterializerHelper.downcast(eagerMaterializer).settings.streamRefSettings
private[this] lazy val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout]( private[this] val subscriptionTimeout = inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout](
SubscriptionTimeout(settings.subscriptionTimeout)) SubscriptionTimeout(settings.subscriptionTimeout))
// end of settings --- // end of settings ---
override protected lazy val stageActorName: String = streamRefsMaster.nextSourceRefStageName() override protected val stageActorName: String = streamRefsMaster.nextSourceRefStageName()
private[this] var self: GraphStageLogic.StageActor = _ private[this] val self: GraphStageLogic.StageActor =
private[this] implicit def selfSender: ActorRef = self.ref getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false)(initialReceive)
override val ref: ActorRef = self.ref
private[this] implicit def selfSender: ActorRef = ref
val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey" val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey" val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey"
@ -88,15 +99,12 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
receiveBuffer = FixedSizeBuffer[Out](settings.bufferCapacity) receiveBuffer = FixedSizeBuffer[Out](settings.bufferCapacity)
requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity)
self = getStageActor(initialReceive)
log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref) log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
if (initialPartnerRef.isDefined) // this will set the partnerRef if (initialPartnerRef.isDefined) // this will set the partnerRef
observeAndValidateSender( observeAndValidateSender(
initialPartnerRef.get, initialPartnerRef.get,
"Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.") "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.")
promise.success(SinkRefImpl(self.ref))
//this timer will be cancelled if we receive the handshake from the remote SinkRef //this timer will be cancelled if we receive the handshake from the remote SinkRef
// either created in this method and provided as self.ref as initialPartnerRef // either created in this method and provided as self.ref as initialPartnerRef
// or as the response to first CumulativeDemand request sent to remote SinkRef // or as the response to first CumulativeDemand request sent to remote SinkRef
@ -133,7 +141,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
case SubscriptionTimeoutTimerKey => case SubscriptionTimeoutTimerKey =>
val ex = StreamRefSubscriptionTimeoutException( val ex = StreamRefSubscriptionTimeoutException(
// we know the future has been competed by now, since it is in preStart // we know the future has been competed by now, since it is in preStart
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [$ref]," +
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
@ -149,7 +157,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
"(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down.")) "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
} }
lazy val initialReceive: ((ActorRef, Any)) => Unit = { def initialReceive: ((ActorRef, Any)) => Unit = {
case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) => case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) =>
cancelTimer(SubscriptionTimeoutTimerKey) cancelTimer(SubscriptionTimeoutTimerKey)
observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext") observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext")
@ -181,9 +189,9 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
self.unwatch(sender) self.unwatch(sender)
failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason")) failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason"))
case (_, Terminated(ref)) => case (_, Terminated(p)) =>
partnerRef match { partnerRef match {
case OptionVal.Some(`ref`) => case OptionVal.Some(`p`) =>
// we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail // we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail
// will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref // will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref
// the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there. // the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there.
@ -193,7 +201,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
// this should not have happened! It should be impossible that we watched some other actor // this should not have happened! It should be impossible that we watched some other actor
failStage( failStage(
RemoteStreamRefActorTerminatedException( RemoteStreamRefActorTerminatedException(
s"Received UNEXPECTED Terminated($ref) message! " + s"Received UNEXPECTED Terminated($p) message! " +
s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down.")) s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down."))
} }
@ -228,8 +236,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
partnerRef = OptionVal(partner) partnerRef = OptionVal(partner)
self.watch(partner) self.watch(partner)
case OptionVal.Some(ref) => case OptionVal.Some(p) =>
if (partner != ref) { if (partner != p) {
val ex = InvalidPartnerActorException(partner, getPartnerRef, msg) val ex = InvalidPartnerActorException(partner, getPartnerRef, msg)
partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
throw ex throw ex
@ -248,7 +256,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
setHandler(out, this) setHandler(out, this)
} }
(logic, promise.future) (logic, SinkRefImpl(logic.ref))
} }
override def toString: String = override def toString: String =

View file

@ -4,21 +4,12 @@
package akka.stream.javadsl package akka.stream.javadsl
import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.stream._ import akka.stream._
/** /**
* API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value
* will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked
* as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details.
*
* Factories for creating stream refs. * Factories for creating stream refs.
*/ */
@ApiMayChange
object StreamRefs { object StreamRefs {
import scala.compat.java8.FutureConverters._
/** /**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
@ -28,9 +19,8 @@ object StreamRefs {
* *
* See more detailed documentation on [[SourceRef]]. * See more detailed documentation on [[SourceRef]].
*/ */
@ApiMayChange def sourceRef[T](): javadsl.Sink[T, SourceRef[T]] =
def sourceRef[T](): javadsl.Sink[T, CompletionStage[SourceRef[T]]] = scaladsl.StreamRefs.sourceRef[T]().asJava
scaladsl.StreamRefs.sourceRef[T]().mapMaterializedValue(_.toJava).asJava
/** /**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
@ -40,8 +30,7 @@ object StreamRefs {
* *
* See more detailed documentation on [[SinkRef]]. * See more detailed documentation on [[SinkRef]].
*/ */
@ApiMayChange def sinkRef[T](): javadsl.Source[T, SinkRef[T]] =
def sinkRef[T](): javadsl.Source[T, CompletionStage[SinkRef[T]]] = scaladsl.StreamRefs.sinkRef[T]().asJava
scaladsl.StreamRefs.sinkRef[T]().mapMaterializedValue(_.toJava).asJava
} }

View file

@ -4,44 +4,34 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.annotation.ApiMayChange
import akka.stream.{ SinkRef, SourceRef } import akka.stream.{ SinkRef, SourceRef }
import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl } import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl }
import akka.util.OptionVal import akka.util.OptionVal
import scala.concurrent.Future
/** /**
* API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value
* will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked
* as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details.
*
* Factories for creating stream refs. * Factories for creating stream refs.
*/ */
@ApiMayChange
object StreamRefs { object StreamRefs {
/** /**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached directly in place of the local Sink. * to consume data from this local stream, as if they were attached directly in place of the local Sink.
* *
* Adheres to [[StreamRefAttributes]]. * Adheres to [[akka.stream.StreamRefAttributes]].
* *
* See more detailed documentation on [[SourceRef]]. * See more detailed documentation on [[SourceRef]].
*/ */
@ApiMayChange def sourceRef[T](): Sink[T, SourceRef[T]] =
def sourceRef[T](): Sink[T, Future[SourceRef[T]]] =
Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None)) Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None))
/** /**
* A local [[Source]] which materializes a [[SinkRef]] which can be used by other streams (including remote ones), * A local [[Source]] which materializes a [[SinkRef]] which can be used by other streams (including remote ones),
* to publish data to this local stream, as if they were attached directly in place of the local Source. * to publish data to this local stream, as if they were attached directly in place of the local Source.
* *
* Adheres to [[StreamRefAttributes]]. * Adheres to [[akka.stream.StreamRefAttributes]].
* *
* See more detailed documentation on [[SinkRef]]. * See more detailed documentation on [[SinkRef]].
*/ */
@ApiMayChange def sinkRef[T](): Source[T, SinkRef[T]] =
def sinkRef[T](): Source[T, Future[SinkRef[T]]] =
Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None)) Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None))
} }