=str include port name when require fails in GraphStage
This commit is contained in:
parent
b4f367e46a
commit
b9cb6d1329
1 changed files with 12 additions and 8 deletions
|
|
@ -416,8 +416,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
interpreter.pull(conn(in))
|
||||
} else {
|
||||
// Detailed error information should not add overhead to the hot path
|
||||
require(!isClosed(in), "Cannot pull closed port")
|
||||
require(!hasBeenPulled(in), "Cannot pull port twice")
|
||||
require(!isClosed(in), s"Cannot pull closed port ($in)")
|
||||
require(!hasBeenPulled(in), s"Cannot pull port ($in) twice")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -451,7 +451,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
elem.asInstanceOf[T]
|
||||
} else {
|
||||
// Slow path
|
||||
require(isAvailable(in), "Cannot get element from already empty input port")
|
||||
require(isAvailable(in), s"Cannot get element from already empty input port ($in)")
|
||||
val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed]
|
||||
val elem = failed.previousElem.asInstanceOf[T]
|
||||
interpreter.connectionSlots(connection) = Failed(failed.ex, Empty)
|
||||
|
|
@ -505,8 +505,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
} else {
|
||||
// Detailed error information should not add overhead to the hot path
|
||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||
require(isAvailable(out), "Cannot push port twice")
|
||||
require(!isClosed(out), "Cannot pull closed port")
|
||||
require(isAvailable(out), s"Cannot push port ($out) twice")
|
||||
require(!isClosed(out), s"Cannot pull closed port ($out)")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1053,15 +1053,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
def hasBeenPulled: Boolean = pulled && !isClosed
|
||||
|
||||
def grab(): T = {
|
||||
require(elem != null, "cannot grab element from port when data have not yet arrived")
|
||||
require(elem != null, s"cannot grab element from port ($this) when data have not yet arrived")
|
||||
val ret = elem
|
||||
elem = null.asInstanceOf[T]
|
||||
ret
|
||||
}
|
||||
|
||||
def pull(): Unit = {
|
||||
require(!pulled, "cannot pull port twice")
|
||||
require(!closed, "cannot pull closed port")
|
||||
require(!pulled, s"cannot pull port ($this) twice")
|
||||
require(!closed, s"cannot pull closed port ($this) ")
|
||||
pulled = true
|
||||
_sink.pullSubstream()
|
||||
}
|
||||
|
|
@ -1070,6 +1070,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
closed = true
|
||||
_sink.cancelSubstream()
|
||||
}
|
||||
|
||||
override def toString = s"SubSinkInlet($name)"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1160,6 +1162,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
closed = true
|
||||
_source.failSubstream(ex)
|
||||
}
|
||||
|
||||
override def toString = s"SubSourceOutlet($name)"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue