diff --git a/akka-docs-dev/rst/java/stream-rate.rst b/akka-docs-dev/rst/java/stream-rate.rst index 86e71cb54c..00c5869905 100644 --- a/akka-docs-dev/rst/java/stream-rate.rst +++ b/akka-docs-dev/rst/java/stream-rate.rst @@ -107,6 +107,11 @@ we want to be nice to jobs that has been waiting for long, then this option can .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-droptail +Instead of dropping the youngest element from the tail of the buffer a new element can be dropped without +enqueueing it to the buffer at all. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-dropnew + Here is another example with a queue of 1000 jobs, but it makes space for the new element by dropping one element from the *head* of the buffer. This is the *oldest* waiting job. This is the preferred strategy if jobs are expected to be diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index ce659c8caa..d8b63b350f 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -68,6 +68,10 @@ class StreamBuffersRateSpec extends AkkaSpec { jobs.buffer(1000, OverflowStrategy.dropTail) //#explicit-buffers-droptail + //#explicit-buffers-dropnew + jobs.buffer(1000, OverflowStrategy.dropNew) + //#explicit-buffers-dropnew + //#explicit-buffers-drophead jobs.buffer(1000, OverflowStrategy.dropHead) //#explicit-buffers-drophead diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst index ebf54583eb..82171262a0 100644 --- a/akka-docs-dev/rst/scala/stream-rate.rst +++ b/akka-docs-dev/rst/scala/stream-rate.rst @@ -107,6 +107,11 @@ we want to be nice to jobs that has been waiting for long, then this option can .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-droptail +Instead of dropping the youngest element from the tail of the buffer a new element can be dropped without +enqueueing it to the buffer at all. + +.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-dropnew + Here is another example with a queue of 1000 jobs, but it makes space for the new element by dropping one element from the *head* of the buffer. This is the *oldest* waiting job. This is the preferred strategy if jobs are expected to be diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 1cf3192726..0a64e316e1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -7,6 +7,7 @@ import scala.concurrent.duration._ import akka.stream.ActorFlowMaterializer import akka.stream.OverflowStrategy import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ import akka.actor.PoisonPill import akka.actor.Status @@ -44,6 +45,20 @@ class ActorRefSourceSpec extends AkkaSpec { for (n ← 300 to 399) s.expectNext(n) } + "drop new when full and with dropNew strategy" in { + val (ref, sub) = Source.actorRef(100, OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run() + + for (n ← 1 to 20) ref ! n + sub.request(10) + for (n ← 1 to 10) sub.expectNext(n) + sub.request(10) + for (n ← 11 to 20) sub.expectNext(n) + + for (n ← 200 to 399) ref ! n + sub.request(100) + for (n ← 200 to 299) sub.expectNext(n) + } + "terminate when the stream is cancelled" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink(s)).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index d3fa4af163..920970c338 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -6,12 +6,12 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy.Fail.BufferOverflowException import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ class FlowBufferSpec extends AkkaSpec { @@ -148,6 +148,26 @@ class FlowBufferSpec extends AkkaSpec { sub.cancel() } + "drop new elements if buffer is full and configured so" in { + val (publisher, subscriber) = TestSource.probe[Int].buffer(100, overflowStrategy = OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run() + + // Fill up buffer + for (i ← 1 to 150) publisher.sendNext(i) + + // drain + for (i ← 1 to 100) { + subscriber.requestNext(i) + } + + subscriber.request(1) + subscriber.expectNoMsg(1.seconds) + + publisher.sendNext(-1) + subscriber.requestNext(-1) + + subscriber.cancel() + } + "fail upstream if buffer is full and configured so" in assertAllStagesStopped { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index 4a2e7d205d..04e9067301 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -25,6 +25,8 @@ object OverflowStrategy { */ private[akka] final case object DropBuffer extends OverflowStrategy + private[akka] final case object DropNew extends OverflowStrategy + /** * INTERNAL API */ @@ -54,6 +56,11 @@ object OverflowStrategy { */ def dropBuffer: OverflowStrategy = DropBuffer + /** + * If the buffer is full when a new element arrives, drops the new element. + */ + def dropNew: OverflowStrategy = DropNew + /** * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until * space becomes available in the buffer. diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 5789602d47..5d8bde2560 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -63,6 +63,8 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf case DropBuffer ⇒ buffer.clear() buffer.enqueue(elem) + case DropNew ⇒ + // do not enqueue new element if the buffer is full case Fail ⇒ onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) case Backpressure ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6f1ab826bf..be5afe813c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -249,6 +249,10 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt buffer.enqueue(elem) ctx.pull() } + case DropNew ⇒ { (ctx, elem) ⇒ + if (!buffer.isFull) buffer.enqueue(elem) + ctx.pull() + } case Backpressure ⇒ { (ctx, elem) ⇒ buffer.enqueue(elem) if (buffer.isFull) ctx.holdUpstream() @@ -608,4 +612,4 @@ private[akka] object Log { private final val DefaultLoggerName = "akka.stream.Log" private final val OffInt = LogLevels.Off.asInt private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) -} \ No newline at end of file +}