diff --git a/akka-docs/src/main/paradox/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/stream/stream-flows-and-basics.md index 9795936d2a..9109a8e6e9 100644 --- a/akka-docs/src/main/paradox/stream/stream-flows-and-basics.md +++ b/akka-docs/src/main/paradox/stream/stream-flows-and-basics.md @@ -333,6 +333,20 @@ In Graphs it is possible to access the materialized value from inside the stream @@@ +### Source pre-materialization + +There are situations in which you require a `Source` materialized value **before** the `Source` gets hooked up to the rest of the graph. +This is particularly useful in the case of "materialized value powered" `Source`s, like `Source.queue`, `Source.actorRef` or `Source.maybe`. + +By using the `preMaterialize` operator on a `Source`, you can obtain its materialized value and another `Source`. The latter can be used +to consume messages from the original `Source`. Note that this can be materialized multiple times. + +Scala +: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-prematerialization } + +Java +: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-prematerialization } + ## Stream ordering In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs `{IA1,IA2,...,IAn}` @@ -352,7 +366,7 @@ If you find yourself in need of fine grained control over order of emitted eleme scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` – which gives you full control over how the merge is performed. -# Actor Materializer Lifecycle +## Actor Materializer Lifecycle An important aspect of working with streams and actors is understanding an `ActorMaterializer`'s life-cycle. The materializer is bound to the lifecycle of the `ActorRefFactory` it is created from, which in practice will diff --git a/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java index 042330309a..7db95bb28d 100644 --- a/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSystem; +import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.dispatch.Futures; import akka.stream.*; @@ -271,6 +272,22 @@ public class FlowDocTest extends AbstractJavaTest { //#flow-mat-combine } + @Test + public void sourcePreMaterialization() { + //#source-prematerialization + Source matValuePoweredSource = + Source.actorRef(100, OverflowStrategy.fail()); + + Pair> actorRefSourcePair = + matValuePoweredSource.preMaterialize(mat); + + actorRefSourcePair.first().tell("Hello!", ActorRef.noSender()); + + // pass source around for materialization + actorRefSourcePair.second().runWith(Sink.foreach(System.out::println), mat); + //#source-prematerialization + } + public void fusingAndAsync() { //#flow-async Source.range(1, 3) diff --git a/akka-docs/src/test/scala/docs/stream/FlowDocSpec.scala b/akka-docs/src/test/scala/docs/stream/FlowDocSpec.scala index e6579e58f2..8bd8e0fd2f 100644 --- a/akka-docs/src/test/scala/docs/stream/FlowDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/FlowDocSpec.scala @@ -5,7 +5,7 @@ package docs.stream import akka.{ Done, NotUsed } import akka.actor.{ Actor, ActorSystem, Cancellable } -import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, Materializer } +import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, Materializer, OverflowStrategy } import akka.stream.scaladsl._ import akka.testkit.AkkaSpec import docs.CompileOnlySpec @@ -230,6 +230,20 @@ class FlowDocSpec extends AkkaSpec with CompileOnlySpec { .to(Sink.ignore) //#flow-async } + + "source pre-materialization" in { + //#source-prematerialization + val matValuePoweredSource = + Source.actorRef[String](bufferSize = 100, overflowStrategy = OverflowStrategy.fail) + + val (actorRef, source) = matValuePoweredSource.preMaterialize() + + actorRef ! "Hello!" + + // pass source around for materialization + source.runWith(Sink.foreach(println)) + //#source-prematerialization + } } object FlowDocSpec { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 43e726fec6..472501a89e 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -884,4 +884,9 @@ public class SourceTest extends StreamTest { final Source f = Source.empty().divertTo(Sink.ignore(), e -> true); final Source f2 = Source.empty().divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo"); } + + @Test + public void mustBeAbleToUsePreMaterialize() { + final Pair> p = Source.empty().preMaterialize(materializer); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index b872d440c9..a4cb4ffe75 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -384,4 +384,67 @@ class SourceSpec extends StreamSpec with DefaultTimeout { closed should ===(true) } } + + "Source pre-materialization" must { + + "materialize the source and connect it to a publisher" in { + val matValPoweredSource = Source.maybe[Int] + val (mat, src) = matValPoweredSource.preMaterialize() + + val probe = src.runWith(TestSink.probe[Int]) + + probe.request(1) + mat.success(Some(42)) + probe.expectNext(42) + probe.expectComplete() + } + + "allow for multiple downstream materialized sources" in { + val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val (mat, src) = matValPoweredSource.preMaterialize() + + val probe1 = src.runWith(TestSink.probe[String]) + val probe2 = src.runWith(TestSink.probe[String]) + + probe1.request(1) + probe2.request(1) + mat.offer("One").futureValue + probe1.expectNext("One") + probe2.expectNext("One") + } + + "survive cancellations of downstream materialized sources" in { + val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val (mat, src) = matValPoweredSource.preMaterialize() + + val probe1 = src.runWith(TestSink.probe[String]) + src.runWith(Sink.cancelled) + + probe1.request(1) + mat.offer("One").futureValue + probe1.expectNext("One") + } + + "propagate failures to downstream materialized sources" in { + val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val (mat, src) = matValPoweredSource.preMaterialize() + + val probe1 = src.runWith(TestSink.probe[String]) + val probe2 = src.runWith(TestSink.probe[String]) + + mat.fail(new RuntimeException("boom")) + + probe1.expectSubscription() + probe2.expectSubscription() + + probe1.expectError().getMessage should ===("boom") + probe2.expectError().getMessage should ===("boom") + } + + "correctly propagate materialization failures" in { + val matValPoweredSource = Source.empty.mapMaterializedValue(_ ⇒ throw new RuntimeException("boom")) + + a[RuntimeException] shouldBe thrownBy(matValPoweredSource.preMaterialize()) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 8241be0921..482866aa5f 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -475,6 +475,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] = new Source(delegate.mapMaterializedValue(f.apply _)) + /** + * Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source + * that can be used to consume elements from the newly materialized Source. + */ + def preMaterialize(materializer: Materializer): Pair[Mat @uncheckedVariance, Source[Out @uncheckedVariance, NotUsed]] = { + val (mat, src) = delegate.preMaterialize()(materializer) + Pair(mat, new Source(src)) + } + /** * Transform this [[Source]] by appending the given processing stages. * {{{ @@ -1922,8 +1931,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels or substream cancels */ - def prefixAndTail(n: Int): javadsl.Source[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] = - new Source(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) + def prefixAndTail(n: Int): javadsl.Source[Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] = + new Source(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail.asJava) }) /** * This operation demultiplexes the incoming stream into separate output diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7e9dd39eec..e1ef830acc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -87,6 +87,15 @@ final class Source[+Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape) + /** + * Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source + * that can be used to consume elements from the newly materialized Source. + */ + def preMaterialize()(implicit materializer: Materializer): (Mat, ReprMat[Out, NotUsed]) = { + val (mat, pub) = toMat(Sink.asPublisher(fanout = true))(Keep.both).run() + (mat, Source.fromPublisher(pub)) + } + /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].