Stashing in Typed, #22275 (#24349)

* Utility for stashing, #22275

* The main reason for providing these utilities and promote
  a standardized way of doing the buffering is that monitoring
  instrumentation can be added to these classes, which is not
  possible if we just say "buffer in some collection".

* unstash a few at a time, became rather complicated

* separate api and impl, and more tests
This commit is contained in:
Patrik Nordwall 2018-01-22 16:43:20 +01:00 committed by Konrad `ktoso` Malawski
parent fef57ea034
commit 155e765c9d
7 changed files with 1042 additions and 0 deletions

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import org.scalatest.Matchers
import org.scalatest.WordSpec
class ImmutableStashBufferSpec extends WordSpec with Matchers {
"A ImmutableStashBuffer" must {
"answer empty correctly" in {
var buffer = ImmutableStashBuffer[String](10)
buffer.isEmpty should ===(true)
buffer.nonEmpty should ===(false)
buffer = buffer.stash("m1")
buffer.isEmpty should ===(false)
buffer.nonEmpty should ===(true)
}
"append and drop" in {
var buffer = ImmutableStashBuffer[String](10)
buffer.size should ===(0)
buffer = buffer.stash("m1")
buffer.size should ===(1)
buffer = buffer.stash("m2")
buffer.size should ===(2)
val m1 = buffer.head
m1 should ===("m1")
buffer.size should ===(2)
buffer = buffer.dropHead()
buffer.size should ===(1)
m1 should ===("m1")
val m2 = buffer.head
m2 should ===("m2")
buffer = buffer.dropHead()
buffer.size should ===(0)
intercept[NoSuchElementException] {
buffer = buffer.dropHead()
}
intercept[NoSuchElementException] {
buffer.head
}
buffer.size should ===(0)
}
"enforce capacity" in {
var buffer = ImmutableStashBuffer[String](3)
buffer = buffer.stash("m1")
buffer = buffer.stash("m2")
buffer = buffer.stash("m3")
intercept[StashOverflowException] {
buffer = buffer.stash("m4")
}
// it's actually a javadsl.StashOverflowException
intercept[akka.actor.typed.javadsl.StashOverflowException] {
buffer.stash("m4")
}
buffer.size should ===(3)
}
"process elements in the right order" in {
var buffer = ImmutableStashBuffer[String](10)
buffer = buffer.stash("m1")
buffer = buffer.stash("m2")
buffer = buffer.stash("m3")
val sb1 = new StringBuilder()
buffer.foreach(sb1.append(_))
sb1.toString() should ===("m1m2m3")
buffer = buffer.dropHead()
val sb2 = new StringBuilder()
buffer.foreach(sb2.append(_))
sb2.toString() should ===("m2m3")
}
}
}

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import org.scalatest.Matchers
import org.scalatest.WordSpec
class MutableStashBufferSpec extends WordSpec with Matchers {
"A MutableStashBuffer" must {
"answer empty correctly" in {
val buffer = MutableStashBuffer[String](10)
buffer.isEmpty should ===(true)
buffer.nonEmpty should ===(false)
buffer.stash("m1")
buffer.isEmpty should ===(false)
buffer.nonEmpty should ===(true)
}
"append and drop" in {
val buffer = MutableStashBuffer[String](10)
buffer.size should ===(0)
buffer.stash("m1")
buffer.size should ===(1)
buffer.stash("m2")
buffer.size should ===(2)
val m1 = buffer.head
m1 should ===("m1")
buffer.size should ===(2)
buffer.dropHead()
buffer.size should ===(1)
m1 should ===("m1")
val m2 = buffer.head
m2 should ===("m2")
buffer.dropHead()
buffer.size should ===(0)
intercept[NoSuchElementException] {
buffer.dropHead()
}
intercept[NoSuchElementException] {
buffer.head
}
buffer.size should ===(0)
}
"enforce capacity" in {
val buffer = MutableStashBuffer[String](3)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
intercept[StashOverflowException] {
buffer.stash("m4")
}
// it's actually a javadsl.StashOverflowException
intercept[akka.actor.typed.javadsl.StashOverflowException] {
buffer.stash("m4")
}
buffer.size should ===(3)
}
"process elements in the right order" in {
val buffer = MutableStashBuffer[String](10)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
val sb1 = new StringBuilder()
buffer.foreach(sb1.append(_))
sb1.toString() should ===("m1m2m3")
buffer.dropHead()
val sb2 = new StringBuilder()
buffer.foreach(sb2.append(_))
sb2.toString() should ===("m2m3")
}
}
}

View file

@ -0,0 +1,235 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
package scaladsl
import akka.event.LoggingAdapter
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
object StashSpec {
sealed trait Command
final case class Msg(s: String) extends Command
final case class Unstashed(cmd: Command) extends Command
case object Stash extends Command
case object UnstashAll extends Command
case object Unstash extends Command
final case class GetProcessed(replyTo: ActorRef[Vector[String]]) extends Command
final case class GetStashSize(replyTo: ActorRef[Int]) extends Command
// FIXME replace when we get the logging in place, #23326
def log(ctx: ActorContext[_]): LoggingAdapter = ctx.system.log
def active(processed: Vector[String]): Behavior[Command] =
Behaviors.immutable { (ctx, cmd)
cmd match {
case msg: Msg
active(processed :+ msg.s)
case GetProcessed(replyTo)
replyTo ! processed
Behaviors.same
case Stash
stashing(ImmutableStashBuffer(capacity = 10), processed)
case GetStashSize(replyTo)
replyTo ! 0
Behaviors.same
case UnstashAll
Behaviors.unhandled
case Unstash
Behaviors.unhandled
case u: Unstashed
throw new IllegalStateException(s"Unexpected $u in active")
}
}
def stashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] =
Behaviors.immutable { (ctx, cmd)
cmd match {
case msg: Msg
stashing(buffer :+ msg, processed)
case g: GetProcessed
stashing(buffer :+ g, processed)
case GetStashSize(replyTo)
replyTo ! buffer.size
Behaviors.same
case UnstashAll
buffer.unstashAll(ctx, active(processed))
case Unstash
log(ctx).debug(s"Unstash ${buffer.size}")
if (buffer.isEmpty)
active(processed)
else {
ctx.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed)
}
case Stash
Behaviors.unhandled
case u: Unstashed
throw new IllegalStateException(s"Unexpected $u in stashing")
}
}
def unstashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] =
Behaviors.immutable { (ctx, cmd)
cmd match {
case Unstashed(msg: Msg)
log(ctx).debug(s"unstashed $msg")
unstashing(buffer, processed :+ msg.s)
case Unstashed(GetProcessed(replyTo))
log(ctx).debug(s"unstashed GetProcessed")
replyTo ! processed
Behaviors.same
case msg: Msg
log(ctx).debug(s"got $msg in unstashing")
unstashing(buffer :+ msg, processed)
case get: GetProcessed
log(ctx).debug(s"got GetProcessed in unstashing")
unstashing(buffer :+ get, processed)
case Stash
stashing(buffer, processed)
case Unstash
if (buffer.isEmpty) {
log(ctx).debug(s"unstashing done")
active(processed)
} else {
ctx.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed)
}
case GetStashSize(replyTo)
replyTo ! buffer.size
Behaviors.same
case UnstashAll
Behaviors.unhandled
case u: Unstashed
throw new IllegalStateException(s"Unexpected $u in unstashing")
}
}
class MutableStash(ctx: ActorContext[Command]) extends Behaviors.MutableBehavior[Command] {
private val buffer = MutableStashBuffer.apply[Command](capacity = 10)
private var stashing = false
private var processed = Vector.empty[String]
override def onMessage(cmd: Command): Behavior[Command] = {
cmd match {
case msg: Msg
if (stashing)
buffer.stash(msg)
else
processed :+= msg.s
this
case g @ GetProcessed(replyTo)
if (stashing)
buffer.stash(g)
else
replyTo ! processed
this
case GetStashSize(replyTo)
replyTo ! buffer.size
this
case Stash
stashing = true
this
case UnstashAll
stashing = false
buffer.unstashAll(ctx, this)
case Unstash
if (buffer.isEmpty) {
stashing = false
this
} else {
ctx.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(ctx, this, numberOfMessages, Unstashed)
}
case Unstashed(msg: Msg)
log(ctx).debug(s"unstashed $msg")
processed :+= msg.s
this
case Unstashed(GetProcessed(replyTo))
log(ctx).debug(s"unstashed GetProcessed")
replyTo ! processed
Behaviors.same
case _: Unstashed
Behaviors.unhandled
}
}
}
}
class ImmutableStashSpec extends StashSpec {
import StashSpec._
def testQualifier: String = "immutable behavior"
def behaviorUnderTest: Behavior[Command] = active(Vector.empty)
}
class MutableStashSpec extends StashSpec {
import StashSpec._
def testQualifier: String = "mutable behavior"
def behaviorUnderTest: Behavior[Command] = Behaviors.mutable(ctx new MutableStash(ctx))
}
abstract class StashSpec extends TestKit with TypedAkkaSpecWithShutdown {
import StashSpec._
def testQualifier: String
def behaviorUnderTest: Behavior[Command]
s"Stashing with $testQualifier" must {
"support unstash all" in {
val actor = spawn(behaviorUnderTest)
val probe = TestProbe[Vector[String]]("probe")
val sizeProbe = TestProbe[Int]("sizeProbe")
actor ! Msg("a")
actor ! Msg("b")
actor ! Msg("c")
actor ! Stash
actor ! Msg("d")
actor ! Msg("e")
actor ! Msg("f")
actor ! GetStashSize(sizeProbe.ref)
sizeProbe.expectMsg(3)
actor ! UnstashAll
actor ! GetProcessed(probe.ref)
probe.expectMsg(Vector("a", "b", "c", "d", "e", "f"))
}
"support unstash a few at a time" in {
val actor = spawn(behaviorUnderTest)
val probe = TestProbe[Vector[String]]("probe")
val sizeProbe = TestProbe[Int]("sizeProbe")
actor ! Msg("a")
actor ! Msg("b")
actor ! Msg("c")
actor ! Stash
actor ! Msg("d")
actor ! Msg("e")
actor ! Msg("f")
actor ! GetStashSize(sizeProbe.ref)
sizeProbe.expectMsg(3)
actor ! Unstash
actor ! Msg("g") // might arrive in the middle of the unstashing
actor ! GetProcessed(probe.ref) // this is also stashed until all unstashed
probe.expectMsg(Vector("a", "b", "c", "d", "e", "f", "g"))
}
}
}

View file

@ -313,4 +313,28 @@ object Behavior {
undefer(possiblyDeferredResult, ctx)
}
/**
* INTERNAL API
*
* Execute the behavior with the given messages (or signals).
* The returned [[Behavior]] from each processed message is used for the next message.
*/
@InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: ActorContext[T], messages: Iterator[T]): Behavior[T] = {
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
val b2 = Behavior.undefer(b, ctx)
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
else {
val nextB = messages.next() match {
case sig: Signal
Behavior.interpretSignal(b2, ctx, sig)
case msg
Behavior.interpretMessage(b2, ctx, msg)
}
interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive
}
}
interpretOne(Behavior.undefer(behavior, ctx))
}
}

View file

@ -0,0 +1,182 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.internal
import java.util.function.Consumer
import java.util.function.{ Function JFunction }
import scala.annotation.tailrec
import akka.actor.typed.scaladsl
import akka.actor.typed.javadsl
import akka.annotation.InternalApi
import akka.actor.typed.Behavior
import akka.actor.typed.ActorContext
import akka.actor.typed.Signal
import akka.util.ConstantFun
/**
* INTERNAL API
*/
@InternalApi private[akka] object ImmutableStashBufferImpl {
def apply[T](capacity: Int): ImmutableStashBufferImpl[T] =
new ImmutableStashBufferImpl(capacity, Vector.empty)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ImmutableStashBufferImpl[T](val capacity: Int, buffer: Vector[T])
extends javadsl.ImmutableStashBuffer[T] with scaladsl.ImmutableStashBuffer[T] {
override def isEmpty: Boolean = buffer.isEmpty
override def nonEmpty: Boolean = !isEmpty
override def size: Int = buffer.size
override def isFull: Boolean = size == capacity
override def stash(message: T): ImmutableStashBufferImpl[T] = {
if (message == null) throw new NullPointerException
if (isFull)
throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " +
s"because stash with capacity [$capacity] is full")
new ImmutableStashBufferImpl(capacity, buffer :+ message)
}
override def dropHead(): ImmutableStashBufferImpl[T] =
if (buffer.nonEmpty) new ImmutableStashBufferImpl(capacity, buffer.tail)
else throw new NoSuchElementException("head of empty buffer")
override def drop(numberOfMessages: Int): ImmutableStashBufferImpl[T] =
if (isEmpty) this
else new ImmutableStashBufferImpl(capacity, buffer.drop(numberOfMessages))
override def head: T =
if (buffer.nonEmpty) buffer.head
else throw new NoSuchElementException("head of empty buffer")
override def foreach(f: T Unit): Unit =
buffer.foreach(f)
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T])
override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstashAll(ctx.asScala, behavior)
override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T],
numberOfMessages: Int, wrap: T T): Behavior[T] = {
val ctx = scaladslCtx.asInstanceOf[ActorContext[T]]
val iter = buffer.iterator.take(numberOfMessages).map(wrap)
Behavior.interpretMessages[T](behavior, ctx, iter)
}
override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T],
numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] =
unstash(ctx.asScala, behavior, numberOfMessages, x wrap.apply(x))
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object MutableStashBufferImpl {
private final class Node[T](var next: Node[T], val message: T) {
def apply(f: T Unit): Unit = f(message)
}
def apply[T](capacity: Int): MutableStashBufferImpl[T] =
new MutableStashBufferImpl(capacity, null, null)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class MutableStashBufferImpl[T] private (
val capacity: Int,
private var _head: MutableStashBufferImpl.Node[T],
private var _tail: MutableStashBufferImpl.Node[T])
extends javadsl.MutableStashBuffer[T] with scaladsl.MutableStashBuffer[T] {
import MutableStashBufferImpl.Node
private var _size: Int = if (_head eq null) 0 else 1
override def isEmpty: Boolean = _head eq null
override def nonEmpty: Boolean = !isEmpty
override def size: Int = _size
override def isFull: Boolean = _size == capacity
override def stash(message: T): MutableStashBufferImpl[T] = {
if (message == null) throw new NullPointerException
if (isFull)
throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " +
s"because stash with capacity [$capacity] is full")
val node = new Node(null, message)
if (isEmpty) {
_head = node
_tail = node
} else {
_tail.next = node
_tail = node
}
_size += 1
this
}
override def dropHead(): T = {
val message = head
_head = _head.next
_size -= 1
if (isEmpty)
_tail = null
message
}
override def head: T =
if (nonEmpty) _head.message
else throw new NoSuchElementException("head of empty buffer")
override def foreach(f: T Unit): Unit = {
var node = _head
while (node ne null) {
node(f)
node = node.next
}
}
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T])
override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstashAll(ctx.asScala, behavior)
override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T],
numberOfMessages: Int, wrap: T T): Behavior[T] = {
val iter = new Iterator[T] {
override def hasNext: Boolean = MutableStashBufferImpl.this.nonEmpty
override def next(): T = MutableStashBufferImpl.this.dropHead()
}.take(numberOfMessages).map(wrap)
val ctx = scaladslCtx.asInstanceOf[ActorContext[T]]
Behavior.interpretMessages[T](behavior, ctx, iter)
}
override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T],
numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] =
unstash(ctx.asScala, behavior, numberOfMessages, x wrap.apply(x))
}

