diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index a22231ac12..b17b7fe85d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -219,5 +219,17 @@ class FlowDelaySpec extends StreamSpec { probe.request(10).expectNextN(1 to 2).expectComplete() } + + // repeater for #27095 + "not throw NPE when using EmitEarly and buffer is full" taggedAs TimingTest in { + val result = + Source(1 to 9) + .delay(1.second, DelayOverflowStrategy.emitEarly) + .addAttributes(Attributes.inputBuffer(5, 5)) + .runWith(Sink.seq) + .futureValue + + result should ===((1 to 9).toSeq) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 111b361e89..d1e9e53403 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1775,10 +1775,16 @@ private[stream] object Collect { } } - def pullCondition: Boolean = - !strategy.isBackpressure || buffer.used < size + def pullCondition: Boolean = strategy match { + case EmitEarly => + // when buffer is full we can only emit early if out is available + buffer.used < size || isAvailable(out) + case _ => + !strategy.isBackpressure || buffer.used < size + } def grabAndPull(): Unit = { + if (buffer.used == size) throw new IllegalStateException("Trying to enqueue but buffer is full") buffer.enqueue((System.nanoTime(), grab(in))) if (pullCondition) pull(in) }