Added size() to BoundedSourceQueue #30124 (#30125)

This commit is contained in:
Alex 2021-03-22 14:45:47 +04:00 committed by GitHub
parent 22fd122f4c
commit 651b339777
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 0 deletions

View file

@ -193,5 +193,18 @@ class BoundedSourceQueueSpec extends StreamSpec("""akka.loglevel = debug
downstream.cancel()
}
"provide info about number of messages" in {
val sub = TestSubscriber.probe[Int]()
val queue = Source.queue[Int](100).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.offer(1)
queue.size() shouldBe 1
(2 to 100).map { i =>
queue.offer(i)
}
queue.size() shouldBe 100
}
}
}

View file

@ -0,0 +1,2 @@
# disable compatibility check for a new method in BoundedSourceQueue
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.BoundedSourceQueue.size")

View file

@ -32,4 +32,9 @@ trait BoundedSourceQueue[T] {
* Completes the stream with a failure.
*/
def fail(ex: Throwable): Unit
/**
* Returns the approximate number of elements in this queue.
*/
def size(): Int
}

View file

@ -133,6 +133,8 @@ import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Out
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
override def size(): Int = queue.size()
}
// some state transition helpers