diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala new file mode 100644 index 0000000000..b77f6650a5 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class ImmutableStashBufferSpec extends WordSpec with Matchers { + + "A ImmutableStashBuffer" must { + + "answer empty correctly" in { + var buffer = ImmutableStashBuffer[String](10) + buffer.isEmpty should ===(true) + buffer.nonEmpty should ===(false) + buffer = buffer.stash("m1") + buffer.isEmpty should ===(false) + buffer.nonEmpty should ===(true) + } + + "append and drop" in { + var buffer = ImmutableStashBuffer[String](10) + buffer.size should ===(0) + buffer = buffer.stash("m1") + buffer.size should ===(1) + buffer = buffer.stash("m2") + buffer.size should ===(2) + val m1 = buffer.head + m1 should ===("m1") + buffer.size should ===(2) + buffer = buffer.dropHead() + buffer.size should ===(1) + m1 should ===("m1") + val m2 = buffer.head + m2 should ===("m2") + buffer = buffer.dropHead() + buffer.size should ===(0) + intercept[NoSuchElementException] { + buffer = buffer.dropHead() + } + intercept[NoSuchElementException] { + buffer.head + } + buffer.size should ===(0) + } + + "enforce capacity" in { + var buffer = ImmutableStashBuffer[String](3) + buffer = buffer.stash("m1") + buffer = buffer.stash("m2") + buffer = buffer.stash("m3") + intercept[StashOverflowException] { + buffer = buffer.stash("m4") + } + // it's actually a javadsl.StashOverflowException + intercept[akka.actor.typed.javadsl.StashOverflowException] { + buffer.stash("m4") + } + buffer.size should ===(3) + } + + "process elements in the right order" in { + var buffer = ImmutableStashBuffer[String](10) + buffer = buffer.stash("m1") + buffer = buffer.stash("m2") + buffer = buffer.stash("m3") + val sb1 = new StringBuilder() + buffer.foreach(sb1.append(_)) + sb1.toString() should ===("m1m2m3") + buffer = buffer.dropHead() + val sb2 = new StringBuilder() + buffer.foreach(sb2.append(_)) + sb2.toString() should ===("m2m3") + } + } + +} + diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala new file mode 100644 index 0000000000..c99f170956 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class MutableStashBufferSpec extends WordSpec with Matchers { + + "A MutableStashBuffer" must { + + "answer empty correctly" in { + val buffer = MutableStashBuffer[String](10) + buffer.isEmpty should ===(true) + buffer.nonEmpty should ===(false) + buffer.stash("m1") + buffer.isEmpty should ===(false) + buffer.nonEmpty should ===(true) + } + + "append and drop" in { + val buffer = MutableStashBuffer[String](10) + buffer.size should ===(0) + buffer.stash("m1") + buffer.size should ===(1) + buffer.stash("m2") + buffer.size should ===(2) + val m1 = buffer.head + m1 should ===("m1") + buffer.size should ===(2) + buffer.dropHead() + buffer.size should ===(1) + m1 should ===("m1") + val m2 = buffer.head + m2 should ===("m2") + buffer.dropHead() + buffer.size should ===(0) + intercept[NoSuchElementException] { + buffer.dropHead() + } + intercept[NoSuchElementException] { + buffer.head + } + buffer.size should ===(0) + } + + "enforce capacity" in { + val buffer = MutableStashBuffer[String](3) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + intercept[StashOverflowException] { + buffer.stash("m4") + } + // it's actually a javadsl.StashOverflowException + intercept[akka.actor.typed.javadsl.StashOverflowException] { + buffer.stash("m4") + } + buffer.size should ===(3) + } + + "process elements in the right order" in { + val buffer = MutableStashBuffer[String](10) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + val sb1 = new StringBuilder() + buffer.foreach(sb1.append(_)) + sb1.toString() should ===("m1m2m3") + buffer.dropHead() + val sb2 = new StringBuilder() + buffer.foreach(sb2.append(_)) + sb2.toString() should ===("m2m3") + } + + } + +} + diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala new file mode 100644 index 0000000000..920c687f24 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -0,0 +1,235 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed +package scaladsl + +import akka.event.LoggingAdapter +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe + +object StashSpec { + sealed trait Command + final case class Msg(s: String) extends Command + final case class Unstashed(cmd: Command) extends Command + + case object Stash extends Command + case object UnstashAll extends Command + case object Unstash extends Command + final case class GetProcessed(replyTo: ActorRef[Vector[String]]) extends Command + final case class GetStashSize(replyTo: ActorRef[Int]) extends Command + + // FIXME replace when we get the logging in place, #23326 + def log(ctx: ActorContext[_]): LoggingAdapter = ctx.system.log + + def active(processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case msg: Msg ⇒ + active(processed :+ msg.s) + case GetProcessed(replyTo) ⇒ + replyTo ! processed + Behaviors.same + case Stash ⇒ + stashing(ImmutableStashBuffer(capacity = 10), processed) + case GetStashSize(replyTo) ⇒ + replyTo ! 0 + Behaviors.same + case UnstashAll ⇒ + Behaviors.unhandled + case Unstash ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in active") + } + } + + def stashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case msg: Msg ⇒ + stashing(buffer :+ msg, processed) + case g: GetProcessed ⇒ + stashing(buffer :+ g, processed) + case GetStashSize(replyTo) ⇒ + replyTo ! buffer.size + Behaviors.same + case UnstashAll ⇒ + buffer.unstashAll(ctx, active(processed)) + case Unstash ⇒ + log(ctx).debug(s"Unstash ${buffer.size}") + if (buffer.isEmpty) + active(processed) + else { + ctx.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed) + } + case Stash ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in stashing") + } + } + + def unstashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case Unstashed(msg: Msg) ⇒ + log(ctx).debug(s"unstashed $msg") + unstashing(buffer, processed :+ msg.s) + case Unstashed(GetProcessed(replyTo)) ⇒ + log(ctx).debug(s"unstashed GetProcessed") + replyTo ! processed + Behaviors.same + case msg: Msg ⇒ + log(ctx).debug(s"got $msg in unstashing") + unstashing(buffer :+ msg, processed) + case get: GetProcessed ⇒ + log(ctx).debug(s"got GetProcessed in unstashing") + unstashing(buffer :+ get, processed) + case Stash ⇒ + stashing(buffer, processed) + case Unstash ⇒ + if (buffer.isEmpty) { + log(ctx).debug(s"unstashing done") + active(processed) + } else { + ctx.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed) + } + case GetStashSize(replyTo) ⇒ + replyTo ! buffer.size + Behaviors.same + case UnstashAll ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in unstashing") + } + } + + class MutableStash(ctx: ActorContext[Command]) extends Behaviors.MutableBehavior[Command] { + + private val buffer = MutableStashBuffer.apply[Command](capacity = 10) + private var stashing = false + private var processed = Vector.empty[String] + + override def onMessage(cmd: Command): Behavior[Command] = { + cmd match { + case msg: Msg ⇒ + if (stashing) + buffer.stash(msg) + else + processed :+= msg.s + this + case g @ GetProcessed(replyTo) ⇒ + if (stashing) + buffer.stash(g) + else + replyTo ! processed + this + case GetStashSize(replyTo) ⇒ + replyTo ! buffer.size + this + case Stash ⇒ + stashing = true + this + case UnstashAll ⇒ + stashing = false + buffer.unstashAll(ctx, this) + case Unstash ⇒ + if (buffer.isEmpty) { + stashing = false + this + } else { + ctx.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + log(ctx).debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(ctx, this, numberOfMessages, Unstashed) + } + case Unstashed(msg: Msg) ⇒ + log(ctx).debug(s"unstashed $msg") + processed :+= msg.s + this + case Unstashed(GetProcessed(replyTo)) ⇒ + log(ctx).debug(s"unstashed GetProcessed") + replyTo ! processed + Behaviors.same + case _: Unstashed ⇒ + Behaviors.unhandled + } + } + + } + +} + +class ImmutableStashSpec extends StashSpec { + import StashSpec._ + def testQualifier: String = "immutable behavior" + def behaviorUnderTest: Behavior[Command] = active(Vector.empty) +} + +class MutableStashSpec extends StashSpec { + import StashSpec._ + def testQualifier: String = "mutable behavior" + def behaviorUnderTest: Behavior[Command] = Behaviors.mutable(ctx ⇒ new MutableStash(ctx)) +} + +abstract class StashSpec extends TestKit with TypedAkkaSpecWithShutdown { + import StashSpec._ + + def testQualifier: String + def behaviorUnderTest: Behavior[Command] + + s"Stashing with $testQualifier" must { + + "support unstash all" in { + val actor = spawn(behaviorUnderTest) + val probe = TestProbe[Vector[String]]("probe") + val sizeProbe = TestProbe[Int]("sizeProbe") + + actor ! Msg("a") + actor ! Msg("b") + actor ! Msg("c") + + actor ! Stash + actor ! Msg("d") + actor ! Msg("e") + actor ! Msg("f") + actor ! GetStashSize(sizeProbe.ref) + sizeProbe.expectMsg(3) + + actor ! UnstashAll + actor ! GetProcessed(probe.ref) + probe.expectMsg(Vector("a", "b", "c", "d", "e", "f")) + } + + "support unstash a few at a time" in { + val actor = spawn(behaviorUnderTest) + val probe = TestProbe[Vector[String]]("probe") + val sizeProbe = TestProbe[Int]("sizeProbe") + + actor ! Msg("a") + actor ! Msg("b") + actor ! Msg("c") + + actor ! Stash + actor ! Msg("d") + actor ! Msg("e") + actor ! Msg("f") + actor ! GetStashSize(sizeProbe.ref) + sizeProbe.expectMsg(3) + + actor ! Unstash + actor ! Msg("g") // might arrive in the middle of the unstashing + actor ! GetProcessed(probe.ref) // this is also stashed until all unstashed + probe.expectMsg(Vector("a", "b", "c", "d", "e", "f", "g")) + } + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 98e039a268..24ffafe866 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -313,4 +313,28 @@ object Behavior { undefer(possiblyDeferredResult, ctx) } + /** + * INTERNAL API + * + * Execute the behavior with the given messages (or signals). + * The returned [[Behavior]] from each processed message is used for the next message. + */ + @InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: ActorContext[T], messages: Iterator[T]): Behavior[T] = { + @tailrec def interpretOne(b: Behavior[T]): Behavior[T] = { + val b2 = Behavior.undefer(b, ctx) + if (!Behavior.isAlive(b2) || !messages.hasNext) b2 + else { + val nextB = messages.next() match { + case sig: Signal ⇒ + Behavior.interpretSignal(b2, ctx, sig) + case msg ⇒ + Behavior.interpretMessage(b2, ctx, msg) + } + interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive + } + } + + interpretOne(Behavior.undefer(behavior, ctx)) + } + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala new file mode 100644 index 0000000000..fc08b1534b --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed.internal + +import java.util.function.Consumer +import java.util.function.{ Function ⇒ JFunction } + +import scala.annotation.tailrec + +import akka.actor.typed.scaladsl +import akka.actor.typed.javadsl +import akka.annotation.InternalApi +import akka.actor.typed.Behavior +import akka.actor.typed.ActorContext +import akka.actor.typed.Signal +import akka.util.ConstantFun + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ImmutableStashBufferImpl { + def apply[T](capacity: Int): ImmutableStashBufferImpl[T] = + new ImmutableStashBufferImpl(capacity, Vector.empty) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class ImmutableStashBufferImpl[T](val capacity: Int, buffer: Vector[T]) + extends javadsl.ImmutableStashBuffer[T] with scaladsl.ImmutableStashBuffer[T] { + + override def isEmpty: Boolean = buffer.isEmpty + + override def nonEmpty: Boolean = !isEmpty + + override def size: Int = buffer.size + + override def isFull: Boolean = size == capacity + + override def stash(message: T): ImmutableStashBufferImpl[T] = { + if (message == null) throw new NullPointerException + if (isFull) + throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " + + s"because stash with capacity [$capacity] is full") + + new ImmutableStashBufferImpl(capacity, buffer :+ message) + } + + override def dropHead(): ImmutableStashBufferImpl[T] = + if (buffer.nonEmpty) new ImmutableStashBufferImpl(capacity, buffer.tail) + else throw new NoSuchElementException("head of empty buffer") + + override def drop(numberOfMessages: Int): ImmutableStashBufferImpl[T] = + if (isEmpty) this + else new ImmutableStashBufferImpl(capacity, buffer.drop(numberOfMessages)) + + override def head: T = + if (buffer.nonEmpty) buffer.head + else throw new NoSuchElementException("head of empty buffer") + + override def foreach(f: T ⇒ Unit): Unit = + buffer.foreach(f) + + override def forEach(f: Consumer[T]): Unit = foreach(f.accept) + + override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = + unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) + + override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = + unstashAll(ctx.asScala, behavior) + + override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T], + numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { + val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] + val iter = buffer.iterator.take(numberOfMessages).map(wrap) + Behavior.interpretMessages[T](behavior, ctx, iter) + } + + override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T], + numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = + unstash(ctx.asScala, behavior, numberOfMessages, x ⇒ wrap.apply(x)) + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object MutableStashBufferImpl { + private final class Node[T](var next: Node[T], val message: T) { + def apply(f: T ⇒ Unit): Unit = f(message) + } + + def apply[T](capacity: Int): MutableStashBufferImpl[T] = + new MutableStashBufferImpl(capacity, null, null) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class MutableStashBufferImpl[T] private ( + val capacity: Int, + private var _head: MutableStashBufferImpl.Node[T], + private var _tail: MutableStashBufferImpl.Node[T]) + extends javadsl.MutableStashBuffer[T] with scaladsl.MutableStashBuffer[T] { + + import MutableStashBufferImpl.Node + + private var _size: Int = if (_head eq null) 0 else 1 + + override def isEmpty: Boolean = _head eq null + + override def nonEmpty: Boolean = !isEmpty + + override def size: Int = _size + + override def isFull: Boolean = _size == capacity + + override def stash(message: T): MutableStashBufferImpl[T] = { + if (message == null) throw new NullPointerException + if (isFull) + throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " + + s"because stash with capacity [$capacity] is full") + + val node = new Node(null, message) + if (isEmpty) { + _head = node + _tail = node + } else { + _tail.next = node + _tail = node + } + _size += 1 + this + } + + override def dropHead(): T = { + val message = head + _head = _head.next + _size -= 1 + if (isEmpty) + _tail = null + + message + } + + override def head: T = + if (nonEmpty) _head.message + else throw new NoSuchElementException("head of empty buffer") + + override def foreach(f: T ⇒ Unit): Unit = { + var node = _head + while (node ne null) { + node(f) + node = node.next + } + } + + override def forEach(f: Consumer[T]): Unit = foreach(f.accept) + + override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = + unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) + + override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = + unstashAll(ctx.asScala, behavior) + + override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T], + numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { + val iter = new Iterator[T] { + override def hasNext: Boolean = MutableStashBufferImpl.this.nonEmpty + override def next(): T = MutableStashBufferImpl.this.dropHead() + }.take(numberOfMessages).map(wrap) + val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] + Behavior.interpretMessages[T](behavior, ctx, iter) + } + + override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T], + numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = + unstash(ctx.asScala, behavior, numberOfMessages, x ⇒ wrap.apply(x)) + +} + diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala new file mode 100644 index 0000000000..364e5d1a4b --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed.javadsl + +import java.util.function.Consumer +import java.util.function.{ Function ⇒ JFunction } + +import scala.annotation.tailrec + +import akka.annotation.DoNotInherit +import akka.actor.typed.Behavior +import akka.actor.typed.internal.ImmutableStashBufferImpl +import akka.actor.typed.internal.MutableStashBufferImpl +import akka.actor.typed.scaladsl + +object ImmutableStashBuffer { + /** + * Create an empty message buffer. + * + * @param capacity the buffer can hold at most this number of messages + * @return an empty message buffer + */ + def create[T](capacity: Int): ImmutableStashBuffer[T] = + ImmutableStashBufferImpl[T](capacity) + +} + +/** + * A thread safe immutable message buffer that can be used to buffer messages inside actors. + * + * The buffer can hold at most the given `capacity` number of messages. + */ +@DoNotInherit abstract class ImmutableStashBuffer[T] { + + /** + * Check if the message buffer is empty. + * + * @return if the buffer is empty + */ + def isEmpty: Boolean + + /** + * Check if the message buffer is not empty. + * + * @return if the buffer is not empty + */ + def nonEmpty: Boolean + + /** + * How many elements are in the message buffer. + * + * @return the number of elements in the message buffer + */ + def size: Int + + /** + * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer + */ + def isFull: Boolean + + /** + * Add one element to the end of the message buffer. Note that this class is + * immutable so the returned instance contains the added message. + * + * @param message the message to buffer, must not be `null` + * @return this message buffer + * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + */ + def stash(message: T): ImmutableStashBuffer[T] + + /** + * Remove the first element of the message buffer. Note that this class is + * immutable so the head element is removed in the returned instance. + * + * @throws `NoSuchElementException` if the buffer is empty + */ + def dropHead(): ImmutableStashBuffer[T] + + /** + * Remove the first `numberOfMessages` of the message buffer. Note that this class is + * immutable so the elements are removed in the returned instance. + */ + def drop(numberOfMessages: Int): ImmutableStashBuffer[T] + + /** + * Return the first element of the message buffer. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def head: T + + /** + * Iterate over all elements of the buffer and apply a function to each element. + * + * @param f the function to apply to each element + */ + def forEach(f: Consumer[T]): Unit + + /** + * Process all stashed messages with the `behavior` and the returned + * [[Behavior]] from each processed message. + */ + def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + + /** + * Process `numberOfMessages` of the stashed messages with the `behavior` + * and the returned [[Behavior]] from each processed message. + */ + def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] + +} + +object MutableStashBuffer { + + /** + * Create an empty message buffer. + * + * @param capacity the buffer can hold at most this number of messages + * @return an empty message buffer + */ + def create[T](capacity: Int): MutableStashBuffer[T] = + MutableStashBufferImpl[T](capacity) +} + +/** + * A non thread safe mutable message buffer that can be used to buffer messages inside actors. + * + * The buffer can hold at most the given `capacity` number of messages. + */ +@DoNotInherit abstract class MutableStashBuffer[T] { + + /** + * Check if the message buffer is empty. + * + * @return if the buffer is empty + */ + def isEmpty: Boolean + + /** + * Check if the message buffer is not empty. + * + * @return if the buffer is not empty + */ + def nonEmpty: Boolean + + /** + * How many elements are in the message buffer. + * + * @return the number of elements in the message buffer + */ + def size: Int + + /** + * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer + */ + def isFull: Boolean + + /** + * Add one element to the end of the message buffer. + * + * [[StashOverflowException]] is thrown if the buffer [[MutableStashBuffer#isFull]]. + * + * @param message the message to buffer + * @return this message buffer + * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + */ + def stash(message: T): MutableStashBuffer[T] + + /** + * Return the first element of the message buffer and removes it. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def dropHead(): T + + /** + * Return the first element of the message buffer without removing it. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def head: T + + /** + * Iterate over all elements of the buffer and apply a function to each element. + * + * @param f the function to apply to each element + */ + def forEach(f: Consumer[T]): Unit + + /** + * Process all stashed messages with the `behavior` and the returned + * [[Behavior]] from each processed message. The `MutableStashBuffer` will be + * empty after processing all messages, unless an exception is thrown. + * If an exception is thrown by processing a message a proceeding messages + * and the message causing the exception have been removed from the + * `MutableStashBuffer`, but unprocessed messages remain. + */ + def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + + /** + * Process `numberOfMessages` of the stashed messages with the `behavior` + * and the returned [[Behavior]] from each processed message. + * If an exception is thrown by processing a message a proceeding messages + * and the message causing the exception have been removed from the + * `MutableStashBuffer`, but unprocessed messages remain. + */ + def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] + +} + +/** + * Is thrown when the size of the stash exceeds the capacity of the stash buffer. + */ +class StashOverflowException(message: String) extends scaladsl.StashOverflowException(message) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala new file mode 100644 index 0000000000..2275cf728e --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala @@ -0,0 +1,224 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import java.util.function.Consumer +import java.util.function.{ Function ⇒ JFunction } + +import scala.annotation.tailrec + +import akka.annotation.DoNotInherit +import akka.actor.typed.Behavior +import akka.actor.typed.internal.ImmutableStashBufferImpl +import akka.actor.typed.internal.MutableStashBufferImpl + +object ImmutableStashBuffer { + /** + * Create an empty message buffer. + * + * @param capacity the buffer can hold at most this number of messages + * @return an empty message buffer + */ + def apply[T](capacity: Int): ImmutableStashBuffer[T] = + ImmutableStashBufferImpl[T](capacity) + +} + +/** + * A thread safe immutable message buffer that can be used to buffer messages inside actors. + * + * The buffer can hold at most the given `capacity` number of messages. + */ +@DoNotInherit trait ImmutableStashBuffer[T] { + + /** + * Check if the message buffer is empty. + * + * @return if the buffer is empty + */ + def isEmpty: Boolean + + /** + * Check if the message buffer is not empty. + * + * @return if the buffer is not empty + */ + def nonEmpty: Boolean + + /** + * How many elements are in the message buffer. + * + * @return the number of elements in the message buffer + */ + def size: Int + + /** + * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer + */ + def isFull: Boolean + + /** + * Add one element to the end of the message buffer. Note that this class is + * immutable so the returned instance contains the added message. + * + * @param message the message to buffer, must not be `null` + * @return this message buffer + * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + */ + def stash(message: T): ImmutableStashBuffer[T] + + /** + * Add one element to the end of the message buffer. Note that this class is + * immutable so the returned instance contains the added message. + * + * @param message the message to buffer, must not be `null` + * @return this message buffer + * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + */ + def :+(message: T): ImmutableStashBuffer[T] = stash(message) + + /** + * Remove the first element of the message buffer. Note that this class is + * immutable so the head element is removed in the returned instance. + * + * @throws `NoSuchElementException` if the buffer is empty + */ + def dropHead(): ImmutableStashBuffer[T] + + /** + * Remove the first `numberOfMessages` of the message buffer. Note that this class is + * immutable so the elements are removed in the returned instance. + */ + def drop(numberOfMessages: Int): ImmutableStashBuffer[T] + + /** + * Return the first element of the message buffer. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def head: T + + /** + * Iterate over all elements of the buffer and apply a function to each element. + * + * @param f the function to apply to each element + */ + def foreach(f: T ⇒ Unit): Unit + + /** + * Process all stashed messages with the `behavior` and the returned + * [[Behavior]] from each processed message. + */ + def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + + /** + * Process `numberOfMessages` of the stashed messages with the `behavior` + * and the returned [[Behavior]] from each processed message. + */ + def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] + +} + +object MutableStashBuffer { + + /** + * Create an empty message buffer. + * + * @param capacity the buffer can hold at most this number of messages + * @return an empty message buffer + */ + def apply[T](capacity: Int): MutableStashBuffer[T] = + MutableStashBufferImpl[T](capacity) +} + +/** + * A non thread safe mutable message buffer that can be used to buffer messages inside actors. + * + * The buffer can hold at most the given `capacity` number of messages. + */ +@DoNotInherit trait MutableStashBuffer[T] { + /** + * Check if the message buffer is empty. + * + * @return if the buffer is empty + */ + def isEmpty: Boolean + + /** + * Check if the message buffer is not empty. + * + * @return if the buffer is not empty + */ + def nonEmpty: Boolean + + /** + * How many elements are in the message buffer. + * + * @return the number of elements in the message buffer + */ + def size: Int + + /** + * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer + */ + def isFull: Boolean + + /** + * Add one element to the end of the message buffer. + * + * @param message the message to buffer + * @return this message buffer + * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + */ + def stash(message: T): MutableStashBuffer[T] + + /** + * Return the first element of the message buffer and removes it. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def dropHead(): T + + /** + * Return the first element of the message buffer without removing it. + * + * @return the first element or throws `NoSuchElementException` if the buffer is empty + * @throws `NoSuchElementException` if the buffer is empty + */ + def head: T + + /** + * Iterate over all elements of the buffer and apply a function to each element. + * + * @param f the function to apply to each element + */ + def foreach(f: T ⇒ Unit): Unit + + /** + * Process all stashed messages with the `behavior` and the returned + * [[Behavior]] from each processed message. The `MutableStashBuffer` will be + * empty after processing all messages, unless an exception is thrown. + * If an exception is thrown by processing a message a proceeding messages + * and the message causing the exception have been removed from the + * `MutableStashBuffer`, but unprocessed messages remain. + */ + def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + + /** + * Process `numberOfMessages` of the stashed messages with the `behavior` + * and the returned [[Behavior]] from each processed message. + * If an exception is thrown by processing a message a proceeding messages + * and the message causing the exception have been removed from the + * `MutableStashBuffer`, but unprocessed messages remain. + */ + def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] + +} + +/** + * Is thrown when the size of the stash exceeds the capacity of the stash buffer. + */ +class StashOverflowException(message: String) extends RuntimeException(message)