diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index b47ed50134..a839ad2e36 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -1564,7 +1564,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val otherKeys = otherDigests.keySet val myKeys = if (totChunks == 1) dataEntries.keySet - else dataEntries.keysIterator.filter(key => math.abs(key.hashCode) % totChunks == chunk).toSet + else dataEntries.keysIterator.filter(key ⇒ math.abs(key.hashCode) % totChunks == chunk).toSet val otherMissingKeys = myKeys diff otherKeys val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements) if (keys.nonEmpty) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index 91222f8744..6494165d84 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -236,6 +236,116 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit { ex.getMessage should startWith("not yet initialized: only setHandler is allowed in GraphStageLogic constructor") } + "give a good error message if in handler missing" in { + val ex = intercept[IllegalStateException] { + Source.maybe[String] + .via(new GraphStage[FlowShape[String, String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + override val shape: FlowShape[String, String] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + // ups, we forgot to actually set the handlers + } + + override def toString = "stage-name" + }).runWith(Sink.ignore) + } + ex.getMessage should startWith("No handler defined in stage [stage-name] for in port [in") + } + + "give a good error message if out handler missing" in { + val ex = intercept[IllegalStateException] { + Source.maybe[String] + .via(new GraphStage[FlowShape[String, String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + override val shape: FlowShape[String, String] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush() = ??? + }) + // ups we forgot the out handler + } + + override def toString = "stage-name" + }) + // just to have another graphstage downstream + .map(_ ⇒ "whatever") + .runWith(Sink.ignore) + } + ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + } + + "give a good error message if out handler missing with downstream boundary" in { + val ex = intercept[IllegalStateException] { + Source.maybe[String] + .via(new GraphStage[FlowShape[String, String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + override val shape: FlowShape[String, String] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush() = ??? + }) + // ups we forgot the out handler + } + + override def toString = "stage-name" + }).runWith(Sink.ignore.async) + } + ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + } + + "give a good error message if handler missing with downstream publisher" in { + val ex = intercept[IllegalStateException] { + Source.maybe[String].async + .via(new GraphStage[FlowShape[String, String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + override val shape: FlowShape[String, String] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush() = ??? + }) + // ups we forgot the out handler + } + + override def toString = "stage-name" + }).runWith(Sink.ignore) + } + ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + } + + "give a good error message if handler missing when stage is an island" in { + val ex = intercept[IllegalStateException] { + Source.maybe[String] + .via(new GraphStage[FlowShape[String, String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + override val shape: FlowShape[String, String] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush() = ??? + }) + // ups we forgot the out handler + } + + override def toString = "stage-name" + }).async.runWith(Sink.ignore) + } + ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index bb9e1dbe81..2d2e2a7a17 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -640,6 +640,7 @@ final class GraphStageIsland( connection.inOwner = logic connection.id = slot connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler] + if (connection.inHandler eq null) failOnMissingHandler(logic) logic.portToConn(in.id) = connection } @@ -648,6 +649,7 @@ final class GraphStageIsland( connection.outOwner = logic connection.id = slot connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler] + if (connection.outHandler eq null) failOnMissingHandler(logic) logic.portToConn(logic.inCount + out.id) = connection } @@ -664,6 +666,7 @@ final class GraphStageIsland( connection.outOwner = logic connection.id = -1 // Will be filled later connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler] + if (connection.outHandler eq null) failOnMissingHandler(logic) logic.portToConn(logic.inCount + out.id) = connection boundary.publisher @@ -693,6 +696,8 @@ final class GraphStageIsland( while (i < totalConnections) { val conn = outConns.head outConns = outConns.tail + if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner) + else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner) finalConnections(i) = conn conn.id = i i += 1 @@ -716,6 +721,21 @@ final class GraphStageIsland( } } + private def failOnMissingHandler(logic: GraphStageLogic): Unit = { + val missingHandlerIdx = logic.handlers.indexWhere(_.asInstanceOf[AnyRef] eq null) + val isIn = missingHandlerIdx < logic.inCount + val portLabel = logic.originalStage match { + case OptionVal.Some(stage) ⇒ + if (isIn) s"in port [${stage.shape.inlets(missingHandlerIdx)}]" + else s"out port [${stage.shape.outlets(missingHandlerIdx - logic.inCount)}" + case OptionVal.None ⇒ + if (isIn) s"in port id [$missingHandlerIdx]" + else s"out port id [$missingHandlerIdx]" + } + throw new IllegalStateException(s"No handler defined in stage [${logic.originalStage.getOrElse(logic).toString}] for $portLabel." + + " All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.") + } + override def toString: String = "GraphStagePhase" }