From e182c5a1fc8d4ac60df723e4fcc2731a931bb586 Mon Sep 17 00:00:00 2001 From: __ Date: Mon, 6 Jan 2020 20:18:43 +0500 Subject: [PATCH] Add stream name to buffer overflow log message (#28159) * Add stream name to buffer overflow log message (#28085) * De-emphasize stream name by moving it to the back Co-authored-by: Arnout Engelen --- .../akka/stream/impl/ActorRefSource.scala | 30 +++++++++---- .../scala/akka/stream/impl/QueueSource.scala | 44 ++++++++++++++----- .../scala/akka/stream/impl/fusing/Ops.scala | 24 +++++++--- 3 files changed, 73 insertions(+), 25 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index 2d8366a796..1d26120300 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -52,6 +52,7 @@ private object ActorRefSource { override protected def stageActorName: String = inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) + private val name = inheritedAttributes.nameOrDefault(getClass.toString) override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { case (_, PoisonPill) => log.warning( @@ -71,19 +72,23 @@ private object ActorRefSource { buffer match { case OptionVal.None => if (isCompleting) { - log.warning("Dropping element because Status.Success received already: [{}]", m) + log.warning("Dropping element because Status.Success received already: [{}] in stream [{}]", m, name) } else if (isAvailable(out)) { push(out, m) } else { - log.debug("Dropping element because there is no downstream demand and no buffer: [{}]", m) + log.debug( + "Dropping element because there is no downstream demand and no buffer: [{}] in stream [{}]", + m, + name) } case OptionVal.Some(buf) => if (isCompleting) { log.warning( - "Dropping element because Status.Success received already, only draining already buffered elements: [{}] (pending: [{}])", + "Dropping element because Status.Success received already, only draining already buffered elements: [{}] (pending: [{}] in stream [{}])", m, - buf.used) + buf.used, + name) } else if (!buf.isFull) { buf.enqueue(m) tryPush() @@ -92,30 +97,37 @@ private object ActorRefSource { case s: DropHead => log.log( s.logLevel, - "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + "Dropping the head element because buffer is full and overflowStrategy is: [DropHead] in stream [{}]", + name) buf.dropHead() buf.enqueue(m) tryPush() case s: DropTail => log.log( s.logLevel, - "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [{}]", + name) buf.dropTail() buf.enqueue(m) tryPush() case s: DropBuffer => log.log( s.logLevel, - "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer] in stream [{}]", + name) buf.clear() buf.enqueue(m) tryPush() case s: DropNew => log.log( s.logLevel, - "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") + "Dropping the new element because buffer is full and overflowStrategy is: [DropNew] in stream [{}]", + name) case s: Fail => - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + log.log( + s.logLevel, + "Failing because buffer is full and overflowStrategy is: [Fail] in stream [{}]", + name) val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") failStage(bufferOverflowException) diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 0879d8003f..3896180804 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -36,6 +36,7 @@ import scala.concurrent.{ Future, Promise } override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val completion = Promise[Done] + val name = inheritedAttributes.nameOrDefault(getClass.toString) val stageLogic = new GraphStageLogic(shape) with OutHandler with SourceQueueWithComplete[T] with StageLogging { override protected def logSource: Class[_] = classOf[QueueSource[_]] @@ -65,32 +66,41 @@ import scala.concurrent.{ Future, Promise } case s: DropHead => log.log( s.logLevel, - "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + "Dropping the head element because buffer is full and overflowStrategy is: [DropHead] in stream [{}]", + name) buffer.dropHead() enqueueAndSuccess(offer) case s: DropTail => log.log( s.logLevel, - "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [{}]", + name) buffer.dropTail() enqueueAndSuccess(offer) case s: DropBuffer => log.log( s.logLevel, - "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer] in stream [{}]", + name) buffer.clear() enqueueAndSuccess(offer) case s: DropNew => - log.log(s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") + log.log( + s.logLevel, + "Dropping the new element because buffer is full and overflowStrategy is: [DropNew] in stream [{}]", + name) offer.promise.success(QueueOfferResult.Dropped) case s: Fail => - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail] in stream [{}]", name) val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") offer.promise.success(QueueOfferResult.Failure(bufferOverflowException)) completion.failure(bufferOverflowException) failStage(bufferOverflowException) case s: Backpressure => - log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") + log.log( + s.logLevel, + "Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]", + name) pendingOffer match { case Some(_) => offer.promise.failure( @@ -118,21 +128,35 @@ import scala.concurrent.{ Future, Promise } else overflowStrategy match { case s @ (_: DropHead | _: DropBuffer) => - log.log(s.logLevel, "Dropping element because buffer is full and overflowStrategy is: [{}]", s) + log.log( + s.logLevel, + "Dropping element because buffer is full and overflowStrategy is: [{}] in stream [{}]", + s, + name) pendingOffer.get.promise.success(QueueOfferResult.Dropped) pendingOffer = Some(offer) case s @ (_: DropTail | _: DropNew) => - log.log(s.logLevel, "Dropping element because buffer is full and overflowStrategy is: [{}]", s) + log.log( + s.logLevel, + "Dropping element because buffer is full and overflowStrategy is: [{}] in stream [{}]", + s, + name) promise.success(QueueOfferResult.Dropped) case s: Fail => - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + log.log( + s.logLevel, + "Failing because buffer is full and overflowStrategy is: [Fail] in stream [{}]", + name) val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") promise.success(QueueOfferResult.Failure(bufferOverflowException)) completion.failure(bufferOverflowException) failStage(bufferOverflowException) case s: Backpressure => - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Backpressure]") + log.log( + s.logLevel, + "Failing because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]", + name) promise.failure( new IllegalStateException( "You have to wait for previous offer to be resolved to send another request")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index f135c78162..27b943c97e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -909,6 +909,7 @@ private[stream] object Collect { private val buffer: BufferImpl[T] = BufferImpl(size, inheritedAttributes) + private val name = inheritedAttributes.nameOrDefault(getClass.toString) val enqueueAction: T => Unit = overflowStrategy match { case s: DropHead => @@ -916,7 +917,8 @@ private[stream] object Collect { if (buffer.isFull) { log.log( s.logLevel, - "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + "Dropping the head element because buffer is full and overflowStrategy is: [DropHead] in stream [{}]", + name) buffer.dropHead() } buffer.enqueue(elem) @@ -926,7 +928,8 @@ private[stream] object Collect { if (buffer.isFull) { log.log( s.logLevel, - "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [{}]", + name) buffer.dropTail() } buffer.enqueue(elem) @@ -936,7 +939,8 @@ private[stream] object Collect { if (buffer.isFull) { log.log( s.logLevel, - "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer] in stream [{}]", + name) buffer.clear() } buffer.enqueue(elem) @@ -947,17 +951,25 @@ private[stream] object Collect { else log.log( s.logLevel, - "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") + "Dropping the new element because buffer is full and overflowStrategy is: [DropNew] in stream [{}]", + name) pull(in) case s: Backpressure => elem => buffer.enqueue(elem) if (!buffer.isFull) pull(in) - else log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") + else + log.log( + s.logLevel, + "Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]", + name) case s: Fail => elem => if (buffer.isFull) { - log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + log.log( + s.logLevel, + "Failing because buffer is full and overflowStrategy is: [Fail] in stream [{}]", + name) failStage(BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) } else { buffer.enqueue(elem)