From 721e486fdb83ce69f1f8033ca2ed2a3fd31171ff Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Mon, 11 Sep 2017 16:35:35 +0200 Subject: [PATCH] +str #23392 AbstractGraphStageWithMaterializedValue for Java users (#23394) (no need to use Tuple2, can be Pair) --- .../java/jdocs/stream/GraphStageDocTest.java | 16 +++---- .../scala/akka/stream/stage/GraphStage.scala | 42 +++++++++++++++++-- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java b/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java index 245cc4eed7..08773d2c41 100644 --- a/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java @@ -6,6 +6,7 @@ import akka.actor.ActorSystem; //#imports import akka.dispatch.Futures; import akka.japi.Option; +import akka.japi.Pair; import akka.japi.Predicate; import akka.japi.function.Procedure; import akka.stream.*; @@ -21,13 +22,14 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.reactivestreams.Subscription; -import scala.Tuple2; +import scala.compat.java8.FutureConverters; import scala.concurrent.ExecutionContext; import scala.concurrent.Promise; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -570,7 +572,7 @@ public class GraphStageDocTest extends AbstractJavaTest { //#materialized - public class FirstValue extends GraphStageWithMaterializedValue, CompletionStage> { + public class FirstValue extends AbstractGraphStageWithMaterializedValue, CompletionStage> { public final Inlet in = Inlet.create("FirstValue.in"); public final Outlet out = Outlet.create("FirstValue.out"); @@ -582,8 +584,8 @@ public class GraphStageDocTest extends AbstractJavaTest { } @Override - public Tuple2> createLogicAndMaterializedValue(Attributes inheritedAttributes) { - Promise promise = Futures.promise(); + public Pair> createLogicAndMaterializedValuePair(Attributes inheritedAttributes) { + CompletableFuture promise = new CompletableFuture<>(); GraphStageLogic logic = new GraphStageLogic(shape) { { @@ -591,7 +593,7 @@ public class GraphStageDocTest extends AbstractJavaTest { @Override public void onPush() { A elem = grab(in); - promise.success(elem); + promise.complete(elem); push(out, elem); // replace handler with one just forwarding @@ -612,8 +614,8 @@ public class GraphStageDocTest extends AbstractJavaTest { }); } }; - - return new Tuple2(logic, promise.future()); + + return new Pair<>(logic, promise); } } //#materialized diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 5289ea1f94..eeb6f1aeee 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -24,6 +24,18 @@ import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } import akka.util.OptionVal import akka.annotation.InternalApi +/** + * Scala API: A GraphStage represents a reusable graph stream processing stage. + * + * Extend this `GraphStageWithMaterializedValue` if you want to provide a materialized value, + * represented by the type parameter `M`. If your GraphStage does not need to provide a materialized + * value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value. + * + * A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that + * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. + * + * See also [[AbstractGraphStageWithMaterializedValue]] for Java DSL for this stage. + */ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { @throws(classOf[Exception]) @@ -50,9 +62,33 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, } /** - * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes - * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing - * logic that ties the ports together. + * Java API: A GraphStage represents a reusable graph stream processing stage. + * + * Extend this `AbstractGraphStageWithMaterializedValue` if you want to provide a materialized value, + * represented by the type parameter `M`. If your GraphStage does not need to provide a materialized + * value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value. + * + * A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that + * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. + * + * See also [[GraphStageWithMaterializedValue]] for Scala DSL for this stage. + */ +abstract class AbstractGraphStageWithMaterializedValue[+S <: Shape, M] extends GraphStageWithMaterializedValue[S, M] { + @throws(classOf[Exception]) + final def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) = { + val pair = createLogicAndMaterializedValuePair(inheritedAttributes) + pair.first → pair.second + } + + @throws(classOf[Exception]) + def createLogicAndMaterializedValuePair(inheritedAttributes: Attributes): akka.japi.Pair[GraphStageLogic, M] +} + +/** + * A GraphStage represents a reusable graph stream processing stage. + * + * A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that + * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. */ abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =