Use min of numberOfMessages and buffer size when unstash. (#27398)
This commit is contained in:
parent
7cc7d86971
commit
38be540467
2 changed files with 36 additions and 1 deletions
|
|
@ -158,6 +158,41 @@ class StashBufferSpec extends WordSpec with Matchers {
|
||||||
buffer.head should ===("m2")
|
buffer.head should ===("m2")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"unstash at most the number of messages in the buffer" in {
|
||||||
|
val buffer = StashBuffer[String](context, 10)
|
||||||
|
buffer.stash("m1")
|
||||||
|
buffer.stash("m2")
|
||||||
|
buffer.stash("m3")
|
||||||
|
buffer.stash("get")
|
||||||
|
|
||||||
|
val valueInbox = TestInbox[String]()
|
||||||
|
def behavior(state: String): Behavior[String] =
|
||||||
|
Behaviors.receive[String] { (_, message) =>
|
||||||
|
if (message == "get") {
|
||||||
|
valueInbox.ref ! state
|
||||||
|
Behaviors.same
|
||||||
|
} else if (message == "m2") {
|
||||||
|
buffer.stash("m4")
|
||||||
|
buffer.stash("get")
|
||||||
|
Behaviors.same
|
||||||
|
} else {
|
||||||
|
behavior(state + message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unstash will only process at most the number of messages in the buffer when
|
||||||
|
// the call is made, any newly added messages have to be processed by another
|
||||||
|
// unstash call.
|
||||||
|
val b2 = buffer.unstash(behavior(""), 20, identity)
|
||||||
|
valueInbox.expectMessage("m1m3")
|
||||||
|
buffer.size should ===(2)
|
||||||
|
buffer.head should ===("m4")
|
||||||
|
|
||||||
|
buffer.unstash(b2, 20, identity)
|
||||||
|
valueInbox.expectMessage("m1m3m4")
|
||||||
|
buffer.size should ===(0)
|
||||||
|
}
|
||||||
|
|
||||||
"fail quick on invalid start behavior" in {
|
"fail quick on invalid start behavior" in {
|
||||||
val stash = StashBuffer[String](context, 10)
|
val stash = StashBuffer[String](context, 10)
|
||||||
stash.stash("one")
|
stash.stash("one")
|
||||||
|
|
|
||||||
|
|
@ -126,7 +126,7 @@ import akka.util.{ unused, ConstantFun }
|
||||||
unstashed(ctx, next)
|
unstashed(ctx, next)
|
||||||
wrap(next.message)
|
wrap(next.message)
|
||||||
}
|
}
|
||||||
}.take(numberOfMessages)
|
}.take(math.min(numberOfMessages, size))
|
||||||
interpretUnstashedMessages(behavior, ctx, iter)
|
interpretUnstashedMessages(behavior, ctx, iter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue