diff --git a/akka-stream-tests/src/test/resources/reference.conf b/akka-stream-tests/src/test/resources/reference.conf index e4d5562864..ffbc14a271 100644 --- a/akka-stream-tests/src/test/resources/reference.conf +++ b/akka-stream-tests/src/test/resources/reference.conf @@ -5,6 +5,7 @@ akka { serialize-messages = on default-dispatcher.throughput = 1 // Amplify the effects of fuzzing } + akka.actor.warn-about-java-serializer-usage = false stream.materializer.debug.fuzzing-mode = on } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index fec7e4e93f..ff1aad4a0b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -252,7 +252,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { } } - EventFilter[IllegalArgumentException](message = "Error after stage was closed.", occurrences = 1).intercept { + EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 41d5e8ddb6..0df902c945 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -467,7 +467,9 @@ private[stream] final class GraphInterpreter( logic.beforePreStart() logic.preStart() } catch { - case NonFatal(e) ⇒ logic.failStage(e) + case NonFatal(e) ⇒ + log.error(e, "Error during preStart in [{}]", assembly.stages(logic.stageId)) + logic.failStage(e) } afterStageHasRun(logic) i += 1 @@ -534,7 +536,12 @@ private[stream] final class GraphInterpreter( catch { case NonFatal(e) ⇒ if (activeStage == null) throw e - else activeStage.failStage(e, isInternal = true) + else { + val stage = assembly.stages(activeStage.stageId) + + log.error(e, "Error in stage [{}]: {}", stage, e.getMessage) + activeStage.failStage(e) + } } afterStageHasRun(activeStage) eventsRemaining -= 1 @@ -669,7 +676,7 @@ private[stream] final class GraphInterpreter( logic.afterPostStop() } catch { case NonFatal(e) ⇒ - log.error(e, s"Error during postStop in [${assembly.stages(logic.stageId)}]") + log.error(e, s"Error during postStop in [{}]: {}", assembly.stages(logic.stageId), e.getMessage) } } @@ -698,7 +705,7 @@ private[stream] final class GraphInterpreter( if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) } - private[stream] def fail(connection: Int, ex: Throwable, isInternal: Boolean): Unit = { + private[stream] def fail(connection: Int, ex: Throwable): Unit = { val currentState = portStates(connection) if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") portStates(connection) = currentState | OutClosed @@ -706,8 +713,6 @@ private[stream] final class GraphInterpreter( portStates(connection) = currentState | (OutClosed | InFailed) connectionSlots(connection) = Failed(ex, connectionSlots(connection)) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) - } else if (isInternal) { - log.error(ex, "Error after stage was closed.") } if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 789c49211a..e0354d6ad9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -266,8 +266,14 @@ private[stream] object TcpConnectionStage { } override def onUpstreamFailure(ex: Throwable): Unit = { - if (connection != null) connection ! Abort - else failStage(ex) + if (connection != null) { + if (interpreter.log.isDebugEnabled) { + interpreter.log.debug("Aborting tcp connection because of upstream failure: {}\n{}", + ex.getMessage, + ex.getStackTrace.mkString("\n")) + } + connection ! Abort + } else failStage(ex) } }) diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 1a7063d5fb..96d55ae8da 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -463,7 +463,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Signals failure through the given port. */ - final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex, isInternal = false) + final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) /** * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, @@ -487,21 +487,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called, * then stops the stage, then [[postStop()]] is called. */ - final def failStage(ex: Throwable): Unit = failStage(ex, isInternal = false) - - /** - * INTERNAL API - * - * Used to signal errors caught by the interpreter itself. This method logs failures if the stage has been - * already closed if ``isInternal`` is set to true. - */ - private[stream] final def failStage(ex: Throwable, isInternal: Boolean): Unit = { + final def failStage(ex: Throwable): Unit = { var i = 0 while (i < portToConn.length) { if (i < inCount) interpreter.cancel(portToConn(i)) else - interpreter.fail(portToConn(i), ex, isInternal) + interpreter.fail(portToConn(i), ex) i += 1 } setKeepGoing(false)