parity for Sink, for preMaterialize operation (#24657)

* +str #24656 parity for Sink, for preMaterialize operation

* add java test

* added docs to stages-overview
This commit is contained in:
Konrad `ktoso` Malawski 2018-03-08 13:02:17 +09:00 committed by GitHub
parent 5a59a7b362
commit 5be3c7bf83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 4 deletions

View file

@ -520,6 +520,14 @@ Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a
---------------------------------------------------------------
### preMaterialize
Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.
Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you.
---------------------------------------------------------------
<br/>
## Additional Sink and Source converters

View file

@ -8,11 +8,14 @@ import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.*;
import akka.testkit.javadsl.TestKit;
@ -43,7 +46,7 @@ public class SinkTest extends StreamTest {
@SuppressWarnings("unused")
final Publisher<Object> publisher = Source.from(new ArrayList<Object>()).runWith(pubSink, materializer);
}
@Test
public void mustBeAbleToUseFuture() throws Exception {
final Sink<Integer, CompletionStage<Integer>> futSink = Sink.head();
@ -58,7 +61,7 @@ public class SinkTest extends StreamTest {
@SuppressWarnings("unused")
CompletionStage<Integer> integerFuture = Source.from(new ArrayList<Integer>()).runWith(foldSink, materializer);
}
@Test
public void mustBeAbleToUseActorRefSink() throws Exception {
final TestKit probe = new TestKit(system);
@ -113,6 +116,21 @@ public class SinkTest extends StreamTest {
assertEquals(Arrays.asList(1, 2, 3), out);
}
@Test
public void mustBeAbleToUsePreMaterialize() throws Exception {
Pair<CompletionStage<String>, Sink<String, NotUsed>> pair = Sink.<String>head().preMaterialize(materializer);
CompletableFuture<String> future = pair.first().toCompletableFuture();
assertEquals(false, future.isDone()); // not yet, only once actually source attached
String element = "element";
Source.single(element).runWith(pair.second(), materializer);
String got = future.get(3, TimeUnit.SECONDS);// should complete nicely
assertEquals(element, got);
assertEquals(true, future.isDone());
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Sink<Integer, CompletionStage<Integer>> s =

View file

@ -40,6 +40,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass graphHelpers,
jSourceClass (graphHelpers ++ Set("watch", "ask")),

View file

@ -5,14 +5,18 @@ package akka.stream.scaladsl
import java.util
import java.util.function
import java.util.function.{ BinaryOperator, BiConsumer, Supplier, ToIntFunction }
import java.util.function.{ BiConsumer, BinaryOperator, Supplier, ToIntFunction }
import java.util.stream.Collector.Characteristics
import java.util.stream.{ Collector, Collectors }
import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.DefaultTimeout
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
@ -317,4 +321,54 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
}
}
"Sink pre-materialization" must {
"materialize the sink and wrap its exposed publisher in a Source" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)
val (matPub, sink) = publisherSink.preMaterialize()
val probe = Source.fromPublisher(matPub).runWith(TestSink.probe)
probe.expectNoMessage(100.millis)
Source.single("hello").runWith(sink)
probe.ensureSubscription()
probe.requestNext("hello")
probe.expectComplete()
}
"materialize the sink and wrap its exposed publisher(fanout) in a Source twice" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](fanout = true)
val (matPub, sink) = publisherSink.preMaterialize()
val probe1 = Source.fromPublisher(matPub).runWith(TestSink.probe)
val probe2 = Source.fromPublisher(matPub).runWith(TestSink.probe)
Source.single("hello").runWith(sink)
probe1.ensureSubscription()
probe1.requestNext("hello")
probe1.expectComplete()
probe2.ensureSubscription()
probe2.requestNext("hello")
probe2.expectComplete()
}
"materialize the sink and wrap its exposed publisher(not fanout), should fail the second materialization" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](fanout = false)
val (matPub, sink) = publisherSink.preMaterialize()
val probe1 = Source.fromPublisher(matPub).runWith(TestSink.probe)
val probe2 = Source.fromPublisher(matPub).runWith(TestSink.probe)
Source.single("hello").runWith(sink)
probe1.ensureSubscription()
probe1.requestNext("hello")
probe1.expectComplete()
probe2.ensureSubscription()
probe2.expectError().getMessage should include("only supports one subscriber")
}
}
}

View file

@ -5,7 +5,7 @@ package akka.stream.javadsl
import java.util.Optional
import akka.{ Done, NotUsed }
import akka.{ Done, NotUsed, japi }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function
@ -18,6 +18,7 @@ import scala.concurrent.ExecutionContext
import scala.util.Try
import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
/** Java API */
@ -321,6 +322,17 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
/**
* Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink
* that can be consume elements 'into' the pre-materialized one.
*
* Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you.
*/
def preMaterialize(materializer: Materializer): japi.Pair[Mat @uncheckedVariance, Sink[In @uncheckedVariance, NotUsed]] = {
val (mat, sink) = delegate.preMaterialize()(materializer)
akka.japi.Pair(mat, sink.asJava)
}
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes

View file

@ -58,6 +58,17 @@ final class Sink[-In, +Mat](
traversalBuilder.transformMat(f.asInstanceOf[Any Any]),
shape)
/**
* Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink
* that can be consume elements 'into' the pre-materialized one.
*
* Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you.
*/
def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed]) = {
val (sub, mat) = Source.asSubscriber.toMat(this)(Keep.both).run()
(mat, Sink.fromSubscriber(sub))
}
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes