chore: Avoid forwarding method on ArrayDequeue in stream module. (#1687)
This commit is contained in:
parent
7782cf55e8
commit
c953f50ded
3 changed files with 6 additions and 6 deletions
|
|
@ -45,7 +45,7 @@ import pekko.stream.stage._
|
|||
private val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
|
||||
require(maxBuffer > 0, "Buffer size must be greater than 0")
|
||||
|
||||
private val buffer: util.Deque[In] = new util.ArrayDeque[In]()
|
||||
private val buffer: util.ArrayDeque[In] = new util.ArrayDeque[In]()
|
||||
private var acknowledgementReceived = false
|
||||
private var completeReceived = false
|
||||
private var completionSignalled = false
|
||||
|
|
@ -75,7 +75,7 @@ import pekko.stream.stage._
|
|||
}
|
||||
|
||||
private def dequeueAndSend(): Unit = {
|
||||
ref ! messageAdapter(self)(buffer.poll())
|
||||
ref ! messageAdapter(self)(buffer.pollFirst())
|
||||
}
|
||||
|
||||
private def finish(): Unit = {
|
||||
|
|
@ -85,7 +85,7 @@ import pekko.stream.stage._
|
|||
}
|
||||
|
||||
def onPush(): Unit = {
|
||||
buffer.offer(grab(in))
|
||||
buffer.offerLast(grab(in))
|
||||
if (acknowledgementReceived) {
|
||||
dequeueAndSend()
|
||||
acknowledgementReceived = false
|
||||
|
|
|
|||
|
|
@ -524,7 +524,7 @@ private final case class SavedIslandData(
|
|||
if (Debug) println(s"PUSH: $matValue => $matValueStack")
|
||||
|
||||
case Concat(first, next) =>
|
||||
if (next ne EmptyTraversal) traversalStack.add(next)
|
||||
if (next ne EmptyTraversal) traversalStack.addLast(next)
|
||||
nextStep = first
|
||||
case Pop =>
|
||||
val popped = matValueStack.removeLast()
|
||||
|
|
|
|||
|
|
@ -793,7 +793,7 @@ import org.reactivestreams.Subscription
|
|||
else if (currentLimit == 0) {
|
||||
self ! Resume
|
||||
} else {
|
||||
shortCircuitBuffer.poll() match {
|
||||
shortCircuitBuffer.pollFirst() match {
|
||||
case b: BoundaryEvent => processEvent(b)
|
||||
case Resume => finishShellRegistration()
|
||||
case unexpected =>
|
||||
|
|
@ -842,7 +842,7 @@ import org.reactivestreams.Subscription
|
|||
override def postStop(): Unit = {
|
||||
if (shortCircuitBuffer ne null) {
|
||||
while (!shortCircuitBuffer.isEmpty) {
|
||||
shortCircuitBuffer.poll() match {
|
||||
shortCircuitBuffer.pollFirst() match {
|
||||
case b: BoundaryEvent =>
|
||||
// signal to telemetry that this event won't be processed
|
||||
b.cancel()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue