diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala index 7caa01641a..3e610e88dc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala @@ -193,5 +193,18 @@ class BoundedSourceQueueSpec extends StreamSpec("""akka.loglevel = debug downstream.cancel() } + + "provide info about number of messages" in { + val sub = TestSubscriber.probe[Int]() + val queue = Source.queue[Int](100).toMat(Sink.fromSubscriber(sub))(Keep.left).run() + + queue.offer(1) + queue.size() shouldBe 1 + + (2 to 100).map { i => + queue.offer(i) + } + queue.size() shouldBe 100 + } } } diff --git a/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30125-added-size-to-bounded-source-queue.excludes b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30125-added-size-to-bounded-source-queue.excludes new file mode 100644 index 0000000000..9131764321 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30125-added-size-to-bounded-source-queue.excludes @@ -0,0 +1,2 @@ +# disable compatibility check for a new method in BoundedSourceQueue +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.BoundedSourceQueue.size") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/BoundedSourceQueue.scala b/akka-stream/src/main/scala/akka/stream/BoundedSourceQueue.scala index 3784c1fb5d..7db8374a61 100644 --- a/akka-stream/src/main/scala/akka/stream/BoundedSourceQueue.scala +++ b/akka-stream/src/main/scala/akka/stream/BoundedSourceQueue.scala @@ -32,4 +32,9 @@ trait BoundedSourceQueue[T] { * Completes the stream with a failure. */ def fail(ex: Throwable): Unit + + /** + * Returns the approximate number of elements in this queue. + */ + def size(): Int } diff --git a/akka-stream/src/main/scala/akka/stream/impl/BoundedSourceQueue.scala b/akka-stream/src/main/scala/akka/stream/impl/BoundedSourceQueue.scala index 68c42e26f9..bb64b3b736 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/BoundedSourceQueue.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/BoundedSourceQueue.scala @@ -133,6 +133,8 @@ import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Out if (setDone(Done(QueueOfferResult.Failure(ex)))) Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback } + + override def size(): Int = queue.size() } // some state transition helpers