diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 9ad1daca25..d87712abc6 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -3,6 +3,7 @@ */ package akka.stream.testkit +import scala.language.existentials import akka.actor.ActorSystem import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala index 878701ff6e..85b3b5d4f2 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala @@ -10,6 +10,7 @@ import java.net.InetSocketAddress import java.net.SocketAddress object TestUtils { // FIXME: remove once going back to project dependencies + import scala.language.reflectiveCalls // Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object type GeneralSocket = { def bind(sa: SocketAddress): Unit diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 109382723e..d6e4fa56d2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -250,7 +250,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin protected def onError(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 33dad6bfc7..7b02e0ecbd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -213,7 +213,8 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) inputBunch.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index a8dd5cb71b..e7831d2020 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -244,7 +244,8 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() outputBunch.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 5df3d7b9c9..0967d9cb0e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -105,7 +105,8 @@ private[akka] class FanoutProcessorImpl( } override def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) // Stopping will happen after flush diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 10307a6e7c..27ef598909 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -15,6 +15,7 @@ import akka.stream.StreamTcpException import org.reactivestreams.Processor import akka.actor.Stash import akka.stream.impl._ +import akka.actor.ActorLogging /** * INTERNAL API @@ -36,7 +37,8 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash { +private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash + with ActorLogging { import TcpStreamActor._ @@ -183,6 +185,7 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { + log.debug("fail {} due to: {}", self, e.getMessage) tcpInputs.cancel() tcpOutputs.cancel(e) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 3b7303bc36..d88420943e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -19,6 +19,7 @@ import akka.util.ByteString import org.reactivestreams.Subscriber import akka.stream.ConnectionException import akka.stream.BindFailedException +import akka.actor.ActorLogging /** * INTERNAL API @@ -39,7 +40,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, settings: MaterializerSettings) extends Actor - with Pump with Stash { + with Pump with Stash with ActorLogging { import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -160,6 +161,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } def fail(e: Throwable): Unit = { + log.debug("fail {} due to: {}", self, e.getMessage) incomingConnections.cancel() primaryOutputs.cancel(e) } diff --git a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala index 69565d6c19..067dd749a1 100644 --- a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala +++ b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala @@ -360,7 +360,8 @@ class SslTlsCipherActor(val requester: ActorRef, val sessionNegotioation: SslTls override def receive = inputSubstreamManagement orElse outputSubstreamManagement protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (tracing) log.debug("fail {} due to: {}", self, e.getMessage) failInputs(e) failOutputs(e) context.stop(self)