Do not drop messages in delay with EmitEarly when buffer overflows (#24642)

This commit is contained in:
Kevin Mas Ruiz 2018-03-12 14:08:06 +01:00 committed by Johan Andrén
parent f7637d24e6
commit 907c6a6931
2 changed files with 14 additions and 4 deletions

View file

@ -5,16 +5,16 @@ package akka.stream.scaladsl
import akka.Done
import akka.stream.Attributes._
import akka.stream.OverflowStrategies.EmitEarly
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, Attributes, BufferOverflowException, DelayOverflowStrategy }
import akka.stream._
import akka.testkit.TimingTest
import scala.concurrent.{ Await, Future }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ThrottleMode
import akka.testkit.TimingTest
class FlowDelaySpec extends StreamSpec {
@ -190,5 +190,14 @@ class FlowDelaySpec extends StreamSpec {
.expectComplete()
}
"not drop messages on overflow when EmitEarly" in {
val probe = Source(1 to 2)
.delay(1.second, EmitEarly).withAttributes(Attributes.inputBuffer(1, 1))
.runWith(TestSink.probe)
probe.request(10)
.expectNextN(1 to 2)
.expectComplete()
}
}
}

View file

@ -1643,6 +1643,7 @@ private[stream] object Collect {
cancelTimer(timerName)
onTimer(timerName)
}
grabAndPull()
}
case DropHead
() {