From 907c6a6931bbc57d698c99fbbe0498859cd6e9db Mon Sep 17 00:00:00 2001 From: Kevin Mas Ruiz Date: Mon, 12 Mar 2018 14:08:06 +0100 Subject: [PATCH] Do not drop messages in delay with EmitEarly when buffer overflows (#24642) --- .../akka/stream/scaladsl/FlowDelaySpec.scala | 17 +++++++++++++---- .../scala/akka/stream/impl/fusing/Ops.scala | 1 + 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index df84c32b7b..05bd75b9ee 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -5,16 +5,16 @@ package akka.stream.scaladsl import akka.Done import akka.stream.Attributes._ +import akka.stream.OverflowStrategies.EmitEarly import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.stream.{ ActorMaterializer, Attributes, BufferOverflowException, DelayOverflowStrategy } +import akka.stream._ +import akka.testkit.TimingTest -import scala.concurrent.{ Await, Future } +import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.stream.ThrottleMode -import akka.testkit.TimingTest class FlowDelaySpec extends StreamSpec { @@ -190,5 +190,14 @@ class FlowDelaySpec extends StreamSpec { .expectComplete() } + "not drop messages on overflow when EmitEarly" in { + val probe = Source(1 to 2) + .delay(1.second, EmitEarly).withAttributes(Attributes.inputBuffer(1, 1)) + .runWith(TestSink.probe) + + probe.request(10) + .expectNextN(1 to 2) + .expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 5902586c4c..430f3fd4d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1643,6 +1643,7 @@ private[stream] object Collect { cancelTimer(timerName) onTimer(timerName) } + grabAndPull() } case DropHead ⇒ () ⇒ {