diff --git a/akka-docs/src/main/paradox/images/sink-ref-animation.gif b/akka-docs/src/main/paradox/images/sink-ref-animation.gif
new file mode 100644
index 0000000000..ae572ddd13
Binary files /dev/null and b/akka-docs/src/main/paradox/images/sink-ref-animation.gif differ
diff --git a/akka-docs/src/main/paradox/images/source-ref-animation.gif b/akka-docs/src/main/paradox/images/source-ref-animation.gif
index 897ed88146..073d64fdd0 100644
Binary files a/akka-docs/src/main/paradox/images/source-ref-animation.gif and b/akka-docs/src/main/paradox/images/source-ref-animation.gif differ
diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md
index 90f1a35b79..8540a61955 100644
--- a/akka-docs/src/main/paradox/stream/stream-refs.md
+++ b/akka-docs/src/main/paradox/stream/stream-refs.md
@@ -69,12 +69,18 @@ That sink materializes the `SourceRef` that you can then send to other nodes. Pl
Scala
: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source }
+Java
+: @@snip [FlowStreamRefsDocTest.java]($code$/java/jdocs/stream/FlowStreamRefsDocTest.java) { #offer-source }
+
The origin actor which creates and owns the Source could also perform some validation or additional setup
when preparing the source. Once it has handed out the `SourceRef` the remote side can run it like this:
Scala
: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source-use }
+Java
+: @@snip [FlowStreamRefsDocTest.java]($code$/java/jdocs/stream/FlowStreamRefsDocTest.java) { #offer-source-use }
+
The process of preparing and running a `SourceRef` powered distributed stream is shown by the animation below:

@@ -104,15 +110,22 @@ into various other systems (e.g. any of the Alpakka provided Sinks).
Scala
: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-sink }
+Java
+: @@snip [FlowStreamRefsDocTest.java]($code$/java/jdocs/stream/FlowStreamRefsDocTest.java) { #offer-sink }
+
Using the offered `SinkRef` to send data to the origin of the Sink is also simple, as we can treat the
SinkRef just as any other Sink and directly `runWith` or `run` with it.
Scala
: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-sink-use }
+Java
+: @@snip [FlowStreamRefsDocTest.java]($code$/java/jdocs/stream/FlowStreamRefsDocTest.java) { #offer-sink-use }
+The process of preparing and running a `SinkRef` powered distributed stream is shown by the animation below:
+
+
-
@@@ warning
A `SinkeRef` is *by design* "single-shot". i.e. it may only be materialized once.
@@ -127,15 +140,13 @@ Scala
## Bulk Stream References
@@@ warning
- Not yet implemented. See ticket ...... FIXME, ticket number
+ Bulk stream references are not implemented yet.
+ See ticket [Bulk Transfer Stream Refs #24276](https://github.com/akka/akka/issues/24276) to track progress or signal demand for this feature.
@@@
-Bulk stream references can be used to create simple to use side-channels to transfer humongous amounts
+Bulk stream refs can be used to create simple to use side-channels to transfer humongous amounts
of data such as huge log files, messages or even media, with as much ease as if it was a trivial local stream.
-Connections for each stream ref bulk stream ref are established independently, and do not utilise
-actor messaging (which is not designed for such bulk transfers, but rather small messages).
-
## Configuration
### Stream reference subscription timeouts
@@ -156,6 +167,8 @@ globally (`akka.stream.materializer.stream-ref.subscription-timeout`), but also
Scala
: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #attr-sub-timeout }
+Java
+: @@snip [FlowStreamRefsDocTest.java]($code$/java/jdocs/stream/FlowStreamRefsDocTest.java) { #attr-sub-timeout }
## General configuration
diff --git a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java
new file mode 100644
index 0000000000..2428c4a694
--- /dev/null
+++ b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2015-2018 Lightbend Inc.
+ */
+package jdocs.stream;
+
+import akka.NotUsed;
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.remote.WireFormats;
+import akka.stream.*;
+import akka.stream.javadsl.*;
+import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
+import jdocs.AbstractJavaTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class FlowStreamRefsDocTest extends AbstractJavaTest {
+
+ static ActorSystem system = null;
+ static Materializer mat = null;
+
+ @Test
+ public void compileOnlySpec() {
+ // do nothing
+ }
+
+ //#offer-source
+ static class RequestLogs {
+ public final long streamId;
+
+ public RequestLogs(long streamId) {
+ this.streamId = streamId;
+ }
+ }
+
+ static class LogsOffer {
+ final SourceRef sourceRef;
+
+ public LogsOffer(SourceRef sourceRef) {
+ this.sourceRef = sourceRef;
+ }
+ }
+
+ static class DataSource extends AbstractActor {
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder()
+ .match(RequestLogs.class, this::handleRequestLogs)
+ .build();
+ }
+
+ private void handleRequestLogs(RequestLogs requestLogs) {
+ Source logs = streamLogs(requestLogs.streamId);
+ SourceRef logsRef = logs.runWith(Sink.sourceRef(), mat);
+ LogsOffer offer = new LogsOffer(logsRef);
+ sender().tell(offer, self());
+ }
+
+ private Source streamLogs(long streamId) {
+ return Source.repeat("[INFO] some interesting logs here (for id: " + streamId + ")");
+ }
+ }
+ //#offer-source
+
+ public void offerSource() {
+ new TestKit(system) {{
+
+ //#offer-source-use
+ ActorRef sourceActor = system.actorOf(Props.create(DataSource.class), "dataSource");
+
+ sourceActor.tell(new RequestLogs(1337), getTestActor());
+ LogsOffer offer = expectMsgClass(LogsOffer.class);
+
+ offer.sourceRef.getSource()
+ .runWith(Sink.foreach(log -> System.out.println(log)), mat);
+
+ //#offer-source-use
+ }};
+ }
+
+ //#offer-sink
+ class PrepareUpload {
+ final String id;
+
+ public PrepareUpload(String id) {
+ this.id = id;
+ }
+ }
+ class MeasurementsSinkReady {
+ final String id;
+ final SinkRef sinkRef;
+
+ public PrepareUpload(String id, SinkRef ref) {
+ this.id = id;
+ this.sinkRef = ref;
+ }
+ }
+
+ static class DataReceiver extends AbstractActor {
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder()
+ .match(PrepareUpload.class, prepare -> {
+ Sink sink = logsSinkFor(prepare.id);
+ SinkRef sinkRef = Source.sinkRef().to(sink).run(mat);
+
+ sender().tell(new MeasurementsSinkReady(sinkRef), self());
+ })
+ .create();
+ }
+
+ private Sink logsSinkFor(String id) {
+ return Sink.ignore(); // would be actual useful Sink in reality
+ }
+ }
+ //#offer-sink
+
+ public void offerSink() {
+ new TestKit(system) {{
+
+ //#offer-sink-use
+ ActorRef receiver = system.actorOf(Props.create(DataReceiver.class), "dataReceiver");
+
+ receiver.tell(new PrepareUpload("system-42-tmp"), getTestActor());
+ MeasurementsSinkReady ready = expectMsgClass(LogsOffer.class);
+
+ Source.repeat("hello")
+ .runWith(ready.sinkRef, mat);
+ //#offer-sink-use
+ }};
+ }
+
+ public void configureTimeouts() {
+ new TestKit(system) {{
+
+ //#attr-sub-timeout
+ FiniteDuration timeout = FiniteDuration.create(5, TimeUnit.SECONDS);
+ Attributes timeoutAttributes = StreamRefAttributes.subscriptionTimeout(timeout);
+
+ // configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
+ Source.repeat("hello")
+ .runWith(Sink.sourceRef().addAttributes(timeoutAttributes), mat);
+
+ // configuring SinkRef.source:
+ Source.sinkRef().addAttributes(timeoutAttributes)
+ .runWith(Sink.ignore(), mat); // not very interesting sink, just an example
+
+ //#attr-sub-timeout
+ }};
+ }
+
+}
diff --git a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala
index bc674df352..3dd8f5be05 100644
--- a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala
@@ -62,8 +62,8 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
import akka.pattern._
import akka.stream.SinkRef
- case class PrepareUpload(sourceId: String)
- case class MeasurementsSinkReady(sourceId: String, sinkRef: SinkRef[String])
+ case class PrepareUpload(id: String)
+ case class MeasurementsSinkReady(id: String, sinkRef: SinkRef[String])
class DataReceiver extends Actor {
@@ -112,7 +112,7 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
import scala.concurrent.duration._
import akka.stream.StreamRefAttributes
- // configuring SourceRef.sink (notice that we apply the attributes to the Sink!):
+ // configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
Source.repeat("hello")
.runWith(Sink.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)))
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
index 217b16e741..60a9339d1c 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
@@ -43,7 +43,7 @@ object StreamRefsSpec {
case "give-infinite" ⇒
val source: Source[String, NotUsed] = Source.fromIterator(() ⇒ Iterator.from(1)).map("ping-" + _)
- val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
+ val (r: NotUsed, ref: SourceRef[String]) = source.toMat(Sink.sourceRef())(Keep.both).run()
sender() ! ref
@@ -85,14 +85,14 @@ object StreamRefsSpec {
* For them it's a Sink; for us it's a Source.
*/
val sink: SinkRef[String] =
- Source.sinkRef[String]
+ Source.sinkRef[String]()
.to(Sink.actorRef(probe, ""))
.run()
sender() ! sink
case "receive-subscribe-timeout" ⇒
- val sink = Source.sinkRef[String]
+ val sink = Source.sinkRef[String]()
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.to(Sink.actorRef(probe, ""))
.run()
@@ -100,7 +100,7 @@ object StreamRefsSpec {
sender() ! sink
case "receive-32" ⇒
- val (sink, driver) = Source.sinkRef[String]
+ val (sink, driver) = Source.sinkRef[String]()
.toMat(TestSink.probe(context.system))(Keep.both)
.run()
diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala
index e2e6871170..7a208e511a 100644
--- a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala
+++ b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala
@@ -4,7 +4,7 @@
package akka.stream
import akka.NotUsed
-import akka.actor.ActorRef
+import akka.actor.{ ActorRef, ActorSystem }
import akka.stream.scaladsl.{ Sink, Source }
import scala.language.implicitConversions
@@ -14,7 +14,7 @@ import scala.language.implicitConversions
*/
object SinkRef {
/** Implicitly converts a [[SinkRef]] to a [[Sink]]. The same can be achieved by calling `.sink` on the reference. */
- implicit def convertRefToSink[T](sinkRef: SinkRef[T]): Sink[T, NotUsed] = sinkRef.sink
+ implicit def convertRefToSink[T](sinkRef: SinkRef[T]): Sink[T, NotUsed] = sinkRef.sink()
}
/**
@@ -33,9 +33,9 @@ object SinkRef {
trait SinkRef[In] {
/** Scala API: Get [[Sink]] underlying to this source ref. */
- def sink: Sink[In, NotUsed]
+ def sink(): Sink[In, NotUsed]
/** Java API: Get [[javadsl.Sink]] underlying to this source ref. */
- def getSink: javadsl.Sink[In, NotUsed]
+ final def getSink(): javadsl.Sink[In, NotUsed] = sink().asJava
}
/**
@@ -63,7 +63,7 @@ trait SourceRef[T] {
/** Scala API: Get [[Source]] underlying to this source ref. */
def source: Source[T, NotUsed]
/** Java API: Get [[javadsl.Source]] underlying to this source ref. */
- def getSource: javadsl.Source[T, NotUsed]
+ final def getSource: javadsl.Source[T, NotUsed] = source.asJava
}
// --- exceptions ---
diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
index 588cfb787a..25e3791dd8 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
@@ -24,9 +24,11 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
import scala.annotation.tailrec
-import akka.util.OptionVal
+import akka.util.{ OptionVal, PrettyDuration }
+
+import scala.util.{ Failure, Success }
/**
* INTERNAL API
@@ -431,6 +433,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
val traversalStack = new java.util.ArrayDeque[Traversal](16)
traversalStack.addLast(current)
+ var needsFlattening = false
val matValueStack = new java.util.ArrayDeque[Any](8)
if (Debug) {
@@ -478,7 +481,18 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
case compose: Compose ⇒
val second = matValueStack.removeLast()
val first = matValueStack.removeLast()
- val result = compose(first, second)
+
+ val result =
+ if (needsFlattening && (first.isInstanceOf[FlattenMatHolder[_]] || second.isInstanceOf[FlattenMatHolder[_]])) {
+ (first, second) match {
+ case (FlattenMatHolder(f1, t1), FlattenMatHolder(f2, t2)) ⇒
+ FlattenMatHolder[Any](f1.zip(f2).map({ case (left, right) ⇒ compose(left, right) })(system.dispatcher), t1) // FIXME dedicate a dispatcher thread?
+ case (FlattenMatHolder(f1, t1), v2) ⇒
+ FlattenMatHolder(f1.map(compose(_, v2))(system.dispatcher), t1) // FIXME dedicate a dispatcher thread?
+ case (v1, FlattenMatHolder(f2, t2)) ⇒
+ FlattenMatHolder(f2.map(compose(v1, _))(system.dispatcher), t2) // FIXME dedicate a dispatcher thread?
+ }
+ } else compose(first, second)
matValueStack.addLast(result)
if (Debug) println(s"COMP: $matValueStack")
case PushAttributes(attr) ⇒
@@ -491,6 +505,13 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
islandTracking.enterIsland(tag, attributesStack.getLast)
case ExitIsland ⇒
islandTracking.exitIsland()
+ case flatten: FlattenMat ⇒
+ val prev = matValueStack.removeLast()
+ if (!prev.isInstanceOf[Future[_]]) throw new IllegalArgumentException("flattenMaterializedValue MUST be applied immediately after a materialized value ")
+ val result = FlattenMatHolder(prev.asInstanceOf[Future[Mat]], flatten.timeout)
+ needsFlattening = true
+ matValueStack.addLast(result)
+ if (Debug) println(s"FLTN: $matValueStack")
case _ ⇒
}
current = nextStep
@@ -504,7 +525,23 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
islandTracking.allNestedIslandsReady()
if (Debug) println("--- Finished materialization")
- matValueStack.peekLast().asInstanceOf[Mat]
+ matValueStack.peekLast() match {
+ case FlattenMatHolder(f: Future[Mat @unchecked], t) ⇒
+ f.value match {
+ case Some(Success(m)) ⇒ m
+ case Some(Failure(ex)) ⇒ throw new Exception("Flattened materialized value failed!", ex)
+ case None ⇒
+ // last resort, await
+ val start = System.currentTimeMillis()
+ val mat = Await.result(f, t)
+ val stop = System.currentTimeMillis()
+ import scala.concurrent.duration._
+ println(s"Waiting took: >>> ${PrettyDuration.format((stop - start).millis)} <<<")
+ mat
+ }
+
+ case mat: Mat ⇒ mat
+ }
} finally {
if (isShutdown) throw shutdownWhileMaterializingFailure
diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
index a9f99c9f6c..601265c8ac 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
@@ -13,6 +13,8 @@ import akka.util.OptionVal
import scala.language.existentials
import scala.collection.immutable.Map.Map1
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ Await, Future }
/**
* INTERNAL API
@@ -154,6 +156,12 @@ import scala.collection.immutable.Map.Map1
@InternalApi private[akka] final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg)
}
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] final case class FlattenMat(timeout: FiniteDuration) extends MaterializedValueOp
+/** INTERNAL API */
+@InternalApi private[akka] final case class FlattenMatHolder[M](f: Future[M], timeout: FiniteDuration)
/**
* INTERNAL API
@@ -365,6 +373,8 @@ import scala.collection.immutable.Map.Map1
*/
def transformMat(f: AnyFunction1): TraversalBuilder
+ def flattenMat(timeout: FiniteDuration): TraversalBuilder
+
protected def internalSetAttributes(attributes: Attributes): TraversalBuilder
def setAttributes(attributes: Attributes): TraversalBuilder = {
@@ -480,6 +490,9 @@ import scala.collection.immutable.Map.Map1
override def transformMat(f: AnyFunction1): TraversalBuilder =
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
+ override def flattenMat(timeout: FiniteDuration): TraversalBuilder =
+ copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout)))
+
override def offsetOf(in: InPort): Int = inToOffset(in)
override def isTraversalComplete: Boolean = true
@@ -532,6 +545,9 @@ import scala.collection.immutable.Map.Map1
override def transformMat(f: AnyFunction1): TraversalBuilder =
TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f)
+ override def flattenMat(timeout: FiniteDuration): TraversalBuilder =
+ TraversalBuilder.empty().add(this, module.shape, Keep.right).flattenMat(timeout)
+
override val inSlots: Int = module.shape.inlets.size
override def offsetOfModule(out: OutPort): Int = 0
@@ -1050,6 +1066,15 @@ import scala.collection.immutable.Map.Map1
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
}
+ /**
+ * UNSAFE API: May cause blocking DURING MATERIALIZATION.
+ * Use this API only in places where it is KNOWN that the flattened Future will be completed ASAP (less than a few ms, zero at best).
+ * WITH GREAT POWER, COMES GREAT RESPONSIBILITY.
+ */
+ override def flattenMat(timeout: FiniteDuration): LinearTraversalBuilder = {
+ copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout)))
+ }
+
/**
* Wraps the builder in an island that can be materialized differently, using async boundaries to bridge
* between islands.
@@ -1301,6 +1326,9 @@ import scala.collection.immutable.Map.Map1
override def transformMat(f: AnyFunction1): TraversalBuilder = {
copy(finalSteps = finalSteps.concat(Transform(f)))
}
+ override def flattenMat(timeout: FiniteDuration): TraversalBuilder = {
+ copy(finalSteps = finalSteps.concat(FlattenMat(timeout)))
+ }
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
this.islandTag match {
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala
index b0b2f9eade..ecbb496012 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSinkRef.scala
@@ -1,49 +1,47 @@
-/**
- * Copyright (C) 2018 Lightbend Inc.
- */
-package akka.stream.impl.streamref
-
-import akka.NotUsed
-import akka.annotation.InternalApi
-import akka.stream.{ SinkRef, javadsl }
-import akka.stream.scaladsl.Sink
-
-import scala.concurrent.Future
-import scala.util.{ Failure, Success }
-
-/**
- * INTERNAL API
- * Allows users to directly use the SinkRef, even though we do have to go through the Future in order to be able
- * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SinkRefStageImpl]] for usage.
- */
-@InternalApi
-private[akka] final case class MaterializedSinkRef[In](futureSink: Future[SinkRefImpl[In]]) extends SinkRef[In] {
-
- override def sink: Sink[In, NotUsed] =
- futureSink.value match {
-
- case Some(Success(ready)) ⇒
- // the normal case, since once materialization finishes, the future is guaranteed to have been completed
- ready.sink
-
- case Some(Failure(cause)) ⇒
- // materialization failed
- Sink.cancelled
-
- case None ⇒
- throw new Exception(s"This should not be possible! We guarantee to complete the materialized Future value when materialization finishes! Sink was: $futureSink")
- // // not yet materialized -- in reality this case should not happen, since once materialization is finished, this Future is already completed
- // // this impl is kept in case materialization semantics would change for some reason
- // Source.fromFutureSource(futureSource.map(ref => ref.source)(ex)).mapMaterializedValue(_ ⇒ NotUsed)
- }
-
- override def getSink: javadsl.Sink[In, NotUsed] = sink.asJava
-
- override def toString: String =
- futureSink.value match {
- case None ⇒ s"SinkRef()"
- case Some(Success(ready)) ⇒ ready.toString
- case Some(Failure(ex)) ⇒ s"SinkRef()"
- }
-
-}
+///**
+// * Copyright (C) 2018 Lightbend Inc.
+// */
+//package akka.stream.impl.streamref
+//
+//import akka.NotUsed
+//import akka.annotation.InternalApi
+//import akka.stream.{ SinkRef, javadsl }
+//import akka.stream.scaladsl.{ Sink, Source }
+//
+//import scala.concurrent.{ Await, Future }
+//import scala.util.{ Failure, Success }
+//import scala.concurrent.duration._
+//
+///**
+// * INTERNAL API
+// * Allows users to directly use the SinkRef, even though we do have to go through the Future in order to be able
+// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SinkRefStageImpl]] for usage.
+// */
+//@InternalApi
+//private[akka] final case class MaterializedSinkRef[In](futureSink: Future[SinkRefImpl[In]]) extends SinkRef[In] {
+//
+// override def sink: Sink[In, NotUsed] =
+// futureSink.value match {
+//
+// case Some(Success(ready)) ⇒
+// // the normal case, since once materialization finishes, the future is guaranteed to have been completed
+// ready.sink
+//
+// case Some(Failure(cause)) ⇒
+// // materialization failed
+// Sink.cancelled
+//
+// case None ⇒
+// ???
+// // not yet materialized, awaiting the preStart to be run...
+// // Source.fromFutureSource(futureSource.map(ref => ref.source)(ex)).mapMaterializedValue(_ ⇒ NotUsed)
+// }
+//
+// override def toString: String =
+// futureSink.value match {
+// case None ⇒ s"SinkRef()"
+// case Some(Success(ready)) ⇒ ready.toString
+// case Some(Failure(ex)) ⇒ s"SinkRef()"
+// }
+//
+//}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala
index 64d45152c4..8d606c4011 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/MaterializedSourceRef.scala
@@ -1,49 +1,79 @@
-/**
- * Copyright (C) 2018 Lightbend Inc.
- */
-package akka.stream.impl.streamref
-
-import akka.NotUsed
-import akka.annotation.InternalApi
-import akka.stream.scaladsl.Source
-import akka.stream.{ SourceRef, javadsl }
-
-import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success }
-
-/**
- * INTERNAL API
- * Allows users to directly use the SourceRef, even though we do have to go through the Future in order to be able
- * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SourceRefStageImpl]] for usage.
- */
-@InternalApi
-private[akka] final case class MaterializedSourceRef[Out](futureSource: Future[SourceRefImpl[Out]]) extends SourceRef[Out] {
-
- override def source: Source[Out, NotUsed] =
- futureSource.value match {
-
- case Some(Success(ready)) ⇒
- // the normal case, since once materialization finishes, the future is guaranteed to have been completed
- ready.source
-
- case Some(Failure(cause)) ⇒
- // materialization failed
- Source.failed(cause).named("SourceRef")
-
- case None ⇒
- throw new Exception(s"This should not be possible! We guarantee to complete the materialized Future value when materialization finishes! Source was: $futureSource")
- // // not yet materialized -- in reality this case should not happen, since once materialization is finished, this Future is already completed
- // // this impl is kept in case materialization semantics would change for some reason
- // Source.fromFutureSource(futureSource.map(ref => ref.source)(ex)).mapMaterializedValue(_ ⇒ NotUsed)
- }
-
- override def getSource: javadsl.Source[Out, NotUsed] = source.asJava
-
- override def toString: String =
- futureSource.value match {
- case None ⇒ s"SourceRef()"
- case Some(Success(ready)) ⇒ ready.toString
- case Some(Failure(ex)) ⇒ s"SourceRef()"
- }
-
-}
+///**
+// * Copyright (C) 2018 Lightbend Inc.
+// */
+//package akka.stream.impl.streamref
+//
+//import java.util.concurrent.atomic.AtomicReference
+//
+//import akka.NotUsed
+//import akka.actor.ActorRef
+//import akka.annotation.InternalApi
+//import akka.stream.scaladsl.Source
+//import akka.stream.{ OverflowStrategy, SourceRef, javadsl }
+//
+//import scala.concurrent.{ ExecutionContext, Future }
+//import scala.util.{ Failure, Success }
+//
+///**
+// * INTERNAL API
+// * Allows users to directly use the SourceRef, even though we do have to go through the Future in order to be able
+// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SourceRefStageImpl]] for usage.
+// */
+//@InternalApi
+//private[akka] final case class MaterializedSourceRef[Out](futureSource: Future[SourceRefImpl[Out]]) extends SourceRef[Out] {
+//
+// class State
+// final case class Initializing(buffer: Vector[Out]) extends State
+// final case class Initialized(ref: SourceRefImpl[Out]) extends State
+//
+// val it = new AtomicReference[State](Initializing(Vector.empty))
+//
+// // the advanced logic here is in order to allow RUNNING a stream locally ASAP even while materialization is still in-flight (preStart has not completed futureSource)
+// override def source: Source[Out, NotUsed] =
+// futureSource.value match {
+//
+// case Some(Success(ready)) ⇒
+// // the normal case, since once materialization finishes, the future is guaranteed to have been completed
+// ready.source
+//
+// case Some(Failure(cause)) ⇒
+// // materialization failed
+// Source.failed(cause).named("SourceRef")
+//
+// case None ⇒
+// // not yet materialized -- in reality this case should not happen, since once materialization is finished, this Future is already completed
+// // this impl is kept in case materialization semantics would change for some reason
+// Source.fromFutureSource(futureSource.map(ref ⇒ ref.source)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)).mapMaterializedValue(_ ⇒ NotUsed)
+// }
+//
+// // def forSerializationRef: SourceRef[Out] =
+// // futureSource.value match {
+// // case Some(Success(ready)) ⇒
+// // // the normal case, since once materialization finishes, the future is guaranteed to have been completed
+// // ready
+// //
+// // case Some(Failure(cause)) ⇒
+// // throw new IllegalStateException("Illegal serialization attempt, this stream has never materialized the SourceRef!", cause)
+// //
+// // case None ⇒
+// // // preStart has not finished yet, so we need to create and serialize a proxy ref
+// // val proxy = mkProxy()
+// //
+// // new SourceRef[Out] {
+// // override def source: Source[Out, NotUsed] =
+// // ???
+// // }
+// // }
+//
+// override def toString: String =
+// futureSource.value match {
+// case None ⇒ s"SourceRef()"
+// case Some(Success(ready)) ⇒ ready.toString
+// case Some(Failure(ex)) ⇒ s"SourceRef()"
+// }
+//
+//}
+//
+//case class BufferedRef() {
+//
+//}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
index a4c925a252..575de4eb4c 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
@@ -6,7 +6,7 @@ package akka.stream.impl.streamref
import scala.language.implicitConversions
import akka.Done
import akka.NotUsed
-import akka.actor.{ ActorRef, Terminated }
+import akka.actor.{ ActorRef, ActorSystem, Terminated }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream._
@@ -20,10 +20,8 @@ import scala.util.Try
/** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
@InternalApi
private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) extends SinkRef[In] {
- override def sink: Sink[In, NotUsed] =
+ override def sink(): Sink[In, NotUsed] =
Sink.fromGraph(new SinkRefStageImpl[In](OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ ⇒ NotUsed)
-
- override def getSink: javadsl.Sink[In, NotUsed] = sink.asJava
}
/**
@@ -35,7 +33,7 @@ private[stream] final case class SinkRefImpl[In](initialPartnerRef: ActorRef) ex
@InternalApi
private[stream] final class SinkRefStageImpl[In] private[akka] (
val initialPartnerRef: OptionVal[ActorRef]
-) extends GraphStageWithMaterializedValue[SinkShape[In], SourceRef[In]] {
+) extends GraphStageWithMaterializedValue[SinkShape[In], Future[SourceRef[In]]] {
val in: Inlet[In] = Inlet[In](s"${Logging.simpleName(getClass)}($initialRefName).in")
override def shape: SinkShape[In] = SinkShape.of(in)
@@ -199,10 +197,9 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
}
setHandler(in, this)
-
}
- (logic, MaterializedSourceRef[In](promise.future))
+ (logic, promise.future)
}
override def toString = s"${Logging.simpleName(getClass)}($initialRefName)"
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
index cf7a954059..a5e62d7c8b 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
@@ -14,7 +14,7 @@ import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.util.{ OptionVal, PrettyDuration }
-import scala.concurrent.Promise
+import scala.concurrent.{ Future, Promise }
import scala.language.implicitConversions
/** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
@@ -22,8 +22,6 @@ import scala.language.implicitConversions
private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) extends SourceRef[T] {
def source: Source[T, NotUsed] =
Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ ⇒ NotUsed)
-
- def getSource: javadsl.Source[T, NotUsed] = source.asJava
}
/**
@@ -35,7 +33,7 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
@InternalApi
private[stream] final class SourceRefStageImpl[Out](
val initialPartnerRef: OptionVal[ActorRef]
-) extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] {
+) extends GraphStageWithMaterializedValue[SourceShape[Out], Future[SinkRef[Out]]] { stage ⇒
val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out")
override def shape = SourceShape.of(out)
@@ -46,7 +44,7 @@ private[stream] final class SourceRefStageImpl[Out](
case _ ⇒ ""
}
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SinkRef[Out]) = {
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[SinkRef[Out]]) = {
val promise = Promise[SinkRefImpl[Out]]()
val logic = new TimerGraphStageLogic(shape) with StageLogging with OutHandler {
@@ -226,7 +224,8 @@ private[stream] final class SourceRefStageImpl[Out](
setHandler(out, this)
}
- (logic, MaterializedSinkRef[Out](promise.future))
+ // (logic, MaterializedSinkRef[Out](promise.future))
+ (logic, promise.future)
}
override def toString: String =
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala
index 3d4299ea18..c6d4079050 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala
@@ -3,10 +3,14 @@
*/
package akka.stream.impl.streamref
-import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
+import akka.actor.{ Actor, ActorLogging, ActorPath, ActorRef, ActorRefProvider, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, MinimalActorRef, Props, RepointableActorRef, Stash }
import akka.annotation.InternalApi
+import akka.event.Logging
+import akka.stream.SourceRef
import akka.stream.impl.SeqActorName
+import scala.concurrent.Future
+
/** INTERNAL API */
@InternalApi
private[stream] object StreamRefsMaster extends ExtensionId[StreamRefsMaster] with ExtensionIdProvider {
@@ -23,9 +27,21 @@ private[stream] object StreamRefsMaster extends ExtensionId[StreamRefsMaster] wi
@InternalApi
private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extends Extension {
+ private val log = Logging(system, getClass)
+
private[this] val sourceRefStageNames = SeqActorName("SourceRef") // "local target"
private[this] val sinkRefStageNames = SeqActorName("SinkRef") // "remote sender"
+ // // needed because serialization may want to serialize a ref, before we have finished materialization
+ // def bufferedRefFor[T](allocatedName: String, future: Future[SourceRefImpl[T]]): SourceRef[T] = {
+ // def mkProxyOnDemand(): ActorRef = {
+ // log.warning("HAVE TO CREATE PROXY!!!")
+ // val proxyName = allocatedName + "Proxy"
+ // system.actorOf(Props(new ProxyRefFor(future)), proxyName)
+ // }
+ // MaterializedSourceRef[T](future)
+ // }
+
// TODO introduce a master with which all stages running the streams register themselves?
def nextSourceRefStageName(): String =
@@ -35,3 +51,29 @@ private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extend
sinkRefStageNames.next()
}
+
+/** INTERNAL API */
+@InternalApi
+private[akka] final class ProxyRefFor[T](futureRef: Future[SourceRefImpl[T]]) extends Actor with Stash with ActorLogging {
+ import context.dispatcher
+ import akka.pattern.pipe
+
+ override def preStart(): Unit = {
+ futureRef.pipeTo(self)
+ }
+
+ override def receive: Receive = {
+ case ref: SourceRefImpl[T] ⇒
+ log.warning("REF:::: initial = " + ref.initialPartnerRef)
+ context become initialized(ref)
+ unstashAll()
+
+ case msg ⇒
+ log.warning("Stashing [{}], since target reference is still not initialized...", msg.getClass)
+ stash()
+ }
+
+ def initialized(ref: SourceRefImpl[T]): Receive = {
+ case any ⇒ ???
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 680acfbced..63cb82ba1d 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -16,6 +16,8 @@ import akka.japi.Util
import java.util.Comparator
import java.util.concurrent.CompletionStage
+import akka.annotation.InternalApi
+
import scala.compat.java8.FutureConverters._
object Flow {
@@ -219,6 +221,13 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
new Flow(delegate.mapMaterializedValue(f.apply _))
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Flow[In, Out, Mat2] =
+ new Flow(delegate.flattenMaterializedValue(timeout))
+
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
index 04965e1891..a060c6be21 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
@@ -18,7 +18,10 @@ import scala.concurrent.ExecutionContext
import scala.util.Try
import java.util.concurrent.CompletionStage
+import akka.annotation.InternalApi
+
import scala.compat.java8.FutureConverters._
+import scala.concurrent.duration.FiniteDuration
/** Java API */
object Sink {
@@ -333,6 +336,13 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] =
+ new Sink(delegate.flattenMaterializedValue[Mat2](timeout))
+
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 6fccfcf3e4..f94555de2a 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -25,6 +25,7 @@ import scala.compat.java8.OptionConverters._
import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture
+import akka.annotation.InternalApi
import akka.stream.scaladsl.Sink
import scala.compat.java8.FutureConverters._
@@ -484,6 +485,13 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
new Source(delegate.mapMaterializedValue(f.apply _))
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Source[Out, Mat2] =
+ new Source(delegate.flattenMaterializedValue[Mat2](timeout))
+
/**
* Transform this [[Source]] by appending the given processing stages.
* {{{
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 47b749bd68..405e5cdf19 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -19,7 +19,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
-import akka.annotation.DoNotInherit
+import akka.annotation.{ DoNotInherit, InternalApi }
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@@ -114,6 +114,15 @@ final class Flow[-In, +Out, +Mat](
traversalBuilder.transformMat(f),
shape)
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] =
+ new Flow(
+ traversalBuilder.flattenMat(timeout),
+ shape)
+
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{
@@ -2556,6 +2565,12 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2]
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2]
+
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
index 299410b6cc..b6dae63914 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
@@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status }
+import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
@@ -19,6 +20,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable
+import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
@@ -59,6 +61,15 @@ final class Sink[-In, +Mat](
traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]),
shape)
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] =
+ new Sink(
+ traversalBuilder.flattenMat(timeout),
+ shape)
+
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
@@ -461,6 +472,10 @@ object Sink {
*
* See more detailed documentation on [[SourceRef]].
*/
- def sourceRef[T](): Sink[T, SourceRef[T]] =
- Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None))
+ def sourceRef[T](): Sink[T, SourceRef[T]] = {
+ import scala.concurrent.duration._
+ val sink = Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None))
+ sink.flattenMaterializedValue[SourceRef[T]](10.seconds)
+ }
+
}
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index b9bd8a29c2..007b0d6048 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -4,9 +4,11 @@
package akka.stream.scaladsl
import java.util.concurrent.CompletionStage
+
import akka.util.ConstantFun
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, Props }
+import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages
@@ -24,7 +26,6 @@ import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
-
import akka.stream.impl.streamref.SourceRefStageImpl
import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue }
import akka.util.OptionVal
@@ -92,6 +93,13 @@ final class Source[+Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] =
new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape)
+ /**
+ * INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
+ */
+ @InternalApi
+ private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] =
+ new Source[Out, Mat2](traversalBuilder.flattenMat(timeout), shape)
+
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
@@ -617,6 +625,9 @@ object Source {
*
* See more detailed documentation on [[SinkRef]].
*/
- def sinkRef[T](): Source[T, SinkRef[T]] =
- Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None))
+ def sinkRef[T](): Source[T, SinkRef[T]] = {
+ import scala.concurrent.duration._
+ val value: Source[T, Future[SinkRef[T]]] = Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None))
+ value.flattenMaterializedValue[SinkRef[T]](1.second)
+ }
}
diff --git a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala
index 05eee88723..021f2d14da 100644
--- a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala
@@ -7,10 +7,10 @@ import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
-import akka.stream.StreamRefMessages
+import akka.stream.{ SourceRef, StreamRefMessages }
import akka.stream.impl.streamref._
-import scala.concurrent.duration._
+import scala.concurrent.duration._
import scala.concurrent.Await
/** INTERNAL API */
@@ -39,9 +39,9 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e
case _: StreamRefsProtocol.RemoteStreamCompleted ⇒ RemoteSinkCompletedManifest
// refs
case _: SourceRefImpl[_] ⇒ SourceRefManifest
- case _: MaterializedSourceRef[_] ⇒ SourceRefManifest
+ // case _: MaterializedSourceRef[_] ⇒ SourceRefManifest
case _: SinkRefImpl[_] ⇒ SinkRefManifest
- case _: MaterializedSinkRef[_] ⇒ SinkRefManifest
+ // case _: MaterializedSinkRef[_] ⇒ SinkRefManifest
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
@@ -55,9 +55,9 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e
case d: StreamRefsProtocol.RemoteStreamCompleted ⇒ serializeRemoteSinkCompleted(d).toByteArray
// refs
case ref: SinkRefImpl[_] ⇒ serializeSinkRef(ref).toByteArray
- case ref: MaterializedSinkRef[_] ⇒ serializeSinkRef(Await.result(ref.futureSink, 100.millis)).toByteArray
+ // case ref: MaterializedSinkRef[_] ⇒ ??? // serializeSinkRef(ref).toByteArray
case ref: SourceRefImpl[_] ⇒ serializeSourceRef(ref).toByteArray
- case ref: MaterializedSourceRef[_] ⇒ serializeSourceRef(Await.result(ref.futureSource, 100.millis)).toByteArray
+ // case ref: MaterializedSourceRef[_] ⇒ serializeSourceRef(ref.).toByteArray
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {