diff --git a/akka-stream-tests/src/test/java/akka/stream/stage/JavaIdentityStage.java b/akka-stream-tests/src/test/java/akka/stream/stage/JavaIdentityStage.java new file mode 100644 index 0000000000..e256c0d946 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/stage/JavaIdentityStage.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.stage; + +import akka.stream.Attributes; +import akka.stream.FlowShape; +import akka.stream.Inlet; +import akka.stream.Outlet; + +public class JavaIdentityStage extends GraphStage> { + private Inlet _in = Inlet.create("Identity.in"); + private Outlet _out = Outlet.create("Identity.out"); + private FlowShape _shape = FlowShape.of(_in, _out); + + public Inlet in() { return _in; } + public Outlet out() { return _out; } + + + @Override + public GraphStageLogic createLogic(Attributes inheritedAttributes) { + return new GraphStageLogic(shape()) { + + { + setHandler(in(), new AbstractInHandler() { + @Override + public void onPush() { + push(out(), grab(in())); + } + + }); + + setHandler(out(), new AbstractOutHandler() { + @Override + public void onPull() { + pull(in()); + } + }); + + } + + }; + } + + @Override + public FlowShape shape() { + return _shape; + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java b/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java new file mode 100644 index 0000000000..ad05c98c85 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.stage; + +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.testkit.AkkaSpec; + +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Arrays; +import java.util.List; + +public class StageTest extends StreamTest { + public StageTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", + AkkaSpec.testConf()); + + @Test + public void javaStageUsage() throws Exception { + final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); + final Source ints = Source.from(input); + final JavaIdentityStage identity = new JavaIdentityStage(); + + final Future> result = + ints + .via(identity) + .via(identity) + .grouped(1000) + .runWith(Sink.>head(), materializer); + + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), Await.result(result, Duration.create(3, "seconds"))); + } +} diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 456c8eef41..a1a983baa5 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -45,7 +45,21 @@ sealed abstract class OutPort { self: Outlet[_] ⇒ * express the internal structural hierarchy of stream topologies). */ object Inlet { - def apply[T](toString: String): Inlet[T] = new Inlet[T](toString) + /** + * Scala API + * + * Creates a new Inlet with the given name. The name will be used when + * displaying debug information or error messages involving the port. + */ + def apply[T](name: String): Inlet[T] = new Inlet[T](name) + + /** + * JAVA API + * + * Creates a new Inlet with the given name. The name will be used when + * displaying debug information or error messages involving the port. + */ + def create[T](name: String): Inlet[T] = Inlet(name) } final class Inlet[T] private (override val toString: String) extends InPort { @@ -62,7 +76,22 @@ final class Inlet[T] private (override val toString: String) extends InPort { * express the internal structural hierarchy of stream topologies). */ object Outlet { - def apply[T](toString: String): Outlet[T] = new Outlet[T](toString) + + /** + * Scala API + * + * Creates a new Outlet with the given name. The name will be used when + * displaying debug information or error messages involving the port. + */ + def apply[T](name: String): Outlet[T] = new Outlet[T](name) + + /** + * JAVA API + * + * Creates a new Outlet with the given name. The name will be used when + * displaying debug information or error messages involving the port. + */ + def create[T](name: String): Outlet[T] = Outlet(name) } final class Outlet[T] private (override val toString: String) extends OutPort { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index f03efe3c4a..b8e32a957f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -222,6 +222,38 @@ private[stream] object GraphInterpreter { assembly } } + + /** + * INTERNAL API + */ + private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { + /* + * Using an Object-array avoids holding on to the GraphInterpreter class + * when this accidentally leaks onto threads that are not stopped when this + * class should be unloaded. + */ + override def initialValue = new Array(1) + } + + /** + * INTERNAL API + */ + private[stream] def currentInterpreter: GraphInterpreter = + _currentInterpreter.get()(0).asInstanceOf[GraphInterpreter].nonNull + // nonNull is just a debug helper to find nulls more timely + + /** + * INTERNAL API + */ + private[stream] def currentInterpreterOrNull: GraphInterpreter = + _currentInterpreter.get()(0).asInstanceOf[GraphInterpreter] + + /** + * INTERNAL API + */ + private[stream] def setCurrentInterpreter(gi: GraphInterpreter) = + _currentInterpreter.get()(0) = gi + } /** @@ -317,7 +349,10 @@ private[stream] final class GraphInterpreter( // of the class for a full description. val portStates = Array.fill[Int](assembly.connectionCount)(InReady) - private[this] var activeStage: GraphStageLogic = _ + /** + * INTERNAL API + */ + private[stream] var activeStage: GraphStageLogic = _ // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed @@ -350,6 +385,11 @@ private[stream] final class GraphInterpreter( _Name } else _Name + /** + * INTERNAL API + */ + private[stream] def nonNull: GraphInterpreter = this + /** * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. @@ -462,18 +502,23 @@ private[stream] final class GraphInterpreter( */ def execute(eventLimit: Int): Unit = { if (Debug) println(s"$Name ---------------- EXECUTE (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") - var eventsRemaining = eventLimit - var connection = dequeue() - while (eventsRemaining > 0 && connection != NoEvent) { - try processEvent(connection) - catch { - case NonFatal(e) ⇒ - if (activeStage == null) throw e - else activeStage.failStage(e) + val previousInterpreter = currentInterpreterOrNull + setCurrentInterpreter(this) + try { + var eventsRemaining = eventLimit + while (eventsRemaining > 0 && queueTail != queueHead) { + val connection = dequeue() + try processEvent(connection) + catch { + case NonFatal(e) ⇒ + if (activeStage == null) throw e + else activeStage.failStage(e) + } + afterStageHasRun(activeStage) + eventsRemaining -= 1 } - afterStageHasRun(activeStage) - eventsRemaining -= 1 - if (eventsRemaining > 0) connection = dequeue() + } finally { + setCurrentInterpreter(previousInterpreter) } if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") // TODO: deadlock detection diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 10f59bab37..e1451ffeb2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -32,6 +32,8 @@ private[akka] object IteratorInterpreter { } else push(out, elem) } } + + override def onDownstreamFinish(): Unit = completeStage() }) override def toString = "IteratorUpstream" diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 2ff8644f0c..7d62676c3a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -76,7 +76,7 @@ object GraphStageLogic { * Input handler that terminates the stage upon receiving completion. * The stage fails upon receiving a failure. */ - class EagerTerminateInput extends InHandler { + object EagerTerminateInput extends InHandler { override def onPush(): Unit = () } @@ -84,7 +84,7 @@ object GraphStageLogic { * Input handler that does not terminate the stage upon receiving completion. * The stage fails upon receiving a failure. */ - class IgnoreTerminateInput extends InHandler { + object IgnoreTerminateInput extends InHandler { override def onPush(): Unit = () override def onUpstreamFinish(): Unit = () } @@ -95,14 +95,15 @@ object GraphStageLogic { */ class ConditionalTerminateInput(predicate: () ⇒ Boolean) extends InHandler { override def onPush(): Unit = () - override def onUpstreamFinish(): Unit = if (predicate()) inOwnerStageLogic.completeStage() + override def onUpstreamFinish(): Unit = + if (predicate()) GraphInterpreter.currentInterpreter.activeStage.completeStage() } /** * Input handler that does not terminate the stage upon receiving completion * nor failure. */ - class TotallyIgnorantInput extends InHandler { + object TotallyIgnorantInput extends InHandler { override def onPush(): Unit = () override def onUpstreamFinish(): Unit = () override def onUpstreamFailure(ex: Throwable): Unit = () @@ -111,14 +112,14 @@ object GraphStageLogic { /** * Output handler that terminates the stage upon cancellation. */ - class EagerTerminateOutput extends OutHandler { + object EagerTerminateOutput extends OutHandler { override def onPull(): Unit = () } /** * Output handler that does not terminate the stage upon cancellation. */ - class IgnoreTerminateOutput extends OutHandler { + object IgnoreTerminateOutput extends OutHandler { override def onPull(): Unit = () override def onDownstreamFinish(): Unit = () } @@ -129,7 +130,8 @@ object GraphStageLogic { */ class ConditionalTerminateOutput(predicate: () ⇒ Boolean) extends OutHandler { override def onPull(): Unit = () - override def onDownstreamFinish(): Unit = if (predicate()) outOwnerStageLogic.completeStage() + override def onDownstreamFinish(): Unit = + if (predicate()) GraphInterpreter.currentInterpreter.activeStage.completeStage() } private object DoNothing extends (() ⇒ Unit) { @@ -194,12 +196,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Input handler that terminates the stage upon receiving completion. * The stage fails upon receiving a failure. */ - final protected def eagerTerminateInput: InHandler = new EagerTerminateInput + final protected def eagerTerminateInput: InHandler = EagerTerminateInput /** * Input handler that does not terminate the stage upon receiving completion. * The stage fails upon receiving a failure. */ - final protected def ignoreTerminateInput: InHandler = new IgnoreTerminateInput + final protected def ignoreTerminateInput: InHandler = IgnoreTerminateInput /** * Input handler that terminates the state upon receiving completion if the * given condition holds at that time. The stage fails upon receiving a failure. @@ -209,15 +211,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Input handler that does not terminate the stage upon receiving completion * nor failure. */ - final protected def totallyIgnorantInput: InHandler = new TotallyIgnorantInput + final protected def totallyIgnorantInput: InHandler = TotallyIgnorantInput /** * Output handler that terminates the stage upon cancellation. */ - final protected def eagerTerminateOutput: OutHandler = new EagerTerminateOutput + final protected def eagerTerminateOutput: OutHandler = EagerTerminateOutput /** * Output handler that does not terminate the stage upon cancellation. */ - final protected def ignoreTerminateOutput: OutHandler = new IgnoreTerminateOutput + final protected def ignoreTerminateOutput: OutHandler = IgnoreTerminateOutput /** * Output handler that terminates the state upon receiving completion if the * given condition holds at that time. The stage fails upon receiving a failure. @@ -228,7 +230,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Assigns callbacks for the events for an [[Inlet]] */ final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = { - handler.inOwnerStageLogic = this handlers(in.id) = handler if (_interpreter != null) _interpreter.setHandler(conn(in), handler) } @@ -244,7 +245,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Assigns callbacks for the events for an [[Outlet]] */ final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = { - handler.outOwnerStageLogic = this handlers(out.id + inCount) = handler if (_interpreter != null) _interpreter.setHandler(conn(out), handler) } @@ -834,11 +834,6 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap * Collection of callbacks for an input port of a [[GraphStage]] */ trait InHandler { - /** - * INTERNAL API - */ - private[stream] var inOwnerStageLogic: GraphStageLogic = _ - /** * Called when the input port has a new element available. The actual element can be retrieved via the * [[GraphStageLogic.grab()]] method. @@ -848,23 +843,18 @@ trait InHandler { /** * Called when the input port is finished. After this callback no other callbacks will be called for this port. */ - def onUpstreamFinish(): Unit = inOwnerStageLogic.completeStage() + def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage() /** * Called when the input port has failed. After this callback no other callbacks will be called for this port. */ - def onUpstreamFailure(ex: Throwable): Unit = inOwnerStageLogic.failStage(ex) + def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex) } /** * Collection of callbacks for an output port of a [[GraphStage]] */ trait OutHandler { - /** - * INTERNAL API - */ - private[stream] var outOwnerStageLogic: GraphStageLogic = _ - /** * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] * is now allowed to be called on this port. @@ -875,5 +865,28 @@ trait OutHandler { * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. */ - def onDownstreamFinish(): Unit = outOwnerStageLogic.completeStage() -} \ No newline at end of file + def onDownstreamFinish(): Unit = { + GraphInterpreter + .currentInterpreter + .activeStage + .completeStage() + } +} + +/** + * Java API: callbacks for an input port where termination logic is predefined + * (completing when upstream completes, failing when upstream fails). + */ +abstract class AbstractInHandler extends InHandler + +/** + * Java API: callbacks for an output port where termination logic is predefined + * (completing when downstream cancels). + */ +abstract class AbstractOutHandler extends OutHandler + +/** + * Java API: callback combination for output and input ports where termination logic is predefined + * (completing when upstream completes, failing when upstream fails, completing when downstream cancels). + */ +abstract class AbstractInOutHandler extends InHandler with OutHandler