diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java index 9d3425c5e6..3bdca026eb 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java @@ -80,7 +80,7 @@ public class GraphStageDocTest { { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { push(out, counter); counter += 1; } @@ -140,17 +140,13 @@ public class GraphStageDocTest { { setHandler(in, new AbstractInHandler() { @Override - public void onPush() { - try { - push(out, f.apply(grab(in))); - } catch (Exception ex) { - failStage(ex); - } + public void onPush() throws Exception { + push(out, f.apply(grab(in))); } }); setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -217,7 +213,7 @@ public class GraphStageDocTest { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -282,7 +278,7 @@ public class GraphStageDocTest { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { if (lastElem.isDefined()) { push(out, lastElem.get()); lastElem = Option.none(); @@ -342,7 +338,7 @@ public class GraphStageDocTest { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -417,7 +413,7 @@ public class GraphStageDocTest { }); setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -492,7 +488,7 @@ public class GraphStageDocTest { { setHandler(in, new AbstractInHandler() { @Override - public void onPush() { + public void onPush() throws Exception { A elem = grab(in); if (open) pull(in); else { @@ -504,7 +500,7 @@ public class GraphStageDocTest { }); setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -570,7 +566,7 @@ public class GraphStageDocTest { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { pull(in); } }); @@ -655,7 +651,7 @@ public class GraphStageDocTest { setHandler(out, new AbstractOutHandler() { @Override - public void onPull() { + public void onPull() throws Exception { if (buffer.isEmpty()) { downstreamWaiting = true; } else { diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index ef1b1fd805..bc375372fe 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -1211,16 +1211,19 @@ trait InHandler { * Called when the input port has a new element available. The actual element can be retrieved via the * [[GraphStageLogic.grab()]] method. */ + @throws(classOf[Exception]) def onPush(): Unit /** * Called when the input port is finished. After this callback no other callbacks will be called for this port. */ + @throws(classOf[Exception]) def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage() /** * Called when the input port has failed. After this callback no other callbacks will be called for this port. */ + @throws(classOf[Exception]) def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex) } @@ -1232,12 +1235,14 @@ trait OutHandler { * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] * is now allowed to be called on this port. */ + @throws(classOf[Exception]) def onPull(): Unit /** * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. */ + @throws(classOf[Exception]) def onDownstreamFinish(): Unit = { GraphInterpreter .currentInterpreter