From b9cb6d1329fc9a46dbdd9725222675daff8a2ff8 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 7 Jan 2016 03:07:31 +0100 Subject: [PATCH] =str include port name when require fails in GraphStage --- .../scala/akka/stream/stage/GraphStage.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index a4f328ca79..7f3eefb0e8 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -416,8 +416,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: interpreter.pull(conn(in)) } else { // Detailed error information should not add overhead to the hot path - require(!isClosed(in), "Cannot pull closed port") - require(!hasBeenPulled(in), "Cannot pull port twice") + require(!isClosed(in), s"Cannot pull closed port ($in)") + require(!hasBeenPulled(in), s"Cannot pull port ($in) twice") } } @@ -451,7 +451,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: elem.asInstanceOf[T] } else { // Slow path - require(isAvailable(in), "Cannot get element from already empty input port") + require(isAvailable(in), s"Cannot get element from already empty input port ($in)") val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed] val elem = failed.previousElem.asInstanceOf[T] interpreter.connectionSlots(connection) = Failed(failed.ex, Empty) @@ -505,8 +505,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } else { // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) - require(isAvailable(out), "Cannot push port twice") - require(!isClosed(out), "Cannot pull closed port") + require(isAvailable(out), s"Cannot push port ($out) twice") + require(!isClosed(out), s"Cannot pull closed port ($out)") } } @@ -1053,15 +1053,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: def hasBeenPulled: Boolean = pulled && !isClosed def grab(): T = { - require(elem != null, "cannot grab element from port when data have not yet arrived") + require(elem != null, s"cannot grab element from port ($this) when data have not yet arrived") val ret = elem elem = null.asInstanceOf[T] ret } def pull(): Unit = { - require(!pulled, "cannot pull port twice") - require(!closed, "cannot pull closed port") + require(!pulled, s"cannot pull port ($this) twice") + require(!closed, s"cannot pull closed port ($this) ") pulled = true _sink.pullSubstream() } @@ -1070,6 +1070,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: closed = true _sink.cancelSubstream() } + + override def toString = s"SubSinkInlet($name)" } /** @@ -1160,6 +1162,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: closed = true _source.failSubstream(ex) } + + override def toString = s"SubSourceOutlet($name)" } }