(no need to use Tuple2, can be Pair)
This commit is contained in:
parent
e6da282f7f
commit
721e486fdb
2 changed files with 48 additions and 10 deletions
|
|
@ -6,6 +6,7 @@ import akka.actor.ActorSystem;
|
|||
//#imports
|
||||
import akka.dispatch.Futures;
|
||||
import akka.japi.Option;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.Predicate;
|
||||
import akka.japi.function.Procedure;
|
||||
import akka.stream.*;
|
||||
|
|
@ -21,13 +22,14 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Subscription;
|
||||
import scala.Tuple2;
|
||||
import scala.compat.java8.FutureConverters;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Promise;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -570,7 +572,7 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
|||
|
||||
|
||||
//#materialized
|
||||
public class FirstValue<A> extends GraphStageWithMaterializedValue<FlowShape<A, A>, CompletionStage<A>> {
|
||||
public class FirstValue<A> extends AbstractGraphStageWithMaterializedValue<FlowShape<A, A>, CompletionStage<A>> {
|
||||
|
||||
public final Inlet<A> in = Inlet.create("FirstValue.in");
|
||||
public final Outlet<A> out = Outlet.create("FirstValue.out");
|
||||
|
|
@ -582,8 +584,8 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<GraphStageLogic, CompletionStage<A>> createLogicAndMaterializedValue(Attributes inheritedAttributes) {
|
||||
Promise<A> promise = Futures.promise();
|
||||
public Pair<GraphStageLogic, CompletionStage<A>> createLogicAndMaterializedValuePair(Attributes inheritedAttributes) {
|
||||
CompletableFuture<A> promise = new CompletableFuture<>();
|
||||
|
||||
GraphStageLogic logic = new GraphStageLogic(shape) {
|
||||
{
|
||||
|
|
@ -591,7 +593,7 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
|||
@Override
|
||||
public void onPush() {
|
||||
A elem = grab(in);
|
||||
promise.success(elem);
|
||||
promise.complete(elem);
|
||||
push(out, elem);
|
||||
|
||||
// replace handler with one just forwarding
|
||||
|
|
@ -612,8 +614,8 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
|||
});
|
||||
}
|
||||
};
|
||||
|
||||
return new Tuple2(logic, promise.future());
|
||||
|
||||
return new Pair<>(logic, promise);
|
||||
}
|
||||
}
|
||||
//#materialized
|
||||
|
|
|
|||
|
|
@ -24,6 +24,18 @@ import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes }
|
|||
import akka.util.OptionVal
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* Scala API: A GraphStage represents a reusable graph stream processing stage.
|
||||
*
|
||||
* Extend this `GraphStageWithMaterializedValue` if you want to provide a materialized value,
|
||||
* represented by the type parameter `M`. If your GraphStage does not need to provide a materialized
|
||||
* value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value.
|
||||
*
|
||||
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
|
||||
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
|
||||
*
|
||||
* See also [[AbstractGraphStageWithMaterializedValue]] for Java DSL for this stage.
|
||||
*/
|
||||
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {
|
||||
|
||||
@throws(classOf[Exception])
|
||||
|
|
@ -50,9 +62,33 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
|
|||
}
|
||||
|
||||
/**
|
||||
* A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
|
||||
* its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
|
||||
* logic that ties the ports together.
|
||||
* Java API: A GraphStage represents a reusable graph stream processing stage.
|
||||
*
|
||||
* Extend this `AbstractGraphStageWithMaterializedValue` if you want to provide a materialized value,
|
||||
* represented by the type parameter `M`. If your GraphStage does not need to provide a materialized
|
||||
* value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value.
|
||||
*
|
||||
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
|
||||
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
|
||||
*
|
||||
* See also [[GraphStageWithMaterializedValue]] for Scala DSL for this stage.
|
||||
*/
|
||||
abstract class AbstractGraphStageWithMaterializedValue[+S <: Shape, M] extends GraphStageWithMaterializedValue[S, M] {
|
||||
@throws(classOf[Exception])
|
||||
final def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) = {
|
||||
val pair = createLogicAndMaterializedValuePair(inheritedAttributes)
|
||||
pair.first → pair.second
|
||||
}
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def createLogicAndMaterializedValuePair(inheritedAttributes: Attributes): akka.japi.Pair[GraphStageLogic, M]
|
||||
}
|
||||
|
||||
/**
|
||||
* A GraphStage represents a reusable graph stream processing stage.
|
||||
*
|
||||
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
|
||||
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
|
||||
*/
|
||||
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
|
||||
final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue