#20573 Fixing wrong initial buffer sizes in delay, and one logic bug

This commit is contained in:
Endre Sándor Varga 2016-05-24 11:42:17 +02:00
parent 03395d5739
commit 14b70836a7
3 changed files with 39 additions and 5 deletions

View file

@ -3,12 +3,14 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.Done
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.stream.{ BufferOverflowException, DelayOverflowStrategy, ActorMaterializer } import akka.stream.{ ActorMaterializer, Attributes, BufferOverflowException, DelayOverflowStrategy }
import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
@ -88,7 +90,7 @@ class FlowDelaySpec extends AkkaSpec {
} }
"pass elements with delay through normally in backpressured mode" in assertAllStagesStopped { "pass elements with delay through normally in backpressured mode" in assertAllStagesStopped {
Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int]) Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).withAttributes(inputBuffer(1, 1)).runWith(TestSink.probe[Int])
.request(5) .request(5)
.expectNoMsg(200.millis) .expectNoMsg(200.millis)
.expectNext(200.millis, 1) .expectNext(200.millis, 1)
@ -123,5 +125,37 @@ class FlowDelaySpec extends AkkaSpec {
//fail will terminate despite of non empty internal buffer //fail will terminate despite of non empty internal buffer
pSub.sendError(new RuntimeException() with NoStackTrace) pSub.sendError(new RuntimeException() with NoStackTrace)
} }
"properly delay according to buffer size" in {
import akka.pattern.pipe
import system.dispatcher
// With a buffer size of 1, delays add up
Source(1 to 5)
.delay(500.millis, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
.runWith(Sink.ignore).pipeTo(testActor)
expectNoMsg(2.seconds)
expectMsg(Done)
// With a buffer large enough to hold all arriving elements, delays don't add up
Source(1 to 100)
.delay(1.second, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(initial = 100, max = 100))
.runWith(Sink.ignore).pipeTo(testActor)
expectMsg(Done)
// Delays that are already present are preserved when buffer is large enough
Source.tick(100.millis, 100.millis, ()).take(10)
.delay(1.second, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(initial = 10, max = 10))
.runWith(Sink.ignore).pipeTo(testActor)
expectNoMsg(900.millis)
expectMsg(Done)
}
} }
} }

View file

@ -84,7 +84,7 @@ private[stream] object Stages {
val repeat = name("repeat") val repeat = name("repeat")
val unfold = name("unfold") val unfold = name("unfold")
val unfoldAsync = name("unfoldAsync") val unfoldAsync = name("unfoldAsync")
val delay = name("delay") and inputBuffer(16, 16) val delay = name("delay")
val terminationWatcher = name("terminationWatcher") val terminationWatcher = name("terminationWatcher")

View file

@ -1125,7 +1125,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
} }
else { else {
grabAndPull(strategy != Backpressure || buffer.capacity < size - 1) grabAndPull(strategy != Backpressure || buffer.used < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d) if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
} }
} }