diff --git a/akka-docs/src/main/paradox/images/sink-ref-animation.gif b/akka-docs/src/main/paradox/images/sink-ref-animation.gif index ae572ddd13..52e26d2103 100644 Binary files a/akka-docs/src/main/paradox/images/sink-ref-animation.gif and b/akka-docs/src/main/paradox/images/sink-ref-animation.gif differ diff --git a/akka-docs/src/main/paradox/images/source-ref-animation.gif b/akka-docs/src/main/paradox/images/source-ref-animation.gif index 073d64fdd0..76fd2c936d 100644 Binary files a/akka-docs/src/main/paradox/images/source-ref-animation.gif and b/akka-docs/src/main/paradox/images/source-ref-animation.gif differ diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md index 8540a61955..1d1b09a614 100644 --- a/akka-docs/src/main/paradox/stream/stream-refs.md +++ b/akka-docs/src/main/paradox/stream/stream-refs.md @@ -5,8 +5,7 @@ an Akka Cluster. Unlike heavier "streaming data processing" frameworks, Akka Streams are not "deployed" nor automatically distributed. Akka stream refs are, as the name implies, references to existing parts of a stream, and can be used to create a -distributed processing framework or introduce such capabilities in specific parts of your application, however they -are not on that level of abstraction by themselves. +distributed processing framework or introduce such capabilities in specific parts of your application. Stream refs are trivial to make use of in existing clustered Akka applications, and require no additional configuration or setup. They automatically maintain flow-control / back-pressure over the network, and employ Akka's failure detection @@ -19,7 +18,7 @@ implement manually. A useful way to think about stream refs is: "like an `ActorRef`, but for Akka Streams's `Source` and `Sink`". - Since they refer to an already existing, possibly remote, `Sink` or `Source`. + Stream refs refer to an already existing, possibly remote, `Sink` or `Source`. This is not to be mistaken with deploying streams remotely, which this feature is not intended for. @@@ @@ -48,7 +47,7 @@ Actors would usually be used to establish the stream, by means of some initial m "I want to offer you many log elements (the stream ref)", or alternatively in the opposite way "If you need to send me much data, here is the stream ref you can use to do so". -Since the two sides ("local" and "remote") of reach reference may be confusing to simply refer to as +Since the two sides ("local" and "remote") of each reference may be confusing to simply refer to as "remote" and "local" -- since either side can be seen as "local" or "remote" depending how we look at it -- we propose to use the terminology "origin" and "target", which is defined by where the stream ref was created. For `SourceRef`s, the "origin" is the side which has the data that it is going to stream out. For `SinkRef`s @@ -56,7 +55,7 @@ the "origin" side is the actor system that is ready to receive the data and has two may be seen as duals of each other, however to explain patterns about sharing references, we found this wording to be rather useful. -### Source Refs - offering streaming data over network +### Source Refs - offering streaming data to a remote system A @scala[@scaladoc[`SourceRef`](akka.stream.SourceRef)]@java[@javadoc[`SourceRef`](akka.stream.SourceRef)] can be offered to a remote actor system in order for it to consume some source of data that we have prepared @@ -89,16 +88,17 @@ The process of preparing and running a `SourceRef` powered distributed stream is A `SourceRef` is *by design* "single-shot". i.e. it may only be materialized once. This is in order to not complicate the mental model what materializing such value would mean. - By being single-shot, we always know what it means, and on top of those semantics offer a fan-out - by emitting multiple `SourceRef`s which target the same `Source` that uses `Broadcast`. - This also allows for fine grained control how many streams a system can expect to be running - at the same time, which is useful for capacity planning and "allowed number of concurrent streams - limiting" of clients. + While stream refs are designed to be single shot, you may use them to mimic multicast scenarios, + simply by starting a `Broadcast` stage once, and attaching multiple new streams to it, for each + emitting a new stream ref. This way each output of the broadcast is by itself an unique single-shot + reference, however they can all be powered using a single `Source` -- located before the `Broadcast` stage. @@@ -### Sink Refs - offering to receive streaming data +### Sink Refs - offering to receive streaming data from a remote system -The dual of source references are A @scala[@scaladoc[`SourceRef`](akka.stream.SinkRef)]@java[@javadoc[`SourceRef`](akka.stream.SinkRef)]s. They can be used to offer the other side the capability to +The dual of @scala[@scaladoc[`SourceRef`](akka.stream.SinkRef)]@java[@javadoc[`SourceRef`](akka.stream.SinkRef)]s. + +They can be used to offer the other side the capability to send to the *origin* side data in a streaming, flow-controlled fashion. The origin here allocates a Sink, which could be as simple as a `Sink.foreach` or as advanced as a complex sink which streams the incoming data into various other systems (e.g. any of the Alpakka provided Sinks). @@ -128,7 +128,7 @@ The process of preparing and running a `SinkRef` powered distributed stream is s @@@ warning - A `SinkeRef` is *by design* "single-shot". i.e. it may only be materialized once. + A `SinkRef` is *by design* "single-shot". i.e. it may only be materialized once. This is in order to not complicate the mental model what materializing such value would mean. If you have an use case for building a fan-in operation accepting writes from multiple remote nodes, @@ -153,10 +153,9 @@ of data such as huge log files, messages or even media, with as much ease as if All stream references have a subscription timeout, which is intended to prevent resource leaks in situations in which a remote node would requests the allocation of many streams yet never actually run -them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which -if it's "handed out" side has not been materialized, the origin will terminate with a timeout exception, -and IF the remote side eventually would be run afterwards, it would also immediately fail with an exception -pointing out that the origin seems to be missing. +them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which the +origin will abort the stream offer if the target has not materialized the stream ref in time. After the +timeout has triggered, materialization of the target side will fail pointing out that the origin is missing. Since these timeouts are often very different based on the kind of stream offered, and there can be many different kinds of them in the same application, it is possible to not only configure this setting diff --git a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java index 2428c4a694..00fd0dc37f 100644 --- a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java @@ -8,6 +8,7 @@ import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.pattern.PatternsCS; import akka.remote.WireFormats; import akka.stream.*; import akka.stream.javadsl.*; @@ -21,6 +22,7 @@ import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletionStage; import static org.junit.Assert.assertEquals; @@ -61,9 +63,10 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { private void handleRequestLogs(RequestLogs requestLogs) { Source logs = streamLogs(requestLogs.streamId); - SourceRef logsRef = logs.runWith(Sink.sourceRef(), mat); - LogsOffer offer = new LogsOffer(logsRef); - sender().tell(offer, self()); + CompletionStage> logsRef = logs.runWith(StreamRefs.sourceRef(), mat); + + PatternsCS.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) + .to(sender()); } private Source streamLogs(long streamId) { @@ -89,18 +92,18 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { } //#offer-sink - class PrepareUpload { + static class PrepareUpload { final String id; public PrepareUpload(String id) { this.id = id; } } - class MeasurementsSinkReady { + static class MeasurementsSinkReady { final String id; final SinkRef sinkRef; - public PrepareUpload(String id, SinkRef ref) { + public MeasurementsSinkReady(String id, SinkRef ref) { this.id = id; this.sinkRef = ref; } @@ -112,15 +115,16 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { return receiveBuilder() .match(PrepareUpload.class, prepare -> { Sink sink = logsSinkFor(prepare.id); - SinkRef sinkRef = Source.sinkRef().to(sink).run(mat); + CompletionStage> sinkRef = StreamRefs.sinkRef().to(sink).run(mat); - sender().tell(new MeasurementsSinkReady(sinkRef), self()); + PatternsCS.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher()) + .to(sender()); }) - .create(); + .build(); } private Sink logsSinkFor(String id) { - return Sink.ignore(); // would be actual useful Sink in reality + return Sink.ignore().mapMaterializedValue(done -> NotUsed.getInstance()); } } //#offer-sink @@ -132,10 +136,10 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { ActorRef receiver = system.actorOf(Props.create(DataReceiver.class), "dataReceiver"); receiver.tell(new PrepareUpload("system-42-tmp"), getTestActor()); - MeasurementsSinkReady ready = expectMsgClass(LogsOffer.class); + MeasurementsSinkReady ready = expectMsgClass(MeasurementsSinkReady.class); Source.repeat("hello") - .runWith(ready.sinkRef, mat); + .runWith(ready.sinkRef.getSink(), mat); //#offer-sink-use }}; } @@ -149,11 +153,11 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { // configuring Sink.sourceRef (notice that we apply the attributes to the Sink!): Source.repeat("hello") - .runWith(Sink.sourceRef().addAttributes(timeoutAttributes), mat); + .runWith(StreamRefs.sourceRef().addAttributes(timeoutAttributes), mat); // configuring SinkRef.source: - Source.sinkRef().addAttributes(timeoutAttributes) - .runWith(Sink.ignore(), mat); // not very interesting sink, just an example + StreamRefs.sinkRef().addAttributes(timeoutAttributes) + .runWith(Sink.ignore(), mat); // not very interesting sink, just an example //#attr-sub-timeout }}; diff --git a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala index 3dd8f5be05..a289d01b1b 100644 --- a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala @@ -9,18 +9,20 @@ import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.testkit.AkkaSpec import docs.CompileOnlySpec +import scala.concurrent.Future class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { "offer a source ref" in compileOnlySpec { //#offer-source import akka.stream.SourceRef + import akka.pattern.pipe case class RequestLogs(streamId: Int) case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) class DataSource extends Actor { - + import context.dispatcher implicit val mat = ActorMaterializer()(context) def receive = { @@ -29,13 +31,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { val source: Source[String, NotUsed] = streamLogs(streamId) // materialize the SourceRef: - val ref: SourceRef[String] = source.runWith(Sink.sourceRef()) + val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) // wrap the SourceRef in some domain message, such that the sender knows what source it is - val reply: LogsOffer = LogsOffer(streamId, ref) + val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) // reply to sender - sender() ! reply + reply pipeTo sender() } def streamLogs(streamId: Long): Source[String, NotUsed] = ??? @@ -59,7 +61,7 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { "offer a sink ref" in compileOnlySpec { //#offer-sink - import akka.pattern._ + import akka.pattern.pipe import akka.stream.SinkRef case class PrepareUpload(id: String) @@ -76,13 +78,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { val sink: Sink[String, NotUsed] = logsSinkFor(nodeId) // materialize the SinkRef (the remote is like a source of data for us): - val ref: SinkRef[String] = Source.sinkRef[String]().to(sink).run() + val ref: Future[SinkRef[String]] = StreamRefs.sinkRef[String]().to(sink).run() // wrap the SinkRef in some domain message, such that the sender knows what source it is - val reply: MeasurementsSinkReady = MeasurementsSinkReady(nodeId, ref) + val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) // reply to sender - sender() ! reply + reply pipeTo sender() } def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ??? @@ -114,10 +116,10 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec { // configuring Sink.sourceRef (notice that we apply the attributes to the Sink!): Source.repeat("hello") - .runWith(Sink.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))) + .runWith(StreamRefs.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))) // configuring SinkRef.source: - Source.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)) + StreamRefs.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)) .runWith(Sink.ignore) // not very interesting Sink, just an example //#attr-sub-timeout } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 60a9339d1c..06b9333c1d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -11,9 +11,10 @@ import akka.stream.testkit.TestPublisher import akka.stream.testkit.scaladsl._ import akka.stream.{ ActorMaterializer, SinkRef, SourceRef, StreamRefAttributes } import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe } -import akka.util.ByteString +import akka.util.{ ByteString, PrettyDuration } import com.typesafe.config._ +import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.Future import scala.util.control.NoStackTrace @@ -27,6 +28,7 @@ object StreamRefsSpec { } class DataSourceActor(probe: ActorRef) extends Actor with ActorLogging { + import context.dispatcher implicit val mat = ActorMaterializer() def receive = { @@ -37,35 +39,35 @@ object StreamRefsSpec { * For them it's a Source; for us it is a Sink we run data "into" */ val source: Source[String, NotUsed] = Source(List("hello", "world")) - val ref: SourceRef[String] = source.runWith(Sink.sourceRef()) + val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef()) - sender() ! ref + ref pipeTo sender() case "give-infinite" ⇒ val source: Source[String, NotUsed] = Source.fromIterator(() ⇒ Iterator.from(1)).map("ping-" + _) - val (r: NotUsed, ref: SourceRef[String]) = source.toMat(Sink.sourceRef())(Keep.both).run() + val (r: NotUsed, ref: Future[SourceRef[String]]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run() - sender() ! ref + ref pipeTo sender() case "give-fail" ⇒ val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace) - .runWith(Sink.sourceRef()) + .runWith(StreamRefs.sourceRef()) - sender() ! ref + ref pipeTo sender() case "give-complete-asap" ⇒ val ref = Source.empty - .runWith(Sink.sourceRef()) + .runWith(StreamRefs.sourceRef()) - sender() ! ref + ref pipeTo sender() case "give-subscribe-timeout" ⇒ val ref = Source.repeat("is anyone there?") - .toMat(Sink.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef + .toMat(StreamRefs.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .run() - sender() ! ref + ref pipeTo sender() // case "send-bulk" ⇒ // /* @@ -84,23 +86,23 @@ object StreamRefsSpec { * * For them it's a Sink; for us it's a Source. */ - val sink: SinkRef[String] = - Source.sinkRef[String]() + val sink = + StreamRefs.sinkRef[String]() .to(Sink.actorRef(probe, "")) .run() - sender() ! sink + sink pipeTo sender() case "receive-subscribe-timeout" ⇒ - val sink = Source.sinkRef[String]() + val sink = StreamRefs.sinkRef[String]() .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .to(Sink.actorRef(probe, "")) .run() - sender() ! sink + sink pipeTo sender() case "receive-32" ⇒ - val (sink, driver) = Source.sinkRef[String]() + val (sink, driver) = StreamRefs.sinkRef[String]() .toMat(TestSink.probe(context.system))(Keep.both) .run() @@ -117,7 +119,7 @@ object StreamRefsSpec { "" } pipeTo probe - sender() ! sink + sink pipeTo sender() // case "receive-bulk" ⇒ // /* diff --git a/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes index 3308b3e3e6..e69de29bb2 100644 --- a/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.10.backwards.excludes @@ -1,3 +0,0 @@ -# #24230 stream refs - SourceRef / SinkRef -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializerSettings.this") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic#StageActor.this") diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index ce597be99a..cb9e8731c8 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -9,13 +9,11 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.event.LoggingAdapter import akka.util.Helpers.toRootLowerCase -import akka.stream.ActorMaterializerSettings.defaultMaxFixedBufferSize import akka.stream.impl._ import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ import akka.japi.function -import akka.stream.impl.streamref.StreamRefSettingsImpl import akka.stream.stage.GraphStageLogic import scala.util.control.NoStackTrace @@ -249,7 +247,8 @@ object ActorMaterializerSettings { maxFixedBufferSize: Int) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize, defaultIoSettings, defaultStreamRefSettings) + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), + StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) /** * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala). @@ -294,7 +293,8 @@ object ActorMaterializerSettings { maxFixedBufferSize: Int) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize, defaultIoSettings, defaultStreamRefSettings) + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), + StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) /** * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java). @@ -308,10 +308,6 @@ object ActorMaterializerSettings { def create(config: Config): ActorMaterializerSettings = apply(config) - private val defaultMaxFixedBufferSize = 1000 - private val defaultIoSettings = IOSettings(tcpWriteBufferSize = 16 * 1024) - // sadly due to the existence of the create-from-individual-parts methods, we need to replicate the defaults here from reference.conf... - private val defaultStreamRefSettings: StreamRefSettings = StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")) } /** @@ -345,6 +341,57 @@ final class ActorMaterializerSettings private ( requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize") require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") + // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + def this( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean, + maxFixedBufferSize: Int, + syncProcessingLimit: Int, + ioSettings: IOSettings) = + this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, + StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + + // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + def this( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean, + maxFixedBufferSize: Int, + syncProcessingLimit: Int) = + this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, + IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + + // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + def this( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean, + maxFixedBufferSize: Int) = + this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), + StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + private def copy( initialInputBufferSize: Int = this.initialInputBufferSize, maxInputBufferSize: Int = this.maxInputBufferSize, diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala b/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala index 5b1646d279..f10c5c6424 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala @@ -46,7 +46,7 @@ trait StreamRefSettings { // --- with... methods --- def withBufferCapacity(value: Int): StreamRefSettings - def withDemandRedeliveryInterval(value: scala.concurrent.duration.FiniteDuration): StreamRefSettings + def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings } diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala index 7a208e511a..39656b5484 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala @@ -23,7 +23,7 @@ object SinkRef { * to accept some data from it, and the remote one decides to accept the request to send data in a back-pressured * streaming fashion -- using a sink ref. * - * To create a [[SinkRef]] you have to materialize the `Sink` that you want to obtain a reference to by attaching it to a `Source.sinkRef`. + * To create a [[SinkRef]] you have to materialize the `Sink` that you want to obtain a reference to by attaching it to a `StreamRefs.sinkRef()`. * * Stream refs can be seen as Reactive Streams over network boundaries. * See also [[akka.stream.SourceRef]] which is the dual of a `SinkRef`. @@ -69,14 +69,16 @@ trait SourceRef[T] { // --- exceptions --- final case class TargetRefNotInitializedYetException() - extends IllegalStateException("Internal remote target actor ref not yet resolved, yet attempted to send messages to it. This should not happen due to proper flow-control, please open a ticket on the issue tracker: https://github.com/akka/akka") + extends IllegalStateException("Internal remote target actor ref not yet resolved, yet attempted to send messages to it. " + + "This should not happen due to proper flow-control, please open a ticket on the issue tracker: https://github.com/akka/akka") final case class StreamRefSubscriptionTimeoutException(msg: String) extends IllegalStateException(msg) final case class RemoteStreamRefActorTerminatedException(msg: String) extends RuntimeException(msg) final case class InvalidSequenceNumberException(expectedSeqNr: Long, gotSeqNr: Long, msg: String) - extends IllegalStateException(s"$msg (expected: $expectedSeqNr, got: $gotSeqNr)") + extends IllegalStateException(s"$msg (expected: $expectedSeqNr, got: $gotSeqNr). " + + s"In most cases this means that message loss on this connection has occurred and the stream will fail eagerly.") /** * Stream refs establish a connection between a local and remote actor, representing the origin and remote sides @@ -89,4 +91,7 @@ final case class InvalidSequenceNumberException(expectedSeqNr: Long, gotSeqNr: L * This is not meant as a security feature, but rather as plain sanity-check. */ final case class InvalidPartnerActorException(expectedRef: ActorRef, gotRef: ActorRef, msg: String) - extends IllegalStateException(s"$msg (expected: $expectedRef, got: $gotRef)") + extends IllegalStateException(s"$msg (expected: $expectedRef, got: $gotRef). " + + s"This may happen due to 'double-materialization' on the other side of this stream ref. " + + s"Do note that stream refs are one-shot references and have to be paired up in 1:1 pairs. " + + s"Multi-cast such as broadcast etc can be implemented by sharing multiple new stream references. ") diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 25e3791dd8..959bc79001 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -24,11 +24,8 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber } import scala.collection.immutable.Map import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, ExecutionContextExecutor, Future } -import scala.annotation.tailrec -import akka.util.{ OptionVal, PrettyDuration } - -import scala.util.{ Failure, Success } +import scala.concurrent.ExecutionContextExecutor +import akka.util.OptionVal /** * INTERNAL API @@ -433,7 +430,6 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff val traversalStack = new java.util.ArrayDeque[Traversal](16) traversalStack.addLast(current) - var needsFlattening = false val matValueStack = new java.util.ArrayDeque[Any](8) if (Debug) { @@ -481,18 +477,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff case compose: Compose ⇒ val second = matValueStack.removeLast() val first = matValueStack.removeLast() - - val result = - if (needsFlattening && (first.isInstanceOf[FlattenMatHolder[_]] || second.isInstanceOf[FlattenMatHolder[_]])) { - (first, second) match { - case (FlattenMatHolder(f1, t1), FlattenMatHolder(f2, t2)) ⇒ - FlattenMatHolder[Any](f1.zip(f2).map({ case (left, right) ⇒ compose(left, right) })(system.dispatcher), t1) // FIXME dedicate a dispatcher thread? - case (FlattenMatHolder(f1, t1), v2) ⇒ - FlattenMatHolder(f1.map(compose(_, v2))(system.dispatcher), t1) // FIXME dedicate a dispatcher thread? - case (v1, FlattenMatHolder(f2, t2)) ⇒ - FlattenMatHolder(f2.map(compose(v1, _))(system.dispatcher), t2) // FIXME dedicate a dispatcher thread? - } - } else compose(first, second) + val result = compose(first, second) matValueStack.addLast(result) if (Debug) println(s"COMP: $matValueStack") case PushAttributes(attr) ⇒ @@ -505,13 +490,6 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff islandTracking.enterIsland(tag, attributesStack.getLast) case ExitIsland ⇒ islandTracking.exitIsland() - case flatten: FlattenMat ⇒ - val prev = matValueStack.removeLast() - if (!prev.isInstanceOf[Future[_]]) throw new IllegalArgumentException("flattenMaterializedValue MUST be applied immediately after a materialized value ") - val result = FlattenMatHolder(prev.asInstanceOf[Future[Mat]], flatten.timeout) - needsFlattening = true - matValueStack.addLast(result) - if (Debug) println(s"FLTN: $matValueStack") case _ ⇒ } current = nextStep @@ -525,23 +503,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff islandTracking.allNestedIslandsReady() if (Debug) println("--- Finished materialization") - matValueStack.peekLast() match { - case FlattenMatHolder(f: Future[Mat @unchecked], t) ⇒ - f.value match { - case Some(Success(m)) ⇒ m - case Some(Failure(ex)) ⇒ throw new Exception("Flattened materialized value failed!", ex) - case None ⇒ - // last resort, await - val start = System.currentTimeMillis() - val mat = Await.result(f, t) - val stop = System.currentTimeMillis() - import scala.concurrent.duration._ - println(s"Waiting took: >>> ${PrettyDuration.format((stop - start).millis)} <<<") - mat - } - - case mat: Mat ⇒ mat - } + matValueStack.peekLast().asInstanceOf[Mat] } finally { if (isShutdown) throw shutdownWhileMaterializingFailure diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 8e79282e15..f3ce424bd2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -125,7 +125,6 @@ import akka.stream._ val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") - val futureFlattenSink = name("futureFlattenSink") val actorRefWithAck = name("actorRefWithAckSink") val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 601265c8ac..a9f99c9f6c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -13,8 +13,6 @@ import akka.util.OptionVal import scala.language.existentials import scala.collection.immutable.Map.Map1 -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, Future } /** * INTERNAL API @@ -156,12 +154,6 @@ import scala.concurrent.{ Await, Future } @InternalApi private[akka] final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp { def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg) } -/** - * INTERNAL API - */ -@InternalApi private[akka] final case class FlattenMat(timeout: FiniteDuration) extends MaterializedValueOp -/** INTERNAL API */ -@InternalApi private[akka] final case class FlattenMatHolder[M](f: Future[M], timeout: FiniteDuration) /** * INTERNAL API @@ -373,8 +365,6 @@ import scala.concurrent.{ Await, Future } */ def transformMat(f: AnyFunction1): TraversalBuilder - def flattenMat(timeout: FiniteDuration): TraversalBuilder - protected def internalSetAttributes(attributes: Attributes): TraversalBuilder def setAttributes(attributes: Attributes): TraversalBuilder = { @@ -490,9 +480,6 @@ import scala.concurrent.{ Await, Future } override def transformMat(f: AnyFunction1): TraversalBuilder = copy(traversalSoFar = traversalSoFar.concat(Transform(f))) - override def flattenMat(timeout: FiniteDuration): TraversalBuilder = - copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout))) - override def offsetOf(in: InPort): Int = inToOffset(in) override def isTraversalComplete: Boolean = true @@ -545,9 +532,6 @@ import scala.concurrent.{ Await, Future } override def transformMat(f: AnyFunction1): TraversalBuilder = TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f) - override def flattenMat(timeout: FiniteDuration): TraversalBuilder = - TraversalBuilder.empty().add(this, module.shape, Keep.right).flattenMat(timeout) - override val inSlots: Int = module.shape.inlets.size override def offsetOfModule(out: OutPort): Int = 0 @@ -1066,15 +1050,6 @@ import scala.concurrent.{ Await, Future } copy(traversalSoFar = traversalSoFar.concat(Transform(f))) } - /** - * UNSAFE API: May cause blocking DURING MATERIALIZATION. - * Use this API only in places where it is KNOWN that the flattened Future will be completed ASAP (less than a few ms, zero at best). - * WITH GREAT POWER, COMES GREAT RESPONSIBILITY. - */ - override def flattenMat(timeout: FiniteDuration): LinearTraversalBuilder = { - copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout))) - } - /** * Wraps the builder in an island that can be materialized differently, using async boundaries to bridge * between islands. @@ -1326,9 +1301,6 @@ import scala.concurrent.{ Await, Future } override def transformMat(f: AnyFunction1): TraversalBuilder = { copy(finalSteps = finalSteps.concat(Transform(f))) } - override def flattenMat(timeout: FiniteDuration): TraversalBuilder = { - copy(finalSteps = finalSteps.concat(FlattenMat(timeout))) - } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { this.islandTag match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c76ad1b98e..9a979c5d14 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -371,90 +371,6 @@ import scala.util.control.NonFatal override def toString: String = "FutureFlattenSource" } - // final class FutureFlattenSink[T, M](futureSink: Future[Graph[SinkShape[T], M]]) - // extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - // ReactiveStreamsCompliance.requireNonNullElement(futureSink) - // - // val in: Inlet[T] = Inlet("FutureFlattenSink.in") - // override val shape = SinkShape(in) - // - // override def initialAttributes = DefaultAttributes.futureFlattenSink - // - // override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = { - // val materialized = Promise[M]() - // - // val logic = new GraphStageLogic(shape) with InHandler with OutHandler { - // private val sourceOut = new SubSourceOutlet[T]("FutureFlattenSink.out") - // - // override def preStart(): Unit = - // futureSink.value match { - // case Some(it) ⇒ - // // this optimisation avoids going through any execution context, in similar vein to FastFuture - // onFutureSinkCompleted(it) - // case _ ⇒ - // val cb = getAsyncCallback[Try[Graph[SinkShape[T], M]]](onFutureSinkCompleted).invoke _ - // futureSink.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) // could be optimised FastFuture-like - // } - // - // // initial handler (until future completes) - // setHandler(in, new InHandler { - // override def onPush(): Unit = grab(it) - // - //// def onPull(): Unit = {} - //// - //// override def onDownstreamFinish(): Unit = { - //// if (!materialized.isCompleted) { - //// // we used to try to materialize the "inner" source here just to get - //// // the materialized value, but that is not safe and may cause the graph shell - //// // to leak/stay alive after the stage completes - //// - //// materialized.tryFailure(new StreamDetachedException("Stream cancelled before Source Future completed")) - //// } - //// - //// super.onDownstreamFinish() - //// } - //}) - // - // def onPush(): Unit = - // push(out, sourceOut.grab()) - // - // def onPull(): Unit = - // sourceOut.pull() - // - // override def onUpstreamFinish(): Unit = - // completeStage() - // - // override def postStop(): Unit = - // if (!sourceOut.isClosed) sourceOut.cancel() - // - // def onFutureSinkCompleted(result: Try[Graph[SinkShape[T], M]]): Unit = { - // result.map { graph ⇒ - // val runnable = Sink.fromGraph(graph).toMat(sourceOut.sink)(Keep.left) - // val matVal = interpreter.subFusingMaterializer.materialize(runnable, defaultAttributes = attr) - // materialized.success(matVal) - // - // setHandler(out, this) - // sourceOut.setHandler(this) - // - // if (isAvailable(out)) { - // sourceOut.pull() - // } - // - // }.recover { - // case t ⇒ - // sourceOut.cancel() - // materialized.failure(t) - // failStage(t) - // } - // } - // } - // - // (logic, materialized.future) - // } - // - // override def toString: String = "FutureFlattenSource" - // } - final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { ReactiveStreamsCompliance.requireNonNullElement(future) val shape = SourceShape(Outlet[T]("FutureSource.out")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala deleted file mode 100644 index ecbb496012..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala +++ /dev/null @@ -1,47 +0,0 @@ -///** -// * Copyright (C) 2018 Lightbend Inc. -// */ -//package akka.stream.impl.streamref -// -//import akka.NotUsed -//import akka.annotation.InternalApi -//import akka.stream.{ SinkRef, javadsl } -//import akka.stream.scaladsl.{ Sink, Source } -// -//import scala.concurrent.{ Await, Future } -//import scala.util.{ Failure, Success } -//import scala.concurrent.duration._ -// -///** -// * INTERNAL API -// * Allows users to directly use the SinkRef, even though we do have to go through the Future in order to be able -// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SinkRefStageImpl]] for usage. -// */ -//@InternalApi -//private[akka] final case class MaterializedSinkRef[In](futureSink: Future[SinkRefImpl[In]]) extends SinkRef[In] { -// -// override def sink: Sink[In, NotUsed] = -// futureSink.value match { -// -// case Some(Success(ready)) ⇒ -// // the normal case, since once materialization finishes, the future is guaranteed to have been completed -// ready.sink -// -// case Some(Failure(cause)) ⇒ -// // materialization failed -// Sink.cancelled -// -// case None ⇒ -// ??? -// // not yet materialized, awaiting the preStart to be run... -// // Source.fromFutureSource(futureSource.map(ref => ref.source)(ex)).mapMaterializedValue(_ ⇒ NotUsed) -// } -// -// override def toString: String = -// futureSink.value match { -// case None ⇒ s"SinkRef()" -// case Some(Success(ready)) ⇒ ready.toString -// case Some(Failure(ex)) ⇒ s"SinkRef()" -// } -// -//} diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala deleted file mode 100644 index 8d606c4011..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala +++ /dev/null @@ -1,79 +0,0 @@ -///** -// * Copyright (C) 2018 Lightbend Inc. -// */ -//package akka.stream.impl.streamref -// -//import java.util.concurrent.atomic.AtomicReference -// -//import akka.NotUsed -//import akka.actor.ActorRef -//import akka.annotation.InternalApi -//import akka.stream.scaladsl.Source -//import akka.stream.{ OverflowStrategy, SourceRef, javadsl } -// -//import scala.concurrent.{ ExecutionContext, Future } -//import scala.util.{ Failure, Success } -// -///** -// * INTERNAL API -// * Allows users to directly use the SourceRef, even though we do have to go through the Future in order to be able -// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SourceRefStageImpl]] for usage. -// */ -//@InternalApi -//private[akka] final case class MaterializedSourceRef[Out](futureSource: Future[SourceRefImpl[Out]]) extends SourceRef[Out] { -// -// class State -// final case class Initializing(buffer: Vector[Out]) extends State -// final case class Initialized(ref: SourceRefImpl[Out]) extends State -// -// val it = new AtomicReference[State](Initializing(Vector.empty)) -// -// // the advanced logic here is in order to allow RUNNING a stream locally ASAP even while materialization is still in-flight (preStart has not completed futureSource) -// override def source: Source[Out, NotUsed] = -// futureSource.value match { -// -// case Some(Success(ready)) ⇒ -// // the normal case, since once materialization finishes, the future is guaranteed to have been completed -// ready.source -// -// case Some(Failure(cause)) ⇒ -// // materialization failed -// Source.failed(cause).named("SourceRef") -// -// case None ⇒ -// // not yet materialized -- in reality this case should not happen, since once materialization is finished, this Future is already completed -// // this impl is kept in case materialization semantics would change for some reason -// Source.fromFutureSource(futureSource.map(ref ⇒ ref.source)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)).mapMaterializedValue(_ ⇒ NotUsed) -// } -// -// // def forSerializationRef: SourceRef[Out] = -// // futureSource.value match { -// // case Some(Success(ready)) ⇒ -// // // the normal case, since once materialization finishes, the future is guaranteed to have been completed -// // ready -// // -// // case Some(Failure(cause)) ⇒ -// // throw new IllegalStateException("Illegal serialization attempt, this stream has never materialized the SourceRef!", cause) -// // -// // case None ⇒ -// // // preStart has not finished yet, so we need to create and serialize a proxy ref -// // val proxy = mkProxy() -// // -// // new SourceRef[Out] { -// // override def source: Source[Out, NotUsed] = -// // ??? -// // } -// // } -// -// override def toString: String = -// futureSource.value match { -// case None ⇒ s"SourceRef()" -// case Some(Success(ready)) ⇒ ready.toString -// case Some(Failure(ex)) ⇒ s"SourceRef()" -// } -// -//} -// -//case class BufferedRef() { -// -//} diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 575de4eb4c..409811d898 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -106,6 +106,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( s"Local stream terminating, message loss (on remote side) may have happened.")) case (sender, StreamRefsProtocol.CumulativeDemand(d)) ⇒ + // the other side may attempt to "double subscribe", which we want to fail eagerly since we're 1:1 pairings observeAndValidateSender(sender, "Illegal sender for CumulativeDemand") if (remoteCumulativeDemandReceived < d) { @@ -154,6 +155,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( case _ ⇒ completedBeforeRemoteConnected = OptionVal(scala.util.Failure(ex)) // not terminating on purpose, since other side may subscribe still and then we want to fail it + // the stage will be terminated either by timeout, or by the handling in `observeAndValidateSender` setKeepGoing(true) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala index 15a8d407ab..cf8790b56f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala @@ -18,7 +18,7 @@ private[akka] final case class StreamRefSettingsImpl private ( ) extends StreamRefSettings { override def withBufferCapacity(value: Int): StreamRefSettings = copy(bufferCapacity = value) - override def withDemandRedeliveryInterval(value: scala.concurrent.duration.FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value) + override def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value) override def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings = copy(subscriptionTimeout = value) override def productPrefix: String = Logging.simpleName(classOf[StreamRefSettings]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala index c6d4079050..0249d766c5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala @@ -32,16 +32,6 @@ private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extend private[this] val sourceRefStageNames = SeqActorName("SourceRef") // "local target" private[this] val sinkRefStageNames = SeqActorName("SinkRef") // "remote sender" - // // needed because serialization may want to serialize a ref, before we have finished materialization - // def bufferedRefFor[T](allocatedName: String, future: Future[SourceRefImpl[T]]): SourceRef[T] = { - // def mkProxyOnDemand(): ActorRef = { - // log.warning("HAVE TO CREATE PROXY!!!") - // val proxyName = allocatedName + "Proxy" - // system.actorOf(Props(new ProxyRefFor(future)), proxyName) - // } - // MaterializedSourceRef[T](future) - // } - // TODO introduce a master with which all stages running the streams register themselves? def nextSourceRefStageName(): String = @@ -51,29 +41,3 @@ private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extend sinkRefStageNames.next() } - -/** INTERNAL API */ -@InternalApi -private[akka] final class ProxyRefFor[T](futureRef: Future[SourceRefImpl[T]]) extends Actor with Stash with ActorLogging { - import context.dispatcher - import akka.pattern.pipe - - override def preStart(): Unit = { - futureRef.pipeTo(self) - } - - override def receive: Receive = { - case ref: SourceRefImpl[T] ⇒ - log.warning("REF:::: initial = " + ref.initialPartnerRef) - context become initialized(ref) - unstashAll() - - case msg ⇒ - log.warning("Stashing [{}], since target reference is still not initialized...", msg.getClass) - stash() - } - - def initialized(ref: SourceRefImpl[T]): Receive = { - case any ⇒ ??? - } -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 63cb82ba1d..680acfbced 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -16,8 +16,6 @@ import akka.japi.Util import java.util.Comparator import java.util.concurrent.CompletionStage -import akka.annotation.InternalApi - import scala.compat.java8.FutureConverters._ object Flow { @@ -221,13 +219,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] = new Flow(delegate.mapMaterializedValue(f.apply _)) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Flow[In, Out, Mat2] = - new Flow(delegate.flattenMaterializedValue(timeout)) - /** * Transform this [[Flow]] by appending the given processing steps. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index a060c6be21..20c482dcf6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -18,10 +18,7 @@ import scala.concurrent.ExecutionContext import scala.util.Try import java.util.concurrent.CompletionStage -import akka.annotation.InternalApi - import scala.compat.java8.FutureConverters._ -import scala.concurrent.duration.FiniteDuration /** Java API */ object Sink { @@ -281,18 +278,6 @@ object Sink { new Sink(scaladsl.Sink.lazyInit[T, M]( t ⇒ sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), () ⇒ fallback.create()).mapMaterializedValue(_.toJava)) - - /** - * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), - * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. - * - * Adheres to [[StreamRefAttributes]]. - * - * See more detailed documentation on [[SourceRef]]. - */ - - def sourceRef[T](): javadsl.Sink[T, SourceRef[T]] = - scaladsl.Sink.sourceRef[T]().asJava } /** @@ -336,13 +321,6 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterializedValue(f.apply _)) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] = - new Sink(delegate.flattenMaterializedValue[Mat2](timeout)) - /** * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index f94555de2a..3ba09913a6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -26,7 +26,6 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture import akka.annotation.InternalApi -import akka.stream.scaladsl.Sink import scala.compat.java8.FutureConverters._ @@ -446,16 +445,6 @@ object Source { (s: S) ⇒ read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext), (s: S) ⇒ close.apply(s).toScala)) - /** - * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), - * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. - * - * Adheres to [[StreamRefAttributes]]. - * - * See more detailed documentation on [[SinkRef]]. - */ - def sinkRef[T](): javadsl.Source[T, SinkRef[T]] = - scaladsl.Source.sinkRef[T]().asJava } /** @@ -485,13 +474,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] = new Source(delegate.mapMaterializedValue(f.apply _)) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Source[Out, Mat2] = - new Source(delegate.flattenMaterializedValue[Mat2](timeout)) - /** * Transform this [[Source]] by appending the given processing stages. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala new file mode 100644 index 0000000000..6563305825 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamRefs.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.javadsl + +import java.util.concurrent.CompletionStage + +import akka.annotation.ApiMayChange +import akka.stream._ + +/** + * API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value + * will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked + * as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details. + * + * Factories for creating stream refs. + */ +@ApiMayChange +object StreamRefs { + import scala.compat.java8.FutureConverters._ + + /** + * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), + * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. + * + * Adheres to [[StreamRefAttributes]]. + * + * See more detailed documentation on [[SourceRef]]. + */ + @ApiMayChange + def sourceRef[T](): javadsl.Sink[T, CompletionStage[SourceRef[T]]] = + scaladsl.StreamRefs.sourceRef[T]().mapMaterializedValue(_.toJava).asJava + + /** + * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), + * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. + * + * Adheres to [[StreamRefAttributes]]. + * + * See more detailed documentation on [[SinkRef]]. + */ + @ApiMayChange + def sinkRef[T](): javadsl.Source[T, CompletionStage[SinkRef[T]]] = + scaladsl.StreamRefs.sinkRef[T]().mapMaterializedValue(_.toJava).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 405e5cdf19..47b749bd68 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -19,7 +19,7 @@ import scala.concurrent.duration.FiniteDuration import scala.language.higherKinds import akka.stream.impl.fusing.FlattenMerge import akka.NotUsed -import akka.annotation.{ DoNotInherit, InternalApi } +import akka.annotation.DoNotInherit /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -114,15 +114,6 @@ final class Flow[-In, +Out, +Mat]( traversalBuilder.transformMat(f), shape) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] = - new Flow( - traversalBuilder.flattenMat(timeout), - shape) - /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{ @@ -2565,12 +2556,6 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] - /** * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index b6dae63914..de29f07f95 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -6,21 +6,17 @@ package akka.stream.scaladsl import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts import akka.actor.{ ActorRef, Props, Status } -import akka.annotation.InternalApi import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages -import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl } import akka.stream.stage._ import akka.stream.{ javadsl, _ } -import akka.util.OptionVal import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.collection.generic.CanBuildFrom import scala.collection.immutable -import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } @@ -61,15 +57,6 @@ final class Sink[-In, +Mat]( traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] = - new Sink( - traversalBuilder.flattenMat(timeout), - shape) - /** * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes @@ -464,18 +451,4 @@ object Sink { def lazyInit[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], fallback: () ⇒ M): Sink[T, Future[M]] = Sink.fromGraph(new LazySink(sinkFactory, fallback)) - /** - * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), - * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. - * - * Adheres to [[StreamRefAttributes]]. - * - * See more detailed documentation on [[SourceRef]]. - */ - def sourceRef[T](): Sink[T, SourceRef[T]] = { - import scala.concurrent.duration._ - val sink = Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None)) - sink.flattenMaterializedValue[SourceRef[T]](10.seconds) - } - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 007b0d6048..1cf78b5383 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -5,16 +5,12 @@ package akka.stream.scaladsl import java.util.concurrent.CompletionStage -import akka.util.ConstantFun -import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, Props } -import akka.annotation.InternalApi import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages._ import akka.stream.impl.{ PublisherSource, _ } -import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.{ Outlet, SourceShape, _ } import akka.util.ConstantFun import akka.{ Done, NotUsed } @@ -23,12 +19,9 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } -import akka.stream.impl.streamref.SourceRefStageImpl -import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue } -import akka.util.OptionVal +import akka.stream.stage.GraphStageWithMaterializedValue import scala.compat.java8.FutureConverters._ @@ -93,13 +86,6 @@ final class Source[+Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape) - /** - * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future. - */ - @InternalApi - private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] = - new Source[Out, Mat2](traversalBuilder.flattenMat(timeout), shape) - /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]]. @@ -617,17 +603,4 @@ object Source { def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] = Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close)) - /** - * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), - * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. - * - * Adheres to [[StreamRefAttributes]]. - * - * See more detailed documentation on [[SinkRef]]. - */ - def sinkRef[T](): Source[T, SinkRef[T]] = { - import scala.concurrent.duration._ - val value: Source[T, Future[SinkRef[T]]] = Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None)) - value.flattenMaterializedValue[SinkRef[T]](1.second) - } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala new file mode 100644 index 0000000000..ee38b01b6e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.annotation.ApiMayChange +import akka.stream.{ SinkRef, SourceRef, StreamRefAttributes } +import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl } +import akka.util.OptionVal + +import scala.concurrent.Future + +/** + * API MAY CHANGE: The functionality of stream refs is working, however it is expected that the materialized value + * will eventually be able to remove the Future wrapping the stream references. For this reason the API is now marked + * as API may change. See ticket https://github.com/akka/akka/issues/24372 for more details. + * + * Factories for creating stream refs. + */ +@ApiMayChange +object StreamRefs { + + /** + * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), + * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. + * + * Adheres to [[StreamRefAttributes]]. + * + * See more detailed documentation on [[SourceRef]]. + */ + @ApiMayChange + def sourceRef[T](): Sink[T, Future[SourceRef[T]]] = + Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None)) + + /** + * A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones), + * to consume data from this local stream, as if they were attached in the spot of the local Sink directly. + * + * Adheres to [[StreamRefAttributes]]. + * + * See more detailed documentation on [[SinkRef]]. + */ + @ApiMayChange + def sinkRef[T](): Source[T, Future[SinkRef[T]]] = + Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None)) +} diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 156e739f9f..3fa1b8ee32 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -186,6 +186,15 @@ object GraphStageLogic { initialReceive: StageActorRef.Receive, name: String) { + // not really needed, but let's keep MiMa happy + def this( + materializer: akka.stream.ActorMaterializer, + getAsyncCallback: StageActorRef.Receive ⇒ AsyncCallback[(ActorRef, Any)], + initialReceive: StageActorRef.Receive + ) { + this(materializer, getAsyncCallback, initialReceive, "") + } + private val callback = getAsyncCallback(internalReceive) private def cell = materializer.supervisor match { case ref: LocalActorRef ⇒ ref.underlying