Give a good error message on unassigned ports #22552
This commit is contained in:
parent
e4a09c207f
commit
d51f7abd63
3 changed files with 131 additions and 1 deletions
|
|
@ -1564,7 +1564,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
val otherKeys = otherDigests.keySet
|
val otherKeys = otherDigests.keySet
|
||||||
val myKeys =
|
val myKeys =
|
||||||
if (totChunks == 1) dataEntries.keySet
|
if (totChunks == 1) dataEntries.keySet
|
||||||
else dataEntries.keysIterator.filter(key => math.abs(key.hashCode) % totChunks == chunk).toSet
|
else dataEntries.keysIterator.filter(key ⇒ math.abs(key.hashCode) % totChunks == chunk).toSet
|
||||||
val otherMissingKeys = myKeys diff otherKeys
|
val otherMissingKeys = myKeys diff otherKeys
|
||||||
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
|
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
|
||||||
if (keys.nonEmpty) {
|
if (keys.nonEmpty) {
|
||||||
|
|
|
||||||
|
|
@ -236,6 +236,116 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
|
||||||
ex.getMessage should startWith("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
|
ex.getMessage should startWith("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"give a good error message if in handler missing" in {
|
||||||
|
val ex = intercept[IllegalStateException] {
|
||||||
|
Source.maybe[String]
|
||||||
|
.via(new GraphStage[FlowShape[String, String]] {
|
||||||
|
val in = Inlet[String]("in")
|
||||||
|
val out = Outlet[String]("out")
|
||||||
|
override val shape: FlowShape[String, String] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
// ups, we forgot to actually set the handlers
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString = "stage-name"
|
||||||
|
}).runWith(Sink.ignore)
|
||||||
|
}
|
||||||
|
ex.getMessage should startWith("No handler defined in stage [stage-name] for in port [in")
|
||||||
|
}
|
||||||
|
|
||||||
|
"give a good error message if out handler missing" in {
|
||||||
|
val ex = intercept[IllegalStateException] {
|
||||||
|
Source.maybe[String]
|
||||||
|
.via(new GraphStage[FlowShape[String, String]] {
|
||||||
|
val in = Inlet[String]("in")
|
||||||
|
val out = Outlet[String]("out")
|
||||||
|
override val shape: FlowShape[String, String] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush() = ???
|
||||||
|
})
|
||||||
|
// ups we forgot the out handler
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString = "stage-name"
|
||||||
|
})
|
||||||
|
// just to have another graphstage downstream
|
||||||
|
.map(_ ⇒ "whatever")
|
||||||
|
.runWith(Sink.ignore)
|
||||||
|
}
|
||||||
|
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
|
||||||
|
}
|
||||||
|
|
||||||
|
"give a good error message if out handler missing with downstream boundary" in {
|
||||||
|
val ex = intercept[IllegalStateException] {
|
||||||
|
Source.maybe[String]
|
||||||
|
.via(new GraphStage[FlowShape[String, String]] {
|
||||||
|
val in = Inlet[String]("in")
|
||||||
|
val out = Outlet[String]("out")
|
||||||
|
override val shape: FlowShape[String, String] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush() = ???
|
||||||
|
})
|
||||||
|
// ups we forgot the out handler
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString = "stage-name"
|
||||||
|
}).runWith(Sink.ignore.async)
|
||||||
|
}
|
||||||
|
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
|
||||||
|
}
|
||||||
|
|
||||||
|
"give a good error message if handler missing with downstream publisher" in {
|
||||||
|
val ex = intercept[IllegalStateException] {
|
||||||
|
Source.maybe[String].async
|
||||||
|
.via(new GraphStage[FlowShape[String, String]] {
|
||||||
|
val in = Inlet[String]("in")
|
||||||
|
val out = Outlet[String]("out")
|
||||||
|
override val shape: FlowShape[String, String] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush() = ???
|
||||||
|
})
|
||||||
|
// ups we forgot the out handler
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString = "stage-name"
|
||||||
|
}).runWith(Sink.ignore)
|
||||||
|
}
|
||||||
|
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
|
||||||
|
}
|
||||||
|
|
||||||
|
"give a good error message if handler missing when stage is an island" in {
|
||||||
|
val ex = intercept[IllegalStateException] {
|
||||||
|
Source.maybe[String]
|
||||||
|
.via(new GraphStage[FlowShape[String, String]] {
|
||||||
|
val in = Inlet[String]("in")
|
||||||
|
val out = Outlet[String]("out")
|
||||||
|
override val shape: FlowShape[String, String] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush() = ???
|
||||||
|
})
|
||||||
|
// ups we forgot the out handler
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString = "stage-name"
|
||||||
|
}).async.runWith(Sink.ignore)
|
||||||
|
}
|
||||||
|
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -640,6 +640,7 @@ final class GraphStageIsland(
|
||||||
connection.inOwner = logic
|
connection.inOwner = logic
|
||||||
connection.id = slot
|
connection.id = slot
|
||||||
connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler]
|
connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler]
|
||||||
|
if (connection.inHandler eq null) failOnMissingHandler(logic)
|
||||||
logic.portToConn(in.id) = connection
|
logic.portToConn(in.id) = connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -648,6 +649,7 @@ final class GraphStageIsland(
|
||||||
connection.outOwner = logic
|
connection.outOwner = logic
|
||||||
connection.id = slot
|
connection.id = slot
|
||||||
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
||||||
|
if (connection.outHandler eq null) failOnMissingHandler(logic)
|
||||||
logic.portToConn(logic.inCount + out.id) = connection
|
logic.portToConn(logic.inCount + out.id) = connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -664,6 +666,7 @@ final class GraphStageIsland(
|
||||||
connection.outOwner = logic
|
connection.outOwner = logic
|
||||||
connection.id = -1 // Will be filled later
|
connection.id = -1 // Will be filled later
|
||||||
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
||||||
|
if (connection.outHandler eq null) failOnMissingHandler(logic)
|
||||||
logic.portToConn(logic.inCount + out.id) = connection
|
logic.portToConn(logic.inCount + out.id) = connection
|
||||||
|
|
||||||
boundary.publisher
|
boundary.publisher
|
||||||
|
|
@ -693,6 +696,8 @@ final class GraphStageIsland(
|
||||||
while (i < totalConnections) {
|
while (i < totalConnections) {
|
||||||
val conn = outConns.head
|
val conn = outConns.head
|
||||||
outConns = outConns.tail
|
outConns = outConns.tail
|
||||||
|
if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner)
|
||||||
|
else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner)
|
||||||
finalConnections(i) = conn
|
finalConnections(i) = conn
|
||||||
conn.id = i
|
conn.id = i
|
||||||
i += 1
|
i += 1
|
||||||
|
|
@ -716,6 +721,21 @@ final class GraphStageIsland(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def failOnMissingHandler(logic: GraphStageLogic): Unit = {
|
||||||
|
val missingHandlerIdx = logic.handlers.indexWhere(_.asInstanceOf[AnyRef] eq null)
|
||||||
|
val isIn = missingHandlerIdx < logic.inCount
|
||||||
|
val portLabel = logic.originalStage match {
|
||||||
|
case OptionVal.Some(stage) ⇒
|
||||||
|
if (isIn) s"in port [${stage.shape.inlets(missingHandlerIdx)}]"
|
||||||
|
else s"out port [${stage.shape.outlets(missingHandlerIdx - logic.inCount)}"
|
||||||
|
case OptionVal.None ⇒
|
||||||
|
if (isIn) s"in port id [$missingHandlerIdx]"
|
||||||
|
else s"out port id [$missingHandlerIdx]"
|
||||||
|
}
|
||||||
|
throw new IllegalStateException(s"No handler defined in stage [${logic.originalStage.getOrElse(logic).toString}] for $portLabel." +
|
||||||
|
" All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.")
|
||||||
|
}
|
||||||
|
|
||||||
override def toString: String = "GraphStagePhase"
|
override def toString: String = "GraphStagePhase"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue