From 8b8c7355bf6f986e55e87ae9e2251b35142ebf1c Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Wed, 5 Dec 2018 14:31:43 +0100 Subject: [PATCH] Adding loglevel for overflow strategy #25949 (#25976) * Adding loglevel for overflow strategy (#25949) * Simplifying overflow strategy api (#25949) * Adding overflow strategy check for backpressu (#25949) * Adding log statements to all stages with overflow strategy (#25949) * Adding excludes for internal api changes (#25949) * Adding internal api annotations (#25949) * Adding log source overrides for better logger names (#25949) * Moving mima excludes for 2.5.18 (#25949) --- .../stream/InvokeWithFeedbackBenchmark.scala | 2 +- .../mima-filters/2.5.18.backwards.excludes | 52 +++++++++++++ .../scala/akka/stream/OverflowStrategy.scala | 73 +++++++++++++------ .../stream/impl/ActorRefSourceActor.scala | 26 +++---- .../scala/akka/stream/impl/QueueSource.scala | 34 ++++++--- .../scala/akka/stream/impl/fusing/Ops.scala | 54 +++++++++----- .../scala/akka/stream/scaladsl/Source.scala | 2 +- 7 files changed, 175 insertions(+), 68 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.5.18.backwards.excludes diff --git a/akka-bench-jmh/src/main/scala/akka/stream/InvokeWithFeedbackBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/InvokeWithFeedbackBenchmark.scala index 4c36bb8308..9814f1c7ff 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/InvokeWithFeedbackBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/InvokeWithFeedbackBenchmark.scala @@ -33,7 +33,7 @@ class InvokeWithFeedbackBenchmark { // these are currently the only two built in stages using invokeWithFeedback val (in, out) = - Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategies.Backpressure) + Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.backpressure) .toMat(Sink.queue[Int]())(Keep.both) .run() diff --git a/akka-stream/src/main/mima-filters/2.5.18.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.18.backwards.excludes new file mode 100644 index 0000000000..4a846b1afb --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.18.backwards.excludes @@ -0,0 +1,52 @@ +# #25949 adding loglevel for overflow strategy +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.DelayOverflowStrategy.isBackpressure") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.OverflowStrategy.withLogLevel") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.OverflowStrategy.logLevel") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$Fail$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Fail.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Fail.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Fail.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Fail.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Fail.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#Fail.toString") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$Backpressure$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Backpressure.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Backpressure.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Backpressure.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Backpressure.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#Backpressure.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#Backpressure.toString") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$DropHead$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropHead.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropHead.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropHead.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropHead.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropHead.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#DropHead.toString") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$DropBuffer$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#DropBuffer.toString") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$DropTail$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropTail.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropTail.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropTail.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropTail.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropTail.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#DropTail.toString") + +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.OverflowStrategies$DropNew$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropNew.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropNew.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropNew.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropNew.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.OverflowStrategies#DropNew.productPrefix") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.OverflowStrategies#DropNew.toString") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index be8ad5ecf4..9bacb74fe7 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -5,14 +5,19 @@ package akka.stream import OverflowStrategies._ -import akka.annotation.DoNotInherit +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.event.Logging +import akka.event.Logging.LogLevel /** * Represents a strategy that decides how to deal with a buffer of time based operator * that is full but is about to receive a new element. */ @DoNotInherit -sealed abstract class DelayOverflowStrategy extends Serializable +sealed abstract class DelayOverflowStrategy extends Serializable { + /** INTERNAL API */ + @InternalApi private[akka] def isBackpressure: Boolean +} final case class BufferOverflowException(msg: String) extends RuntimeException(msg) @@ -21,37 +26,61 @@ final case class BufferOverflowException(msg: String) extends RuntimeException(m * about to receive a new element. */ @DoNotInherit -sealed abstract class OverflowStrategy extends DelayOverflowStrategy +sealed abstract class OverflowStrategy extends DelayOverflowStrategy { + /** INTERNAL API */ + @InternalApi private[akka] def logLevel: LogLevel + def withLogLevel(logLevel: Logging.LogLevel): OverflowStrategy +} private[akka] object OverflowStrategies { /** * INTERNAL API */ - private[akka] case object DropHead extends OverflowStrategy + private[akka] case class DropHead(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): DropHead = DropHead(logLevel) + private[akka] override def isBackpressure: Boolean = false + } /** * INTERNAL API */ - private[akka] case object DropTail extends OverflowStrategy + private[akka] case class DropTail(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): DropTail = DropTail(logLevel) + private[akka] override def isBackpressure: Boolean = false + } /** * INTERNAL API */ - private[akka] case object DropBuffer extends OverflowStrategy + private[akka] case class DropBuffer(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): DropBuffer = DropBuffer(logLevel) + private[akka] override def isBackpressure: Boolean = false + } /** * INTERNAL API */ - private[akka] case object DropNew extends OverflowStrategy + private[akka] case class DropNew(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): DropNew = DropNew(logLevel) + private[akka] override def isBackpressure: Boolean = false + } /** * INTERNAL API */ - private[akka] case object Backpressure extends OverflowStrategy + private[akka] case class Backpressure(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): Backpressure = Backpressure(logLevel) + private[akka] override def isBackpressure: Boolean = true + } /** * INTERNAL API */ - private[akka] case object Fail extends OverflowStrategy + private[akka] case class Fail(logLevel: LogLevel) extends OverflowStrategy { + override def withLogLevel(logLevel: LogLevel): Fail = Fail(logLevel) + private[akka] override def isBackpressure: Boolean = false + } /** * INTERNAL API */ - private[akka] case object EmitEarly extends DelayOverflowStrategy + private[akka] case object EmitEarly extends DelayOverflowStrategy { + private[akka] override def isBackpressure: Boolean = false + } } object OverflowStrategy { @@ -59,34 +88,34 @@ object OverflowStrategy { * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. */ - def dropHead: OverflowStrategy = DropHead + def dropHead: OverflowStrategy = DropHead(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for * the new element. */ - def dropTail: OverflowStrategy = DropTail + def dropTail: OverflowStrategy = DropTail(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. */ - def dropBuffer: OverflowStrategy = DropBuffer + def dropBuffer: OverflowStrategy = DropBuffer(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops the new element. */ - def dropNew: OverflowStrategy = DropNew + def dropNew: OverflowStrategy = DropNew(Logging.DebugLevel) /** * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until * space becomes available in the buffer. */ - def backpressure: OverflowStrategy = Backpressure + def backpressure: OverflowStrategy = Backpressure(Logging.DebugLevel) /** * If the buffer is full when a new element is available this strategy completes the stream with failure. */ - def fail: OverflowStrategy = Fail + def fail: OverflowStrategy = Fail(Logging.ErrorLevel) } object DelayOverflowStrategy { @@ -99,32 +128,32 @@ object DelayOverflowStrategy { * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. */ - def dropHead: DelayOverflowStrategy = DropHead + def dropHead: DelayOverflowStrategy = DropHead(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for * the new element. */ - def dropTail: DelayOverflowStrategy = DropTail + def dropTail: DelayOverflowStrategy = DropTail(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. */ - def dropBuffer: DelayOverflowStrategy = DropBuffer + def dropBuffer: DelayOverflowStrategy = DropBuffer(Logging.DebugLevel) /** * If the buffer is full when a new element arrives, drops the new element. */ - def dropNew: DelayOverflowStrategy = DropNew + def dropNew: DelayOverflowStrategy = DropNew(Logging.DebugLevel) /** * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until * space becomes available in the buffer. */ - def backpressure: DelayOverflowStrategy = Backpressure + def backpressure: DelayOverflowStrategy = Backpressure(Logging.DebugLevel) /** * If the buffer is full when a new element is available this strategy completes the stream with failure. */ - def fail: DelayOverflowStrategy = Fail + def fail: DelayOverflowStrategy = Fail(Logging.ErrorLevel) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index ce1fb565d4..4416a0b59c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -72,27 +72,27 @@ import akka.stream.ActorMaterializerSettings else if (!buffer.isFull) buffer.enqueue(elem) else overflowStrategy match { - case DropHead ⇒ - log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + case s: DropHead ⇒ + log.log(s.logLevel, "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() buffer.enqueue(elem) - case DropTail ⇒ - log.debug("Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + case s: DropTail ⇒ + log.log(s.logLevel, "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") buffer.dropTail() buffer.enqueue(elem) - case DropBuffer ⇒ - log.debug("Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + case s: DropBuffer ⇒ + log.log(s.logLevel, "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") buffer.clear() buffer.enqueue(elem) - case DropNew ⇒ + case s: DropNew ⇒ // do not enqueue new element if the buffer is full - log.debug("Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") - case Fail ⇒ - log.error("Failing because buffer is full and overflowStrategy is: [Fail]") - onErrorThenStop(new BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) - case Backpressure ⇒ + log.log(s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") + case s: Fail ⇒ + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + onErrorThenStop(BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) + case s: Backpressure ⇒ // there is a precondition check in Source.actorRefSource factory method - log.debug("Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") + log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index c2d1c91abc..a56e9c37a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -40,7 +40,9 @@ import scala.util.control.NonFatal override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val completion = Promise[Done] - val stageLogic = new GraphStageLogic(shape) with OutHandler with SourceQueueWithComplete[T] { + val stageLogic = new GraphStageLogic(shape) with OutHandler with SourceQueueWithComplete[T] with StageLogging { + override protected def logSource: Class[_] = classOf[QueueSource[_]] + var buffer: Buffer[T] = _ var pendingOffer: Option[Offer[T]] = None var terminating = false @@ -62,23 +64,29 @@ import scala.util.control.NonFatal if (!buffer.isFull) { enqueueAndSuccess(offer) } else overflowStrategy match { - case DropHead ⇒ + case s: DropHead ⇒ + log.log(s.logLevel, "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() enqueueAndSuccess(offer) - case DropTail ⇒ + case s: DropTail ⇒ + log.log(s.logLevel, "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") buffer.dropTail() enqueueAndSuccess(offer) - case DropBuffer ⇒ + case s: DropBuffer ⇒ + log.log(s.logLevel, "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") buffer.clear() enqueueAndSuccess(offer) - case DropNew ⇒ + case s: DropNew ⇒ + log.log(s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") offer.promise.success(QueueOfferResult.Dropped) - case Fail ⇒ + case s: Fail ⇒ + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") offer.promise.success(QueueOfferResult.Failure(bufferOverflowException)) completion.failure(bufferOverflowException) failStage(bufferOverflowException) - case Backpressure ⇒ + case s: Backpressure ⇒ + log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") pendingOffer match { case Some(_) ⇒ offer.promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) @@ -102,17 +110,21 @@ import scala.util.control.NonFatal } else if (pendingOffer.isEmpty) pendingOffer = Some(offer) else overflowStrategy match { - case DropHead | DropBuffer ⇒ + case s @ (_: DropHead | _: DropBuffer) ⇒ + log.log(s.logLevel, "Dropping element because buffer is full and overflowStrategy is: [{}]", s) pendingOffer.get.promise.success(QueueOfferResult.Dropped) pendingOffer = Some(offer) - case DropTail | DropNew ⇒ + case s @ (_: DropTail | _: DropNew) ⇒ + log.log(s.logLevel, "Dropping element because buffer is full and overflowStrategy is: [{}]", s) promise.success(QueueOfferResult.Dropped) - case Fail ⇒ + case s: Fail ⇒ + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") promise.success(QueueOfferResult.Failure(bufferOverflowException)) completion.failure(bufferOverflowException) failStage(bufferOverflowException) - case Backpressure ⇒ + case s: Backpressure ⇒ + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Backpressure]") promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 351dc0de16..ab23b9194c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -868,33 +868,47 @@ private[stream] object Collect { */ @InternalApi private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] { - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override protected def logSource: Class[_] = classOf[Buffer[_]] private var buffer: BufferImpl[T] = _ val enqueueAction: T ⇒ Unit = overflowStrategy match { - case DropHead ⇒ elem ⇒ - if (buffer.isFull) buffer.dropHead() + case s: DropHead ⇒ elem ⇒ + if (buffer.isFull) { + log.log(s.logLevel, "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") + buffer.dropHead() + } buffer.enqueue(elem) pull(in) - case DropTail ⇒ elem ⇒ - if (buffer.isFull) buffer.dropTail() + case s: DropTail ⇒ elem ⇒ + if (buffer.isFull) { + log.log(s.logLevel, "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") + buffer.dropTail() + } buffer.enqueue(elem) pull(in) - case DropBuffer ⇒ elem ⇒ - if (buffer.isFull) buffer.clear() + case s: DropBuffer ⇒ elem ⇒ + if (buffer.isFull) { + log.log(s.logLevel, "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") + buffer.clear() + } buffer.enqueue(elem) pull(in) - case DropNew ⇒ elem ⇒ + case s: DropNew ⇒ elem ⇒ if (!buffer.isFull) buffer.enqueue(elem) + else log.log(s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") pull(in) - case Backpressure ⇒ elem ⇒ + case s: Backpressure ⇒ elem ⇒ buffer.enqueue(elem) if (!buffer.isFull) pull(in) - case Fail ⇒ elem ⇒ - if (buffer.isFull) failStage(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) - else { + else log.log(s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") + case s: Fail ⇒ elem ⇒ + if (buffer.isFull) { + log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") + failStage(BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) + } else { buffer.enqueue(elem) pull(in) } @@ -1648,31 +1662,31 @@ private[stream] object Collect { } grabAndPull() } - case DropHead ⇒ + case _: DropHead ⇒ () ⇒ { buffer.dropHead() grabAndPull() } - case DropTail ⇒ + case _: DropTail ⇒ () ⇒ { buffer.dropTail() grabAndPull() } - case DropNew ⇒ + case _: DropNew ⇒ () ⇒ { grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } - case DropBuffer ⇒ + case _: DropBuffer ⇒ () ⇒ { buffer.clear() grabAndPull() } - case Fail ⇒ + case _: Fail ⇒ () ⇒ { - failStage(new BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!")) + failStage(BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!")) } - case Backpressure ⇒ + case _: Backpressure ⇒ () ⇒ { throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } @@ -1690,7 +1704,7 @@ private[stream] object Collect { } def pullCondition: Boolean = - strategy != Backpressure || buffer.used < size + !strategy.isBackpressure || buffer.used < size def grabAndPull(): Unit = { buffer.enqueue((System.nanoTime(), grab(in))) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7cd37d3135..a361168751 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -494,7 +494,7 @@ object Source { failureMatcher: PartialFunction[Any, Throwable], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") + require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported") fromGraph(new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) }