View file

@ -0,0 +1,218 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.javadsl
import java.util.function.Consumer
import java.util.function.{ Function JFunction }
import scala.annotation.tailrec
import akka.annotation.DoNotInherit
import akka.actor.typed.Behavior
import akka.actor.typed.internal.ImmutableStashBufferImpl
import akka.actor.typed.internal.MutableStashBufferImpl
import akka.actor.typed.scaladsl
object ImmutableStashBuffer {
/**
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def create[T](capacity: Int): ImmutableStashBuffer[T] =
ImmutableStashBufferImpl[T](capacity)
}
/**
* A thread safe immutable message buffer that can be used to buffer messages inside actors.
*
* The buffer can hold at most the given `capacity` number of messages.
*/
@DoNotInherit abstract class ImmutableStashBuffer[T] {
/**
* Check if the message buffer is empty.
*
* @return if the buffer is empty
*/
def isEmpty: Boolean
/**
* Check if the message buffer is not empty.
*
* @return if the buffer is not empty
*/
def nonEmpty: Boolean
/**
* How many elements are in the message buffer.
*
* @return the number of elements in the message buffer
*/
def size: Int
/**
* @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer
*/
def isFull: Boolean
/**
* Add one element to the end of the message buffer. Note that this class is
* immutable so the returned instance contains the added message.
*
* @param message the message to buffer, must not be `null`
* @return this message buffer
* @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]].
*/
def stash(message: T): ImmutableStashBuffer[T]
/**
* Remove the first element of the message buffer. Note that this class is
* immutable so the head element is removed in the returned instance.
*
* @throws `NoSuchElementException` if the buffer is empty
*/
def dropHead(): ImmutableStashBuffer[T]
/**
* Remove the first `numberOfMessages` of the message buffer. Note that this class is
* immutable so the elements are removed in the returned instance.
*/
def drop(numberOfMessages: Int): ImmutableStashBuffer[T]
/**
* Return the first element of the message buffer.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def head: T
/**
* Iterate over all elements of the buffer and apply a function to each element.
*
* @param f the function to apply to each element
*/
def forEach(f: Consumer[T]): Unit
/**
* Process all stashed messages with the `behavior` and the returned
* [[Behavior]] from each processed message.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
* and the returned [[Behavior]] from each processed message.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T]
}
object MutableStashBuffer {
/**
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def create[T](capacity: Int): MutableStashBuffer[T] =
MutableStashBufferImpl[T](capacity)
}
/**
* A non thread safe mutable message buffer that can be used to buffer messages inside actors.
*
* The buffer can hold at most the given `capacity` number of messages.
*/
@DoNotInherit abstract class MutableStashBuffer[T] {
/**
* Check if the message buffer is empty.
*
* @return if the buffer is empty
*/
def isEmpty: Boolean
/**
* Check if the message buffer is not empty.
*
* @return if the buffer is not empty
*/
def nonEmpty: Boolean
/**
* How many elements are in the message buffer.
*
* @return the number of elements in the message buffer
*/
def size: Int
/**
* @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer
*/
def isFull: Boolean
/**
* Add one element to the end of the message buffer.
*
* [[StashOverflowException]] is thrown if the buffer [[MutableStashBuffer#isFull]].
*
* @param message the message to buffer
* @return this message buffer
* @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]].
*/
def stash(message: T): MutableStashBuffer[T]
/**
* Return the first element of the message buffer and removes it.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def dropHead(): T
/**
* Return the first element of the message buffer without removing it.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def head: T
/**
* Iterate over all elements of the buffer and apply a function to each element.
*
* @param f the function to apply to each element
*/
def forEach(f: Consumer[T]): Unit
/**
* Process all stashed messages with the `behavior` and the returned
* [[Behavior]] from each processed message. The `MutableStashBuffer` will be
* empty after processing all messages, unless an exception is thrown.
* If an exception is thrown by processing a message a proceeding messages
* and the message causing the exception have been removed from the
* `MutableStashBuffer`, but unprocessed messages remain.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
* and the returned [[Behavior]] from each processed message.
* If an exception is thrown by processing a message a proceeding messages
* and the message causing the exception have been removed from the
* `MutableStashBuffer`, but unprocessed messages remain.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T]
}
/**
* Is thrown when the size of the stash exceeds the capacity of the stash buffer.
*/
class StashOverflowException(message: String) extends scaladsl.StashOverflowException(message)

View file

@ -0,0 +1,224 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import java.util.function.Consumer
import java.util.function.{ Function JFunction }
import scala.annotation.tailrec
import akka.annotation.DoNotInherit
import akka.actor.typed.Behavior
import akka.actor.typed.internal.ImmutableStashBufferImpl
import akka.actor.typed.internal.MutableStashBufferImpl
object ImmutableStashBuffer {
/**
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def apply[T](capacity: Int): ImmutableStashBuffer[T] =
ImmutableStashBufferImpl[T](capacity)
}
/**
* A thread safe immutable message buffer that can be used to buffer messages inside actors.
*
* The buffer can hold at most the given `capacity` number of messages.
*/
@DoNotInherit trait ImmutableStashBuffer[T] {
/**
* Check if the message buffer is empty.
*
* @return if the buffer is empty
*/
def isEmpty: Boolean
/**
* Check if the message buffer is not empty.
*
* @return if the buffer is not empty
*/
def nonEmpty: Boolean
/**
* How many elements are in the message buffer.
*
* @return the number of elements in the message buffer
*/
def size: Int
/**
* @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer
*/
def isFull: Boolean
/**
* Add one element to the end of the message buffer. Note that this class is
* immutable so the returned instance contains the added message.
*
* @param message the message to buffer, must not be `null`
* @return this message buffer
* @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]].
*/
def stash(message: T): ImmutableStashBuffer[T]
/**
* Add one element to the end of the message buffer. Note that this class is
* immutable so the returned instance contains the added message.
*
* @param message the message to buffer, must not be `null`
* @return this message buffer
* @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]].
*/
def :+(message: T): ImmutableStashBuffer[T] = stash(message)
/**
* Remove the first element of the message buffer. Note that this class is
* immutable so the head element is removed in the returned instance.
*
* @throws `NoSuchElementException` if the buffer is empty
*/
def dropHead(): ImmutableStashBuffer[T]
/**
* Remove the first `numberOfMessages` of the message buffer. Note that this class is
* immutable so the elements are removed in the returned instance.
*/
def drop(numberOfMessages: Int): ImmutableStashBuffer[T]
/**
* Return the first element of the message buffer.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def head: T
/**
* Iterate over all elements of the buffer and apply a function to each element.
*
* @param f the function to apply to each element
*/
def foreach(f: T Unit): Unit
/**
* Process all stashed messages with the `behavior` and the returned
* [[Behavior]] from each processed message.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
* and the returned [[Behavior]] from each processed message.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T T): Behavior[T]
}
object MutableStashBuffer {
/**
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def apply[T](capacity: Int): MutableStashBuffer[T] =
MutableStashBufferImpl[T](capacity)
}
/**
* A non thread safe mutable message buffer that can be used to buffer messages inside actors.
*
* The buffer can hold at most the given `capacity` number of messages.
*/
@DoNotInherit trait MutableStashBuffer[T] {
/**
* Check if the message buffer is empty.
*
* @return if the buffer is empty
*/
def isEmpty: Boolean
/**
* Check if the message buffer is not empty.
*
* @return if the buffer is not empty
*/
def nonEmpty: Boolean
/**
* How many elements are in the message buffer.
*
* @return the number of elements in the message buffer
*/
def size: Int
/**
* @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer
*/
def isFull: Boolean
/**
* Add one element to the end of the message buffer.
*
* @param message the message to buffer
* @return this message buffer
* @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]].
*/
def stash(message: T): MutableStashBuffer[T]
/**
* Return the first element of the message buffer and removes it.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def dropHead(): T
/**
* Return the first element of the message buffer without removing it.
*
* @return the first element or throws `NoSuchElementException` if the buffer is empty
* @throws `NoSuchElementException` if the buffer is empty
*/
def head: T
/**
* Iterate over all elements of the buffer and apply a function to each element.
*
* @param f the function to apply to each element
*/
def foreach(f: T Unit): Unit
/**
* Process all stashed messages with the `behavior` and the returned
* [[Behavior]] from each processed message. The `MutableStashBuffer` will be
* empty after processing all messages, unless an exception is thrown.
* If an exception is thrown by processing a message a proceeding messages
* and the message causing the exception have been removed from the
* `MutableStashBuffer`, but unprocessed messages remain.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
* and the returned [[Behavior]] from each processed message.
* If an exception is thrown by processing a message a proceeding messages
* and the message causing the exception have been removed from the
* `MutableStashBuffer`, but unprocessed messages remain.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T T): Behavior[T]
}
/**
* Is thrown when the size of the stash exceeds the capacity of the stash buffer.
*/
class StashOverflowException(message: String) extends RuntimeException(message)