diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index ea45e1e658..8136909ae4 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -520,6 +520,14 @@ Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a --------------------------------------------------------------- +### preMaterialize + +Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one. + +Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you. + +--------------------------------------------------------------- +
## Additional Sink and Source converters diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 2bef003a97..e6ee35a4a6 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -8,11 +8,14 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import akka.Done; import akka.NotUsed; +import akka.japi.Pair; import akka.japi.function.Function; import akka.stream.*; import akka.testkit.javadsl.TestKit; @@ -43,7 +46,7 @@ public class SinkTest extends StreamTest { @SuppressWarnings("unused") final Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); } - + @Test public void mustBeAbleToUseFuture() throws Exception { final Sink> futSink = Sink.head(); @@ -58,7 +61,7 @@ public class SinkTest extends StreamTest { @SuppressWarnings("unused") CompletionStage integerFuture = Source.from(new ArrayList()).runWith(foldSink, materializer); } - + @Test public void mustBeAbleToUseActorRefSink() throws Exception { final TestKit probe = new TestKit(system); @@ -113,6 +116,21 @@ public class SinkTest extends StreamTest { assertEquals(Arrays.asList(1, 2, 3), out); } + @Test + public void mustBeAbleToUsePreMaterialize() throws Exception { + Pair, Sink> pair = Sink.head().preMaterialize(materializer); + + CompletableFuture future = pair.first().toCompletableFuture(); + assertEquals(false, future.isDone()); // not yet, only once actually source attached + + String element = "element"; + Source.single(element).runWith(pair.second(), materializer); + + String got = future.get(3, TimeUnit.SECONDS);// should complete nicely + assertEquals(element, got); + assertEquals(true, future.isDone()); + } + public void mustSuitablyOverrideAttributeHandlingMethods() { @SuppressWarnings("unused") final Sink> s = diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index d614253232..4e2cc96256 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -40,6 +40,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph") + val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass → graphHelpers, jSourceClass → (graphHelpers ++ Set("watch", "ask")), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 67648ce2af..2f7fd0cbef 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -5,14 +5,18 @@ package akka.stream.scaladsl import java.util import java.util.function -import java.util.function.{ BinaryOperator, BiConsumer, Supplier, ToIntFunction } +import java.util.function.{ BiConsumer, BinaryOperator, Supplier, ToIntFunction } import java.util.stream.Collector.Characteristics import java.util.stream.{ Collector, Collectors } + import akka.stream._ import akka.stream.testkit.Utils._ import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink import akka.testkit.DefaultTimeout +import org.reactivestreams.Publisher import org.scalatest.concurrent.ScalaFutures + import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ @@ -317,4 +321,54 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] } } + + "Sink pre-materialization" must { + "materialize the sink and wrap its exposed publisher in a Source" in { + val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false) + val (matPub, sink) = publisherSink.preMaterialize() + + val probe = Source.fromPublisher(matPub).runWith(TestSink.probe) + probe.expectNoMessage(100.millis) + + Source.single("hello").runWith(sink) + + probe.ensureSubscription() + probe.requestNext("hello") + probe.expectComplete() + } + "materialize the sink and wrap its exposed publisher(fanout) in a Source twice" in { + val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](fanout = true) + val (matPub, sink) = publisherSink.preMaterialize() + + val probe1 = Source.fromPublisher(matPub).runWith(TestSink.probe) + val probe2 = Source.fromPublisher(matPub).runWith(TestSink.probe) + + Source.single("hello").runWith(sink) + + probe1.ensureSubscription() + probe1.requestNext("hello") + probe1.expectComplete() + + probe2.ensureSubscription() + probe2.requestNext("hello") + probe2.expectComplete() + } + "materialize the sink and wrap its exposed publisher(not fanout), should fail the second materialization" in { + val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](fanout = false) + val (matPub, sink) = publisherSink.preMaterialize() + + val probe1 = Source.fromPublisher(matPub).runWith(TestSink.probe) + val probe2 = Source.fromPublisher(matPub).runWith(TestSink.probe) + + Source.single("hello").runWith(sink) + + probe1.ensureSubscription() + probe1.requestNext("hello") + probe1.expectComplete() + + probe2.ensureSubscription() + probe2.expectError().getMessage should include("only supports one subscriber") + } + } + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index a4d9c6cc0d..4dc0e3c69d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -5,7 +5,7 @@ package akka.stream.javadsl import java.util.Optional -import akka.{ Done, NotUsed } +import akka.{ Done, NotUsed, japi } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function @@ -18,6 +18,7 @@ import scala.concurrent.ExecutionContext import scala.util.Try import java.util.concurrent.CompletionStage +import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ /** Java API */ @@ -321,6 +322,17 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterializedValue(f.apply _)) + /** + * Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink + * that can be consume elements 'into' the pre-materialized one. + * + * Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you. + */ + def preMaterialize(materializer: Materializer): japi.Pair[Mat @uncheckedVariance, Sink[In @uncheckedVariance, NotUsed]] = { + val (mat, sink) = delegate.preMaterialize()(materializer) + akka.japi.Pair(mat, sink.asJava) + } + /** * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f00da21463..4d5c0661e4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -58,6 +58,17 @@ final class Sink[-In, +Mat]( traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape) + /** + * Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink + * that can be consume elements 'into' the pre-materialized one. + * + * Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you. + */ + def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed]) = { + val (sub, mat) = Source.asSubscriber.toMat(this)(Keep.both).run() + (mat, Sink.fromSubscriber(sub)) + } + /** * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes