=str #18817 add Java API for GraphStage In/OutHandler

This commit is contained in:
Roland Kuhn 2015-11-04 15:10:20 +00:00 committed by Endre Sándor Varga
parent 2f1ef278a5
commit 0990d735a0
6 changed files with 227 additions and 42 deletions

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.stage;
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
public class JavaIdentityStage<T> extends GraphStage<FlowShape<T, T>> {
private Inlet<T> _in = Inlet.create("Identity.in");
private Outlet<T> _out = Outlet.create("Identity.out");
private FlowShape<T, T> _shape = FlowShape.of(_in, _out);
public Inlet<T> in() { return _in; }
public Outlet<T> out() { return _out; }
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
{
setHandler(in(), new AbstractInHandler() {
@Override
public void onPush() {
push(out(), grab(in()));
}
});
setHandler(out(), new AbstractOutHandler() {
@Override
public void onPull() {
pull(in());
}
});
}
};
}
@Override
public FlowShape<T, T> shape() {
return _shape;
}
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.stage;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.Arrays;
import java.util.List;
public class StageTest extends StreamTest {
public StageTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
@Test
public void javaStageUsage() throws Exception {
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
final Source<Integer, ?> ints = Source.from(input);
final JavaIdentityStage<Integer> identity = new JavaIdentityStage<Integer>();
final Future<List<Integer>> result =
ints
.via(identity)
.via(identity)
.grouped(1000)
.runWith(Sink.<List<Integer>>head(), materializer);
assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), Await.result(result, Duration.create(3, "seconds")));
}
}

View file

@ -45,7 +45,21 @@ sealed abstract class OutPort { self: Outlet[_] ⇒
* express the internal structural hierarchy of stream topologies).
*/
object Inlet {
def apply[T](toString: String): Inlet[T] = new Inlet[T](toString)
/**
* Scala API
*
* Creates a new Inlet with the given name. The name will be used when
* displaying debug information or error messages involving the port.
*/
def apply[T](name: String): Inlet[T] = new Inlet[T](name)
/**
* JAVA API
*
* Creates a new Inlet with the given name. The name will be used when
* displaying debug information or error messages involving the port.
*/
def create[T](name: String): Inlet[T] = Inlet(name)
}
final class Inlet[T] private (override val toString: String) extends InPort {
@ -62,7 +76,22 @@ final class Inlet[T] private (override val toString: String) extends InPort {
* express the internal structural hierarchy of stream topologies).
*/
object Outlet {
def apply[T](toString: String): Outlet[T] = new Outlet[T](toString)
/**
* Scala API
*
* Creates a new Outlet with the given name. The name will be used when
* displaying debug information or error messages involving the port.
*/
def apply[T](name: String): Outlet[T] = new Outlet[T](name)
/**
* JAVA API
*
* Creates a new Outlet with the given name. The name will be used when
* displaying debug information or error messages involving the port.
*/
def create[T](name: String): Outlet[T] = Outlet(name)
}
final class Outlet[T] private (override val toString: String) extends OutPort {

View file

@ -222,6 +222,38 @@ private[stream] object GraphInterpreter {
assembly
}
}
/**
* INTERNAL API
*/
private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
/*
* Using an Object-array avoids holding on to the GraphInterpreter class
* when this accidentally leaks onto threads that are not stopped when this
* class should be unloaded.
*/
override def initialValue = new Array(1)
}
/**
* INTERNAL API
*/
private[stream] def currentInterpreter: GraphInterpreter =
_currentInterpreter.get()(0).asInstanceOf[GraphInterpreter].nonNull
// nonNull is just a debug helper to find nulls more timely
/**
* INTERNAL API
*/
private[stream] def currentInterpreterOrNull: GraphInterpreter =
_currentInterpreter.get()(0).asInstanceOf[GraphInterpreter]
/**
* INTERNAL API
*/
private[stream] def setCurrentInterpreter(gi: GraphInterpreter) =
_currentInterpreter.get()(0) = gi
}
/**
@ -317,7 +349,10 @@ private[stream] final class GraphInterpreter(
// of the class for a full description.
val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
private[this] var activeStage: GraphStageLogic = _
/**
* INTERNAL API
*/
private[stream] var activeStage: GraphStageLogic = _
// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
@ -350,6 +385,11 @@ private[stream] final class GraphInterpreter(
_Name
} else _Name
/**
* INTERNAL API
*/
private[stream] def nonNull: GraphInterpreter = this
/**
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
* (outside the interpreter) to process and inject events.
@ -462,9 +502,12 @@ private[stream] final class GraphInterpreter(
*/
def execute(eventLimit: Int): Unit = {
if (Debug) println(s"$Name ---------------- EXECUTE (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
val previousInterpreter = currentInterpreterOrNull
setCurrentInterpreter(this)
try {
var eventsRemaining = eventLimit
var connection = dequeue()
while (eventsRemaining > 0 && connection != NoEvent) {
while (eventsRemaining > 0 && queueTail != queueHead) {
val connection = dequeue()
try processEvent(connection)
catch {
case NonFatal(e)
@ -473,7 +516,9 @@ private[stream] final class GraphInterpreter(
}
afterStageHasRun(activeStage)
eventsRemaining -= 1
if (eventsRemaining > 0) connection = dequeue()
}
} finally {
setCurrentInterpreter(previousInterpreter)
}
if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
// TODO: deadlock detection

View file

@ -32,6 +32,8 @@ private[akka] object IteratorInterpreter {
} else push(out, elem)
}
}
override def onDownstreamFinish(): Unit = completeStage()
})
override def toString = "IteratorUpstream"

View file

@ -76,7 +76,7 @@ object GraphStageLogic {
* Input handler that terminates the stage upon receiving completion.
* The stage fails upon receiving a failure.
*/
class EagerTerminateInput extends InHandler {
object EagerTerminateInput extends InHandler {
override def onPush(): Unit = ()
}
@ -84,7 +84,7 @@ object GraphStageLogic {
* Input handler that does not terminate the stage upon receiving completion.
* The stage fails upon receiving a failure.
*/
class IgnoreTerminateInput extends InHandler {
object IgnoreTerminateInput extends InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = ()
}
@ -95,14 +95,15 @@ object GraphStageLogic {
*/
class ConditionalTerminateInput(predicate: () Boolean) extends InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = if (predicate()) inOwnerStageLogic.completeStage()
override def onUpstreamFinish(): Unit =
if (predicate()) GraphInterpreter.currentInterpreter.activeStage.completeStage()
}
/**
* Input handler that does not terminate the stage upon receiving completion
* nor failure.
*/
class TotallyIgnorantInput extends InHandler {
object TotallyIgnorantInput extends InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = ()
override def onUpstreamFailure(ex: Throwable): Unit = ()
@ -111,14 +112,14 @@ object GraphStageLogic {
/**
* Output handler that terminates the stage upon cancellation.
*/
class EagerTerminateOutput extends OutHandler {
object EagerTerminateOutput extends OutHandler {
override def onPull(): Unit = ()
}
/**
* Output handler that does not terminate the stage upon cancellation.
*/
class IgnoreTerminateOutput extends OutHandler {
object IgnoreTerminateOutput extends OutHandler {
override def onPull(): Unit = ()
override def onDownstreamFinish(): Unit = ()
}
@ -129,7 +130,8 @@ object GraphStageLogic {
*/
class ConditionalTerminateOutput(predicate: () Boolean) extends OutHandler {
override def onPull(): Unit = ()
override def onDownstreamFinish(): Unit = if (predicate()) outOwnerStageLogic.completeStage()
override def onDownstreamFinish(): Unit =
if (predicate()) GraphInterpreter.currentInterpreter.activeStage.completeStage()
}
private object DoNothing extends (() Unit) {
@ -194,12 +196,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Input handler that terminates the stage upon receiving completion.
* The stage fails upon receiving a failure.
*/
final protected def eagerTerminateInput: InHandler = new EagerTerminateInput
final protected def eagerTerminateInput: InHandler = EagerTerminateInput
/**
* Input handler that does not terminate the stage upon receiving completion.
* The stage fails upon receiving a failure.
*/
final protected def ignoreTerminateInput: InHandler = new IgnoreTerminateInput
final protected def ignoreTerminateInput: InHandler = IgnoreTerminateInput
/**
* Input handler that terminates the state upon receiving completion if the
* given condition holds at that time. The stage fails upon receiving a failure.
@ -209,15 +211,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Input handler that does not terminate the stage upon receiving completion
* nor failure.
*/
final protected def totallyIgnorantInput: InHandler = new TotallyIgnorantInput
final protected def totallyIgnorantInput: InHandler = TotallyIgnorantInput
/**
* Output handler that terminates the stage upon cancellation.
*/
final protected def eagerTerminateOutput: OutHandler = new EagerTerminateOutput
final protected def eagerTerminateOutput: OutHandler = EagerTerminateOutput
/**
* Output handler that does not terminate the stage upon cancellation.
*/
final protected def ignoreTerminateOutput: OutHandler = new IgnoreTerminateOutput
final protected def ignoreTerminateOutput: OutHandler = IgnoreTerminateOutput
/**
* Output handler that terminates the state upon receiving completion if the
* given condition holds at that time. The stage fails upon receiving a failure.
@ -228,7 +230,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Assigns callbacks for the events for an [[Inlet]]
*/
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
handler.inOwnerStageLogic = this
handlers(in.id) = handler
if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
}
@ -244,7 +245,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Assigns callbacks for the events for an [[Outlet]]
*/
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
handler.outOwnerStageLogic = this
handlers(out.id + inCount) = handler
if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
}
@ -834,11 +834,6 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
* Collection of callbacks for an input port of a [[GraphStage]]
*/
trait InHandler {
/**
* INTERNAL API
*/
private[stream] var inOwnerStageLogic: GraphStageLogic = _
/**
* Called when the input port has a new element available. The actual element can be retrieved via the
* [[GraphStageLogic.grab()]] method.
@ -848,23 +843,18 @@ trait InHandler {
/**
* Called when the input port is finished. After this callback no other callbacks will be called for this port.
*/
def onUpstreamFinish(): Unit = inOwnerStageLogic.completeStage()
def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()
/**
* Called when the input port has failed. After this callback no other callbacks will be called for this port.
*/
def onUpstreamFailure(ex: Throwable): Unit = inOwnerStageLogic.failStage(ex)
def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}
/**
* Collection of callbacks for an output port of a [[GraphStage]]
*/
trait OutHandler {
/**
* INTERNAL API
*/
private[stream] var outOwnerStageLogic: GraphStageLogic = _
/**
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
* is now allowed to be called on this port.
@ -875,5 +865,28 @@ trait OutHandler {
* Called when the output port will no longer accept any new elements. After this callback no other callbacks will
* be called for this port.
*/
def onDownstreamFinish(): Unit = outOwnerStageLogic.completeStage()
def onDownstreamFinish(): Unit = {
GraphInterpreter
.currentInterpreter
.activeStage
.completeStage()
}
}
/**
* Java API: callbacks for an input port where termination logic is predefined
* (completing when upstream completes, failing when upstream fails).
*/
abstract class AbstractInHandler extends InHandler
/**
* Java API: callbacks for an output port where termination logic is predefined
* (completing when downstream cancels).
*/
abstract class AbstractOutHandler extends OutHandler
/**
* Java API: callback combination for output and input ports where termination logic is predefined
* (completing when upstream completes, failing when upstream fails, completing when downstream cancels).
*/
abstract class AbstractInOutHandler extends InHandler with OutHandler