From 8b38789d9d663c53eac7010da6ab380532712567 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 24 Jan 2018 15:38:20 +0100 Subject: [PATCH] document and cleanup stashing, #22275 * remove ImmutableStashBuffer * minor doc edits * java example * some more tests, and some more scaladoc details * rename head/tail to first/last --- .../java/jdocs/akka/typed/StashDocTest.java | 203 ++++++++++++++++++ .../scaladsl/ImmutableStashBufferSpec.scala | 79 ------- .../scaladsl/MutableStashBufferSpec.scala | 80 ------- .../typed/scaladsl/StashBufferSpec.scala | 162 ++++++++++++++ .../akka/actor/typed/scaladsl/StashSpec.scala | 182 ++++++++-------- .../scala/docs/akka/typed/StashDocSpec.scala | 132 ++++++++++++ .../scala/akka/actor/typed/ActorContext.scala | 2 + .../scala/akka/actor/typed/ActorSystem.scala | 2 + .../scala/akka/actor/typed/Behavior.scala | 2 + .../scala/akka/actor/typed/Extensions.scala | 2 +- .../main/scala/akka/actor/typed/Props.scala | 7 +- .../typed/internal/StashBufferImpl.scala | 124 +++-------- .../actor/typed/javadsl/ActorContext.scala | 2 + .../actor/typed/javadsl/StashBuffer.scala | 163 ++++---------- .../actor/typed/scaladsl/ActorContext.scala | 2 + .../actor/typed/scaladsl/StashBuffer.scala | 170 ++++----------- akka-docs/src/main/paradox/index-typed.md | 19 -- akka-docs/src/main/paradox/index.md | 2 +- akka-docs/src/main/paradox/typed/index.md | 20 ++ akka-docs/src/main/paradox/typed/stash.md | 46 ++++ .../akka/stream/StreamRefExceptions.scala | 6 - .../akka/testkit/typed/BehaviourTestkit.scala | 22 +- 22 files changed, 792 insertions(+), 637 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java delete mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala delete mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala create mode 100644 akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala delete mode 100644 akka-docs/src/main/paradox/index-typed.md create mode 100644 akka-docs/src/main/paradox/typed/index.md create mode 100644 akka-docs/src/main/paradox/typed/stash.md delete mode 100644 akka-stream/src/main/scala/akka/stream/StreamRefExceptions.scala diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java new file mode 100644 index 0000000000..59cba6e870 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java @@ -0,0 +1,203 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package jdocs.akka.typed; + +//#import +import akka.actor.typed.javadsl.StashBuffer; +//#import + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; +import akka.testkit.typed.BehaviorTestkit; +import akka.testkit.typed.TestInbox; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class StashDocTest extends JUnitSuite { + + //#db + + interface DB { + CompletionStage save(String id, String value); + CompletionStage load(String id); + } + //#db + + //#stashing + + public static class DataAccess { + + static interface Command { + } + + public static class Save implements Command { + public final String payload; + public final ActorRef replyTo; + + public Save(String payload, ActorRef replyTo) { + this.payload = payload; + this.replyTo = replyTo; + } + } + + public static class Get implements Command { + public final ActorRef replyTo; + + public Get(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static class InitialState implements Command { + public final String value; + + InitialState(String value) { + this.value = value; + } + } + + static class SaveSuccess implements Command { + public static final SaveSuccess instance = new SaveSuccess(); + + private SaveSuccess() { + } + } + + static class DBError implements Command { + public final RuntimeException cause; + + public DBError(RuntimeException cause) { + this.cause = cause; + } + } + + + private final StashBuffer buffer = StashBuffer.create(100); + private final String id; + private final DB db; + + public DataAccess(String id, DB db) { + this.id = id; + this.db = db; + } + + Behavior behavior() { + return Behaviors.deferred(ctx -> { + db.load(id) + .whenComplete((value, cause) -> { + if (cause == null) + ctx.getSelf().tell(new InitialState(value)); + else + ctx.getSelf().tell(new DBError(asRuntimeException(cause))); + }); + + return init(); + }); + } + + private Behavior init() { + return Behaviors.immutable(Command.class) + .onMessage(InitialState.class, (ctx, msg) -> { + // now we are ready to handle stashed messages if any + return buffer.unstashAll(ctx, active(msg.value)); + }) + .onMessage(DBError.class, (ctx, msg) -> { + throw msg.cause; + }) + .onMessage(Command.class, (ctx, msg) -> { + // stash all other messages for later processing + buffer.stash(msg); + return Behaviors.same(); + }) + .build(); + } + + private Behavior active(String state) { + return Behaviors.immutable(Command.class) + .onMessage(Get.class, (ctx, msg) -> { + msg.replyTo.tell(state); + return Behaviors.same(); + }) + .onMessage(Save.class, (ctx, msg) -> { + db.save(id, msg.payload) + .whenComplete((value, cause) -> { + if (cause == null) + ctx.getSelf().tell(SaveSuccess.instance); + else + ctx.getSelf().tell(new DBError(asRuntimeException(cause))); + }); + return saving(msg.payload, msg.replyTo); + }) + .build(); + } + + private Behavior saving(String state, ActorRef replyTo) { + return Behaviors.immutable(Command.class) + .onMessageEquals(SaveSuccess.instance, ctx -> { + replyTo.tell(Done.getInstance()); + return buffer.unstashAll(ctx, active(state)); + }) + .onMessage(DBError.class, (ctx, msg) -> { + throw msg.cause; + }) + .onMessage(Command.class, (ctx, msg) -> { + buffer.stash(msg); + return Behaviors.same(); + }) + .build(); + } + + + private static RuntimeException asRuntimeException(Throwable t) { + // can't throw Throwable in lambdas + if (t instanceof RuntimeException) { + return (RuntimeException) t; + } else { + return new RuntimeException(t); + } + } + + } + + //#stashing + + @Test + public void stashingExample() throws Exception { + final DB db = new DB() { + public CompletionStage save(String id, String value) { + return CompletableFuture.completedFuture(Done.getInstance()); + } + public CompletionStage load(String id) { + return CompletableFuture.completedFuture("TheValue"); + } + }; + final DataAccess dataAccess = new DataAccess("17", db); + BehaviorTestkit testKit = BehaviorTestkit.create(dataAccess.behavior()); + TestInbox getInbox = TestInbox.apply("getInbox"); + testKit.run(new DataAccess.Get(getInbox.ref())); + DataAccess.Command initialStateMsg = testKit.selfInbox().receiveMsg(); + testKit.run(initialStateMsg); + getInbox.expectMsg("TheValue"); + + TestInbox saveInbox = TestInbox.apply("saveInbox"); + testKit.run(new DataAccess.Save("UpdatedValue", saveInbox.ref())); + testKit.run(new DataAccess.Get(getInbox.ref())); + DataAccess.Command saveSuccessMsg = testKit.selfInbox().receiveMsg(); + testKit.run(saveSuccessMsg); + saveInbox.expectMsg(Done.getInstance()); + getInbox.expectMsg("UpdatedValue"); + + testKit.run(new DataAccess.Get(getInbox.ref())); + getInbox.expectMsg("UpdatedValue"); + } + +} + + + diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala deleted file mode 100644 index b77f6650a5..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutableStashBufferSpec.scala +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.actor.typed.scaladsl - -import org.scalatest.Matchers -import org.scalatest.WordSpec - -class ImmutableStashBufferSpec extends WordSpec with Matchers { - - "A ImmutableStashBuffer" must { - - "answer empty correctly" in { - var buffer = ImmutableStashBuffer[String](10) - buffer.isEmpty should ===(true) - buffer.nonEmpty should ===(false) - buffer = buffer.stash("m1") - buffer.isEmpty should ===(false) - buffer.nonEmpty should ===(true) - } - - "append and drop" in { - var buffer = ImmutableStashBuffer[String](10) - buffer.size should ===(0) - buffer = buffer.stash("m1") - buffer.size should ===(1) - buffer = buffer.stash("m2") - buffer.size should ===(2) - val m1 = buffer.head - m1 should ===("m1") - buffer.size should ===(2) - buffer = buffer.dropHead() - buffer.size should ===(1) - m1 should ===("m1") - val m2 = buffer.head - m2 should ===("m2") - buffer = buffer.dropHead() - buffer.size should ===(0) - intercept[NoSuchElementException] { - buffer = buffer.dropHead() - } - intercept[NoSuchElementException] { - buffer.head - } - buffer.size should ===(0) - } - - "enforce capacity" in { - var buffer = ImmutableStashBuffer[String](3) - buffer = buffer.stash("m1") - buffer = buffer.stash("m2") - buffer = buffer.stash("m3") - intercept[StashOverflowException] { - buffer = buffer.stash("m4") - } - // it's actually a javadsl.StashOverflowException - intercept[akka.actor.typed.javadsl.StashOverflowException] { - buffer.stash("m4") - } - buffer.size should ===(3) - } - - "process elements in the right order" in { - var buffer = ImmutableStashBuffer[String](10) - buffer = buffer.stash("m1") - buffer = buffer.stash("m2") - buffer = buffer.stash("m3") - val sb1 = new StringBuilder() - buffer.foreach(sb1.append(_)) - sb1.toString() should ===("m1m2m3") - buffer = buffer.dropHead() - val sb2 = new StringBuilder() - buffer.foreach(sb2.append(_)) - sb2.toString() should ===("m2m3") - } - } - -} - diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala deleted file mode 100644 index c99f170956..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MutableStashBufferSpec.scala +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.actor.typed.scaladsl - -import org.scalatest.Matchers -import org.scalatest.WordSpec - -class MutableStashBufferSpec extends WordSpec with Matchers { - - "A MutableStashBuffer" must { - - "answer empty correctly" in { - val buffer = MutableStashBuffer[String](10) - buffer.isEmpty should ===(true) - buffer.nonEmpty should ===(false) - buffer.stash("m1") - buffer.isEmpty should ===(false) - buffer.nonEmpty should ===(true) - } - - "append and drop" in { - val buffer = MutableStashBuffer[String](10) - buffer.size should ===(0) - buffer.stash("m1") - buffer.size should ===(1) - buffer.stash("m2") - buffer.size should ===(2) - val m1 = buffer.head - m1 should ===("m1") - buffer.size should ===(2) - buffer.dropHead() - buffer.size should ===(1) - m1 should ===("m1") - val m2 = buffer.head - m2 should ===("m2") - buffer.dropHead() - buffer.size should ===(0) - intercept[NoSuchElementException] { - buffer.dropHead() - } - intercept[NoSuchElementException] { - buffer.head - } - buffer.size should ===(0) - } - - "enforce capacity" in { - val buffer = MutableStashBuffer[String](3) - buffer.stash("m1") - buffer.stash("m2") - buffer.stash("m3") - intercept[StashOverflowException] { - buffer.stash("m4") - } - // it's actually a javadsl.StashOverflowException - intercept[akka.actor.typed.javadsl.StashOverflowException] { - buffer.stash("m4") - } - buffer.size should ===(3) - } - - "process elements in the right order" in { - val buffer = MutableStashBuffer[String](10) - buffer.stash("m1") - buffer.stash("m2") - buffer.stash("m3") - val sb1 = new StringBuilder() - buffer.foreach(sb1.append(_)) - sb1.toString() should ===("m1m2m3") - buffer.dropHead() - val sb2 = new StringBuilder() - buffer.foreach(sb2.append(_)) - sb2.toString() should ===("m2m3") - } - - } - -} - diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala new file mode 100644 index 0000000000..dd0a22f8b6 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.actor.typed.scaladsl + +import akka.actor.typed.Behavior +import akka.testkit.typed.EffectfulActorContext +import akka.testkit.typed.TestInbox +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class StashBufferSpec extends WordSpec with Matchers { + + val ctx = new EffectfulActorContext[String]("StashBufferSpec") + + "A StashBuffer" must { + + "answer empty correctly" in { + val buffer = StashBuffer[String](10) + buffer.isEmpty should ===(true) + buffer.nonEmpty should ===(false) + buffer.stash("m1") + buffer.isEmpty should ===(false) + buffer.nonEmpty should ===(true) + } + + "append and drop" in { + val buffer = StashBuffer[String](10) + buffer.size should ===(0) + buffer.stash("m1") + buffer.size should ===(1) + buffer.stash("m2") + buffer.size should ===(2) + val m1 = buffer.head + m1 should ===("m1") + buffer.size should ===(2) + buffer.unstash(ctx, Behaviors.ignore, 1, identity) + buffer.size should ===(1) + m1 should ===("m1") + val m2 = buffer.head + m2 should ===("m2") + buffer.unstash(ctx, Behaviors.ignore, 1, identity) + buffer.size should ===(0) + intercept[NoSuchElementException] { + buffer.head + } + buffer.size should ===(0) + } + + "enforce capacity" in { + val buffer = StashBuffer[String](3) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + intercept[StashOverflowException] { + buffer.stash("m4") + } + // it's actually a javadsl.StashOverflowException + intercept[akka.actor.typed.javadsl.StashOverflowException] { + buffer.stash("m4") + } + buffer.size should ===(3) + } + + "process elements in the right order" in { + val buffer = StashBuffer[String](10) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + val sb1 = new StringBuilder() + buffer.foreach(sb1.append(_)) + sb1.toString() should ===("m1m2m3") + buffer.unstash(ctx, Behaviors.ignore, 1, identity) + val sb2 = new StringBuilder() + buffer.foreach(sb2.append(_)) + sb2.toString() should ===("m2m3") + } + + "unstash to returned behaviors" in { + val buffer = StashBuffer[String](10) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + buffer.stash("get") + + val valueInbox = TestInbox[String]() + def behavior(state: String): Behavior[String] = + Behaviors.immutable[String] { (_, msg) ⇒ + if (msg == "get") { + valueInbox.ref ! state + Behaviors.same + } else { + behavior(state + msg) + } + } + + buffer.unstashAll(ctx, behavior("")) + valueInbox.expectMsg("m1m2m3") + buffer.isEmpty should ===(true) + } + + "undefer returned behaviors when unstashing" in { + val buffer = StashBuffer[String](10) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + buffer.stash("get") + + val valueInbox = TestInbox[String]() + def behavior(state: String): Behavior[String] = + Behaviors.immutable[String] { (_, msg) ⇒ + if (msg == "get") { + valueInbox.ref ! state + Behaviors.same + } else { + Behaviors.deferred[String](_ ⇒ behavior(state + msg)) + } + } + + buffer.unstashAll(ctx, behavior("")) + valueInbox.expectMsg("m1m2m3") + buffer.isEmpty should ===(true) + } + + "be able to stash while unstashing" in { + val buffer = StashBuffer[String](10) + buffer.stash("m1") + buffer.stash("m2") + buffer.stash("m3") + buffer.stash("get") + + val valueInbox = TestInbox[String]() + def behavior(state: String): Behavior[String] = + Behaviors.immutable[String] { (_, msg) ⇒ + if (msg == "get") { + valueInbox.ref ! state + Behaviors.same + } else if (msg == "m2") { + buffer.stash("m2") + Behaviors.same + } else { + behavior(state + msg) + } + } + + // It's only supposed to unstash the messages that are in the buffer when + // the call is made, not unstash new messages added to the buffer while + // unstashing. + val b2 = buffer.unstashAll(ctx, behavior("")) + valueInbox.expectMsg("m1m3") + buffer.size should ===(1) + buffer.head should ===("m2") + + val b3 = buffer.unstashAll(ctx, b2) + buffer.size should ===(1) + buffer.head should ===("m2") + } + + } + +} + diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala index 33f15f2a71..706a758562 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -4,7 +4,6 @@ package akka.actor.typed package scaladsl -import akka.event.LoggingAdapter import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe @@ -19,98 +18,109 @@ object StashSpec { final case class GetProcessed(replyTo: ActorRef[Vector[String]]) extends Command final case class GetStashSize(replyTo: ActorRef[Int]) extends Command - def active(processed: Vector[String]): Behavior[Command] = - Behaviors.immutable { (ctx, cmd) ⇒ - cmd match { - case msg: Msg ⇒ - active(processed :+ msg.s) - case GetProcessed(replyTo) ⇒ - replyTo ! processed - Behaviors.same - case Stash ⇒ - stashing(ImmutableStashBuffer(capacity = 10), processed) - case GetStashSize(replyTo) ⇒ - replyTo ! 0 - Behaviors.same - case UnstashAll ⇒ - Behaviors.unhandled - case Unstash ⇒ - Behaviors.unhandled - case u: Unstashed ⇒ - throw new IllegalStateException(s"Unexpected $u in active") - } - } + val immutableStash: Behavior[Command] = + Behaviors.deferred[Command] { _ ⇒ + val buffer = StashBuffer[Command](capacity = 10) - def stashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] = - Behaviors.immutable { (ctx, cmd) ⇒ - cmd match { - case msg: Msg ⇒ - stashing(buffer :+ msg, processed) - case g: GetProcessed ⇒ - stashing(buffer :+ g, processed) - case GetStashSize(replyTo) ⇒ - replyTo ! buffer.size - Behaviors.same - case UnstashAll ⇒ - buffer.unstashAll(ctx, active(processed)) - case Unstash ⇒ - ctx.log.debug(s"Unstash ${buffer.size}") - if (buffer.isEmpty) - active(processed) - else { - ctx.self ! Unstash // continue unstashing until buffer is empty - val numberOfMessages = 2 - ctx.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") - buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed) + def active(processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case msg: Msg ⇒ + active(processed :+ msg.s) + case GetProcessed(replyTo) ⇒ + replyTo ! processed + Behaviors.same + case Stash ⇒ + stashing(processed) + case GetStashSize(replyTo) ⇒ + replyTo ! 0 + Behaviors.same + case UnstashAll ⇒ + Behaviors.unhandled + case Unstash ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in active") } - case Stash ⇒ - Behaviors.unhandled - case u: Unstashed ⇒ - throw new IllegalStateException(s"Unexpected $u in stashing") - } - } + } - def unstashing(buffer: ImmutableStashBuffer[Command], processed: Vector[String]): Behavior[Command] = - Behaviors.immutable { (ctx, cmd) ⇒ - cmd match { - case Unstashed(msg: Msg) ⇒ - ctx.log.debug(s"unstashed $msg") - unstashing(buffer, processed :+ msg.s) - case Unstashed(GetProcessed(replyTo)) ⇒ - ctx.log.debug(s"unstashed GetProcessed") - replyTo ! processed - Behaviors.same - case msg: Msg ⇒ - ctx.log.debug(s"got $msg in unstashing") - unstashing(buffer :+ msg, processed) - case get: GetProcessed ⇒ - ctx.log.debug(s"got GetProcessed in unstashing") - unstashing(buffer :+ get, processed) - case Stash ⇒ - stashing(buffer, processed) - case Unstash ⇒ - if (buffer.isEmpty) { - ctx.log.debug(s"unstashing done") - active(processed) - } else { - ctx.self ! Unstash // continue unstashing until buffer is empty - val numberOfMessages = 2 - ctx.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") - buffer.unstash(ctx, unstashing(buffer.drop(numberOfMessages), processed), numberOfMessages, Unstashed) + def stashing(processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case msg: Msg ⇒ + buffer.stash(msg) + Behaviors.same + case g: GetProcessed ⇒ + buffer.stash(g) + Behaviors.same + case GetStashSize(replyTo) ⇒ + replyTo ! buffer.size + Behaviors.same + case UnstashAll ⇒ + buffer.unstashAll(ctx, active(processed)) + case Unstash ⇒ + ctx.log.debug(s"Unstash ${buffer.size}") + if (buffer.isEmpty) + active(processed) + else { + ctx.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + ctx.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(ctx, unstashing(processed), numberOfMessages, Unstashed) + } + case Stash ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in stashing") } - case GetStashSize(replyTo) ⇒ - replyTo ! buffer.size - Behaviors.same - case UnstashAll ⇒ - Behaviors.unhandled - case u: Unstashed ⇒ - throw new IllegalStateException(s"Unexpected $u in unstashing") - } + } + + def unstashing(processed: Vector[String]): Behavior[Command] = + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case Unstashed(msg: Msg) ⇒ + ctx.log.debug(s"unstashed $msg") + unstashing(processed :+ msg.s) + case Unstashed(GetProcessed(replyTo)) ⇒ + ctx.log.debug(s"unstashed GetProcessed") + replyTo ! processed + Behaviors.same + case msg: Msg ⇒ + ctx.log.debug(s"got $msg in unstashing") + buffer.stash(msg) + Behaviors.same + case g: GetProcessed ⇒ + ctx.log.debug(s"got GetProcessed in unstashing") + buffer.stash(g) + Behaviors.same + case Stash ⇒ + stashing(processed) + case Unstash ⇒ + if (buffer.isEmpty) { + ctx.log.debug(s"unstashing done") + active(processed) + } else { + ctx.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + ctx.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(ctx, unstashing(processed), numberOfMessages, Unstashed) + } + case GetStashSize(replyTo) ⇒ + replyTo ! buffer.size + Behaviors.same + case UnstashAll ⇒ + Behaviors.unhandled + case u: Unstashed ⇒ + throw new IllegalStateException(s"Unexpected $u in unstashing") + } + } + + active(Vector.empty) } class MutableStash(ctx: ActorContext[Command]) extends Behaviors.MutableBehavior[Command] { - private val buffer = MutableStashBuffer.apply[Command](capacity = 10) + private val buffer = StashBuffer.apply[Command](capacity = 10) private var stashing = false private var processed = Vector.empty[String] @@ -167,7 +177,7 @@ object StashSpec { class ImmutableStashSpec extends StashSpec { import StashSpec._ def testQualifier: String = "immutable behavior" - def behaviorUnderTest: Behavior[Command] = active(Vector.empty) + def behaviorUnderTest: Behavior[Command] = immutableStash } class MutableStashSpec extends StashSpec { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala new file mode 100644 index 0000000000..51e0ef202d --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package docs.akka.typed + +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.testkit.typed.BehaviorTestkit +import akka.testkit.typed.TestInbox +import org.scalatest.Matchers +import org.scalatest.WordSpec + +object StashDocSpec { + // #stashing + import akka.actor.typed.scaladsl.StashBuffer + + trait DB { + def save(id: String, value: String): Future[Done] + def load(id: String): Future[String] + } + + object DataAccess { + trait Command + final case class Save(value: String, replyTo: ActorRef[Done]) extends Command + final case class Get(replyTo: ActorRef[String]) extends Command + private final case class InitialState(value: String) extends Command + private final case object SaveSuccess extends Command + private final case class DBError(cause: Throwable) extends Command + + def behavior(id: String, db: DB): Behavior[Command] = + Behaviors.deferred[Command] { ctx ⇒ + + val buffer = StashBuffer[Command](capacity = 100) + + def init(): Behavior[Command] = + Behaviors.immutable[Command] { (ctx, msg) ⇒ + msg match { + case InitialState(value) ⇒ + // now we are ready to handle stashed messages if any + buffer.unstashAll(ctx, active(value)) + case DBError(cause) ⇒ + throw cause + case other ⇒ + // stash all other messages for later processing + buffer.stash(other) + Behaviors.same + } + } + + def active(state: String): Behavior[Command] = + Behaviors.immutable { (ctx, msg) ⇒ + msg match { + case Get(replyTo) ⇒ + replyTo ! state + Behaviors.same + case Save(value, replyTo) ⇒ + import ctx.executionContext + db.save(id, value).onComplete { + case Success(_) ⇒ ctx.self ! SaveSuccess + case Failure(cause) ⇒ ctx.self ! DBError(cause) + } + saving(value, replyTo) + } + } + + def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = + Behaviors.immutable[Command] { (ctx, msg) ⇒ + msg match { + case SaveSuccess ⇒ + replyTo ! Done + buffer.unstashAll(ctx, active(state)) + case DBError(cause) ⇒ + throw cause + case other ⇒ + buffer.stash(other) + Behaviors.same + } + } + + import ctx.executionContext + db.load(id).onComplete { + case Success(value) ⇒ + ctx.self ! InitialState(value) + case Failure(cause) ⇒ + ctx.self ! DBError(cause) + } + + init() + } + } + // #stashing +} + +class StashDocSpec extends WordSpec with Matchers { + import StashDocSpec.DB + import StashDocSpec.DataAccess + + "Stashing docs" must { + + "illustrate stash and unstashAll" in { + + val db = new DB { + override def save(id: String, value: String): Future[Done] = Future.successful(Done) + override def load(id: String): Future[String] = Future.successful("TheValue") + } + val testKit = BehaviorTestkit(DataAccess.behavior(id = "17", db)) + val getInbox = TestInbox[String]() + testKit.run(DataAccess.Get(getInbox.ref)) + val initialStateMsg = testKit.selfInbox().receiveMsg() + testKit.run(initialStateMsg) + getInbox.expectMsg("TheValue") + + val saveInbox = TestInbox[Done]() + testKit.run(DataAccess.Save("UpdatedValue", saveInbox.ref)) + testKit.run(DataAccess.Get(getInbox.ref)) + val saveSuccessMsg = testKit.selfInbox().receiveMsg() + testKit.run(saveSuccessMsg) + saveInbox.expectMsg(Done) + getInbox.expectMsg("UpdatedValue") + + testKit.run(DataAccess.Get(getInbox.ref)) + getInbox.expectMsg("UpdatedValue") + + } + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorContext.scala index fdd7e97af3..644c716147 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorContext.scala @@ -9,6 +9,8 @@ import akka.annotation.ApiMayChange /** * This trait is not meant to be extended by user code. If you do so, you may * lose binary compatibility. + * + * Not for user extension. */ @DoNotInherit @ApiMayChange diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 5b9918ddf8..a7b20caf71 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -26,6 +26,8 @@ import akka.actor.typed.receptionist.Receptionist * Actor of this hierarchy and which will create all other Actors beneath it. * A system also implements the [[ActorRef]] type, and sending a message to * the system directs that message to the root Actor. + * + * Not for user extension. */ @DoNotInherit @ApiMayChange diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 24ffafe866..726a6a54ad 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -28,6 +28,8 @@ import akka.util.OptionVal * * This base class is not meant to be extended by user code. If you do so, you may * lose binary compatibility. + * + * Not for user extension. */ @InternalApi @DoNotInherit diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala index ca3a5374e7..5e3c0527e9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala @@ -117,7 +117,7 @@ abstract class ExtensionId[T <: Extension] { /** * API for registering and looking up extensions. * - * Not intended to be extended by user code. + * Not for user extension. */ @DoNotInherit trait Extensions { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala index da401fc163..444f890621 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala @@ -29,6 +29,7 @@ object Props { * Deliberately not sealed in order to emphasize future extensibility by the * framework—this is not intended to be extended by user code. * + * Not for user extension. */ @DoNotInherit @ApiMayChange @@ -135,7 +136,7 @@ private[akka] case object EmptyProps extends Props { } /** - * Not intended for user extension. + * Not for user extension. */ @DoNotInherit sealed abstract class DispatcherSelector extends Props @@ -171,9 +172,9 @@ object DispatcherSelector { } /** - * Use the [[ActorSystem]] default executor to run the actor. - * * INTERNAL API + * + * Use the [[ActorSystem]] default executor to run the actor. */ @DoNotInherit @InternalApi diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index fc08b1534b..bb690fe01c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -6,109 +6,39 @@ package akka.actor.typed.internal import java.util.function.Consumer import java.util.function.{ Function ⇒ JFunction } -import scala.annotation.tailrec - -import akka.actor.typed.scaladsl -import akka.actor.typed.javadsl -import akka.annotation.InternalApi -import akka.actor.typed.Behavior import akka.actor.typed.ActorContext -import akka.actor.typed.Signal +import akka.actor.typed.Behavior +import akka.actor.typed.javadsl +import akka.actor.typed.scaladsl +import akka.annotation.InternalApi import akka.util.ConstantFun /** * INTERNAL API */ -@InternalApi private[akka] object ImmutableStashBufferImpl { - def apply[T](capacity: Int): ImmutableStashBufferImpl[T] = - new ImmutableStashBufferImpl(capacity, Vector.empty) -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class ImmutableStashBufferImpl[T](val capacity: Int, buffer: Vector[T]) - extends javadsl.ImmutableStashBuffer[T] with scaladsl.ImmutableStashBuffer[T] { - - override def isEmpty: Boolean = buffer.isEmpty - - override def nonEmpty: Boolean = !isEmpty - - override def size: Int = buffer.size - - override def isFull: Boolean = size == capacity - - override def stash(message: T): ImmutableStashBufferImpl[T] = { - if (message == null) throw new NullPointerException - if (isFull) - throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " + - s"because stash with capacity [$capacity] is full") - - new ImmutableStashBufferImpl(capacity, buffer :+ message) - } - - override def dropHead(): ImmutableStashBufferImpl[T] = - if (buffer.nonEmpty) new ImmutableStashBufferImpl(capacity, buffer.tail) - else throw new NoSuchElementException("head of empty buffer") - - override def drop(numberOfMessages: Int): ImmutableStashBufferImpl[T] = - if (isEmpty) this - else new ImmutableStashBufferImpl(capacity, buffer.drop(numberOfMessages)) - - override def head: T = - if (buffer.nonEmpty) buffer.head - else throw new NoSuchElementException("head of empty buffer") - - override def foreach(f: T ⇒ Unit): Unit = - buffer.foreach(f) - - override def forEach(f: Consumer[T]): Unit = foreach(f.accept) - - override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = - unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) - - override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = - unstashAll(ctx.asScala, behavior) - - override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T], - numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { - val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] - val iter = buffer.iterator.take(numberOfMessages).map(wrap) - Behavior.interpretMessages[T](behavior, ctx, iter) - } - - override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T], - numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = - unstash(ctx.asScala, behavior, numberOfMessages, x ⇒ wrap.apply(x)) - -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] object MutableStashBufferImpl { +@InternalApi private[akka] object StashBufferImpl { private final class Node[T](var next: Node[T], val message: T) { def apply(f: T ⇒ Unit): Unit = f(message) } - def apply[T](capacity: Int): MutableStashBufferImpl[T] = - new MutableStashBufferImpl(capacity, null, null) + def apply[T](capacity: Int): StashBufferImpl[T] = + new StashBufferImpl(capacity, null, null) } /** * INTERNAL API */ -@InternalApi private[akka] final class MutableStashBufferImpl[T] private ( - val capacity: Int, - private var _head: MutableStashBufferImpl.Node[T], - private var _tail: MutableStashBufferImpl.Node[T]) - extends javadsl.MutableStashBuffer[T] with scaladsl.MutableStashBuffer[T] { +@InternalApi private[akka] final class StashBufferImpl[T] private ( + val capacity: Int, + private var _first: StashBufferImpl.Node[T], + private var _last: StashBufferImpl.Node[T]) + extends javadsl.StashBuffer[T] with scaladsl.StashBuffer[T] { - import MutableStashBufferImpl.Node + import StashBufferImpl.Node - private var _size: Int = if (_head eq null) 0 else 1 + private var _size: Int = if (_first eq null) 0 else 1 - override def isEmpty: Boolean = _head eq null + override def isEmpty: Boolean = _first eq null override def nonEmpty: Boolean = !isEmpty @@ -116,7 +46,7 @@ import akka.util.ConstantFun override def isFull: Boolean = _size == capacity - override def stash(message: T): MutableStashBufferImpl[T] = { + override def stash(message: T): StashBufferImpl[T] = { if (message == null) throw new NullPointerException if (isFull) throw new javadsl.StashOverflowException(s"Couldn't add [${message.getClass.getName}] " + @@ -124,32 +54,32 @@ import akka.util.ConstantFun val node = new Node(null, message) if (isEmpty) { - _head = node - _tail = node + _first = node + _last = node } else { - _tail.next = node - _tail = node + _last.next = node + _last = node } _size += 1 this } - override def dropHead(): T = { + private def dropHead(): T = { val message = head - _head = _head.next + _first = _first.next _size -= 1 if (isEmpty) - _tail = null + _last = null message } override def head: T = - if (nonEmpty) _head.message + if (nonEmpty) _first.message else throw new NoSuchElementException("head of empty buffer") override def foreach(f: T ⇒ Unit): Unit = { - var node = _head + var node = _first while (node ne null) { node(f) node = node.next @@ -167,8 +97,8 @@ import akka.util.ConstantFun override def unstash(scaladslCtx: scaladsl.ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { val iter = new Iterator[T] { - override def hasNext: Boolean = MutableStashBufferImpl.this.nonEmpty - override def next(): T = MutableStashBufferImpl.this.dropHead() + override def hasNext: Boolean = StashBufferImpl.this.nonEmpty + override def next(): T = StashBufferImpl.this.dropHead() }.take(numberOfMessages).map(wrap) val ctx = scaladslCtx.asInstanceOf[ActorContext[T]] Behavior.interpretMessages[T](behavior, ctx, iter) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 2cde11f1e2..d6bb87adda 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -32,6 +32,8 @@ import scala.concurrent.ExecutionContextExecutor * An `ActorContext` in addition provides access to the Actor’s own identity (“`getSelf`”), * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it * created, access to [[Terminated]] and timed message scheduling. + * + * Not for user extension. */ @DoNotInherit @ApiMayChange diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala index 364e5d1a4b..cee213d421 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala @@ -6,113 +6,12 @@ package akka.actor.typed.javadsl import java.util.function.Consumer import java.util.function.{ Function ⇒ JFunction } -import scala.annotation.tailrec - -import akka.annotation.DoNotInherit import akka.actor.typed.Behavior -import akka.actor.typed.internal.ImmutableStashBufferImpl -import akka.actor.typed.internal.MutableStashBufferImpl +import akka.actor.typed.internal.StashBufferImpl import akka.actor.typed.scaladsl +import akka.annotation.DoNotInherit -object ImmutableStashBuffer { - /** - * Create an empty message buffer. - * - * @param capacity the buffer can hold at most this number of messages - * @return an empty message buffer - */ - def create[T](capacity: Int): ImmutableStashBuffer[T] = - ImmutableStashBufferImpl[T](capacity) - -} - -/** - * A thread safe immutable message buffer that can be used to buffer messages inside actors. - * - * The buffer can hold at most the given `capacity` number of messages. - */ -@DoNotInherit abstract class ImmutableStashBuffer[T] { - - /** - * Check if the message buffer is empty. - * - * @return if the buffer is empty - */ - def isEmpty: Boolean - - /** - * Check if the message buffer is not empty. - * - * @return if the buffer is not empty - */ - def nonEmpty: Boolean - - /** - * How many elements are in the message buffer. - * - * @return the number of elements in the message buffer - */ - def size: Int - - /** - * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer - */ - def isFull: Boolean - - /** - * Add one element to the end of the message buffer. Note that this class is - * immutable so the returned instance contains the added message. - * - * @param message the message to buffer, must not be `null` - * @return this message buffer - * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. - */ - def stash(message: T): ImmutableStashBuffer[T] - - /** - * Remove the first element of the message buffer. Note that this class is - * immutable so the head element is removed in the returned instance. - * - * @throws `NoSuchElementException` if the buffer is empty - */ - def dropHead(): ImmutableStashBuffer[T] - - /** - * Remove the first `numberOfMessages` of the message buffer. Note that this class is - * immutable so the elements are removed in the returned instance. - */ - def drop(numberOfMessages: Int): ImmutableStashBuffer[T] - - /** - * Return the first element of the message buffer. - * - * @return the first element or throws `NoSuchElementException` if the buffer is empty - * @throws `NoSuchElementException` if the buffer is empty - */ - def head: T - - /** - * Iterate over all elements of the buffer and apply a function to each element. - * - * @param f the function to apply to each element - */ - def forEach(f: Consumer[T]): Unit - - /** - * Process all stashed messages with the `behavior` and the returned - * [[Behavior]] from each processed message. - */ - def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] - - /** - * Process `numberOfMessages` of the stashed messages with the `behavior` - * and the returned [[Behavior]] from each processed message. - */ - def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] - -} - -object MutableStashBuffer { +object StashBuffer { /** * Create an empty message buffer. @@ -120,16 +19,19 @@ object MutableStashBuffer { * @param capacity the buffer can hold at most this number of messages * @return an empty message buffer */ - def create[T](capacity: Int): MutableStashBuffer[T] = - MutableStashBufferImpl[T](capacity) + def create[T](capacity: Int): StashBuffer[T] = + StashBufferImpl[T](capacity) } /** - * A non thread safe mutable message buffer that can be used to buffer messages inside actors. + * A non thread safe mutable message buffer that can be used to buffer messages inside actors + * and then unstash them. * * The buffer can hold at most the given `capacity` number of messages. + * + * Not for user extension. */ -@DoNotInherit abstract class MutableStashBuffer[T] { +@DoNotInherit abstract class StashBuffer[T] { /** * Check if the message buffer is empty. @@ -160,21 +62,13 @@ object MutableStashBuffer { /** * Add one element to the end of the message buffer. * - * [[StashOverflowException]] is thrown if the buffer [[MutableStashBuffer#isFull]]. + * [[StashOverflowException]] is thrown if the buffer [[StashBuffer#isFull]]. * * @param message the message to buffer * @return this message buffer - * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + * @throws `StashOverflowException` is thrown if the buffer [[StashBuffer#isFull]]. */ - def stash(message: T): MutableStashBuffer[T] - - /** - * Return the first element of the message buffer and removes it. - * - * @return the first element or throws `NoSuchElementException` if the buffer is empty - * @throws `NoSuchElementException` if the buffer is empty - */ - def dropHead(): T + def stash(message: T): StashBuffer[T] /** * Return the first element of the message buffer without removing it. @@ -185,7 +79,8 @@ object MutableStashBuffer { def head: T /** - * Iterate over all elements of the buffer and apply a function to each element. + * Iterate over all elements of the buffer and apply a function to each element, + * without removing them. * * @param f the function to apply to each element */ @@ -193,20 +88,38 @@ object MutableStashBuffer { /** * Process all stashed messages with the `behavior` and the returned - * [[Behavior]] from each processed message. The `MutableStashBuffer` will be - * empty after processing all messages, unless an exception is thrown. + * [[Behavior]] from each processed message. The `StashBuffer` will be + * empty after processing all messages, unless an exception is thrown + * or messages are stashed while unstashing. + * * If an exception is thrown by processing a message a proceeding messages * and the message causing the exception have been removed from the - * `MutableStashBuffer`, but unprocessed messages remain. + * `StashBuffer`, but unprocessed messages remain. + * + * It's allowed to stash messages while unstashing. Those newly added + * messages will not be processed by this call and have to be unstashed + * in another call. */ def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] /** * Process `numberOfMessages` of the stashed messages with the `behavior` * and the returned [[Behavior]] from each processed message. + * + * The purpose of this method, compared to `unstashAll`, is to unstash a limited + * number of messages and then send a message to `self` before continuing unstashing + * more. That means that other new messages may arrive in-between and those must + * be stashed to keep the original order of messages. To differentiate between + * unstashed and new incoming messages the unstashed messages can be wrapped + * in another message with the `wrap`. + * * If an exception is thrown by processing a message a proceeding messages * and the message causing the exception have been removed from the - * `MutableStashBuffer`, but unprocessed messages remain. + * `StashBuffer`, but unprocessed messages remain. + * + * It's allowed to stash messages while unstashing. Those newly added + * messages will not be processed by this call and have to be unstashed + * in another call. */ def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] @@ -215,4 +128,4 @@ object MutableStashBuffer { /** * Is thrown when the size of the stash exceeds the capacity of the stash buffer. */ -class StashOverflowException(message: String) extends scaladsl.StashOverflowException(message) +final class StashOverflowException(message: String) extends scaladsl.StashOverflowException(message) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index d864a257c4..c41f571e6d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -31,6 +31,8 @@ import akka.annotation.InternalApi * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it * created, access to [[Terminated]] and timed message scheduling. + * + * Not for user extension. */ @DoNotInherit @ApiMayChange diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala index 2275cf728e..b45d48004b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala @@ -3,125 +3,13 @@ */ package akka.actor.typed.scaladsl -import java.util.function.Consumer import java.util.function.{ Function ⇒ JFunction } -import scala.annotation.tailrec - -import akka.annotation.DoNotInherit import akka.actor.typed.Behavior -import akka.actor.typed.internal.ImmutableStashBufferImpl -import akka.actor.typed.internal.MutableStashBufferImpl +import akka.actor.typed.internal.StashBufferImpl +import akka.annotation.DoNotInherit -object ImmutableStashBuffer { - /** - * Create an empty message buffer. - * - * @param capacity the buffer can hold at most this number of messages - * @return an empty message buffer - */ - def apply[T](capacity: Int): ImmutableStashBuffer[T] = - ImmutableStashBufferImpl[T](capacity) - -} - -/** - * A thread safe immutable message buffer that can be used to buffer messages inside actors. - * - * The buffer can hold at most the given `capacity` number of messages. - */ -@DoNotInherit trait ImmutableStashBuffer[T] { - - /** - * Check if the message buffer is empty. - * - * @return if the buffer is empty - */ - def isEmpty: Boolean - - /** - * Check if the message buffer is not empty. - * - * @return if the buffer is not empty - */ - def nonEmpty: Boolean - - /** - * How many elements are in the message buffer. - * - * @return the number of elements in the message buffer - */ - def size: Int - - /** - * @return `true` if no more messages can be added, i.e. size equals the capacity of the stash buffer - */ - def isFull: Boolean - - /** - * Add one element to the end of the message buffer. Note that this class is - * immutable so the returned instance contains the added message. - * - * @param message the message to buffer, must not be `null` - * @return this message buffer - * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. - */ - def stash(message: T): ImmutableStashBuffer[T] - - /** - * Add one element to the end of the message buffer. Note that this class is - * immutable so the returned instance contains the added message. - * - * @param message the message to buffer, must not be `null` - * @return this message buffer - * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. - */ - def :+(message: T): ImmutableStashBuffer[T] = stash(message) - - /** - * Remove the first element of the message buffer. Note that this class is - * immutable so the head element is removed in the returned instance. - * - * @throws `NoSuchElementException` if the buffer is empty - */ - def dropHead(): ImmutableStashBuffer[T] - - /** - * Remove the first `numberOfMessages` of the message buffer. Note that this class is - * immutable so the elements are removed in the returned instance. - */ - def drop(numberOfMessages: Int): ImmutableStashBuffer[T] - - /** - * Return the first element of the message buffer. - * - * @return the first element or throws `NoSuchElementException` if the buffer is empty - * @throws `NoSuchElementException` if the buffer is empty - */ - def head: T - - /** - * Iterate over all elements of the buffer and apply a function to each element. - * - * @param f the function to apply to each element - */ - def foreach(f: T ⇒ Unit): Unit - - /** - * Process all stashed messages with the `behavior` and the returned - * [[Behavior]] from each processed message. - */ - def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] - - /** - * Process `numberOfMessages` of the stashed messages with the `behavior` - * and the returned [[Behavior]] from each processed message. - */ - def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] - -} - -object MutableStashBuffer { +object StashBuffer { /** * Create an empty message buffer. @@ -129,16 +17,19 @@ object MutableStashBuffer { * @param capacity the buffer can hold at most this number of messages * @return an empty message buffer */ - def apply[T](capacity: Int): MutableStashBuffer[T] = - MutableStashBufferImpl[T](capacity) + def apply[T](capacity: Int): StashBuffer[T] = + StashBufferImpl[T](capacity) } /** - * A non thread safe mutable message buffer that can be used to buffer messages inside actors. + * A non thread safe mutable message buffer that can be used to buffer messages inside actors + * and then unstash them. * * The buffer can hold at most the given `capacity` number of messages. + * + * Not for user extension. */ -@DoNotInherit trait MutableStashBuffer[T] { +@DoNotInherit trait StashBuffer[T] { /** * Check if the message buffer is empty. * @@ -170,17 +61,9 @@ object MutableStashBuffer { * * @param message the message to buffer * @return this message buffer - * @throws `StashOverflowException` is thrown if the buffer [[MutableStashBuffer#isFull]]. + * @throws `StashOverflowException` is thrown if the buffer [[StashBuffer#isFull]]. */ - def stash(message: T): MutableStashBuffer[T] - - /** - * Return the first element of the message buffer and removes it. - * - * @return the first element or throws `NoSuchElementException` if the buffer is empty - * @throws `NoSuchElementException` if the buffer is empty - */ - def dropHead(): T + def stash(message: T): StashBuffer[T] /** * Return the first element of the message buffer without removing it. @@ -191,7 +74,8 @@ object MutableStashBuffer { def head: T /** - * Iterate over all elements of the buffer and apply a function to each element. + * Iterate over all elements of the buffer and apply a function to each element, + * without removing them. * * @param f the function to apply to each element */ @@ -199,20 +83,38 @@ object MutableStashBuffer { /** * Process all stashed messages with the `behavior` and the returned - * [[Behavior]] from each processed message. The `MutableStashBuffer` will be - * empty after processing all messages, unless an exception is thrown. + * [[Behavior]] from each processed message. The `StashBuffer` will be + * empty after processing all messages, unless an exception is thrown + * or messages are stashed while unstashing. + * * If an exception is thrown by processing a message a proceeding messages * and the message causing the exception have been removed from the - * `MutableStashBuffer`, but unprocessed messages remain. + * `StashBuffer`, but unprocessed messages remain. + * + * It's allowed to stash messages while unstashing. Those newly added + * messages will not be processed by this call and have to be unstashed + * in another call. */ def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] /** * Process `numberOfMessages` of the stashed messages with the `behavior` * and the returned [[Behavior]] from each processed message. + * + * The purpose of this method, compared to `unstashAll` is to unstash a limited + * number of messages and then send a message to `self` before continuing unstashing + * more. That means that other new messages may arrive in-between and those must + * be stashed to keep the original order of messages. To differentiate between + * unstashed and new incoming messages the unstashed messages can be wrapped + * in another message with the `wrap`. + * * If an exception is thrown by processing a message a proceeding messages * and the message causing the exception have been removed from the - * `MutableStashBuffer`, but unprocessed messages remain. + * `StashBuffer`, but unprocessed messages remain. + * + * It's allowed to stash messages while unstashing. Those newly added + * messages will not be processed by this call and have to be unstashed + * in another call. */ def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md deleted file mode 100644 index f120823a58..0000000000 --- a/akka-docs/src/main/paradox/index-typed.md +++ /dev/null @@ -1,19 +0,0 @@ -# Akka Typed - -@@toc { depth=2 } - -@@@ index - -* [actors](typed/actors.md) -* [coexisting](typed/coexisting.md) -* [actor-lifecycle](typed/actor-lifecycle.md) -* [interaction patterns](typed/interaction-patterns.md) -* [fault-tolerance](typed/fault-tolerance.md) -* [actor-discovery](typed/actor-discovery.md) -* [cluster](typed/cluster.md) -* [cluster-singleton](cluster-singleton.md) -* [cluster-sharding](typed/cluster-sharding.md) -* [persistence](typed/persistence.md) -* [testing](typed/testing.md) - -@@@ diff --git a/akka-docs/src/main/paradox/index.md b/akka-docs/src/main/paradox/index.md index e01f4ee73f..70fb8de5b6 100644 --- a/akka-docs/src/main/paradox/index.md +++ b/akka-docs/src/main/paradox/index.md @@ -8,7 +8,7 @@ * [guide/index](guide/index.md) * [general/index](general/index.md) * [index-actors](index-actors.md) -* [index-typed](index-typed.md) +* [index-typed](typed/index.md) * [index-cluster](index-cluster.md) * [stream/index](stream/index.md) * [index-network](index-network.md) diff --git a/akka-docs/src/main/paradox/typed/index.md b/akka-docs/src/main/paradox/typed/index.md new file mode 100644 index 0000000000..81e3aa90fe --- /dev/null +++ b/akka-docs/src/main/paradox/typed/index.md @@ -0,0 +1,20 @@ +# Akka Typed + +@@toc { depth=2 } + +@@@ index + +* [actors](actors.md) +* [coexisting](coexisting.md) +* [actor-lifecycle](actor-lifecycle.md) +* [interaction patterns](interaction-patterns.md) +* [fault-tolerance](fault-tolerance.md) +* [actor-discovery](actor-discovery.md) +* [stash](stash.md) +* [cluster](cluster.md) +* [cluster-singleton](cluster-singleton.md) +* [cluster-sharding](cluster-sharding.md) +* [persistence](persistence.md) +* [testing](testing.md) + +@@@ diff --git a/akka-docs/src/main/paradox/typed/stash.md b/akka-docs/src/main/paradox/typed/stash.md new file mode 100644 index 0000000000..4fc81a5482 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/stash.md @@ -0,0 +1,46 @@ +# Stash + +Stashing enables an actor to temporarily buffer all or some messages that cannot or should not +be handled using the actor's current behavior. + +A typical example when this is useful is if the actor has too load some initial state or initialize +some resources before it can accept the first real message. Another example is when the actor +is waiting for something to complete before processing next message. + +Let's illustrate these two with an example. It's an actor that is used like a single access point +to a value stored in a database. When it's started it loads current state from the database, and +while waiting for that initial value all incoming messages are stashed. + +When a new state is saved in the database it also stashes incoming messages to make the +processing sequential, one after the other without multiple pending writes. + +Scala +: @@snip [StashDocSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala) { #stashing } + +Java +: @@snip [StashDocTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java) { + #import + #db + #stashing +} + +One important thing to be aware of is that the `StashBuffer` is a buffer and stashed messages will be +kept in memory until they are unstashed (or the actor is stopped and garbage collected). It's recommended +to avoid stashing too many messages to avoid too much memory usage and even risking `OutOfMemoryError` +if many actors are stashing many messages. Therefore the `StashBuffer` is bounded and the `capacity` +of how many messages it can hold must be specified when it's created. + +If you try to stash more messages than the `capacity` a `StashOverflowException` will be thrown. +You can use `StashBuffer.isFull` before stashing a message to avoid that and take other actions, such as +dropping the message. + +When unstashing the buffered messages by calling `unstashAll` the messages will be processed sequentially +in the order they were added and all are processed unless an exception is thrown. The actor is unresponsive +to other new messages until `unstashAll` is completed. That is another reason for keeping the number of +stashed messages low. Actors that hog the message processing thread for too long can result in starvation +of other actors. + +That can be mitigated by using the `StashBuffer.unstash` with `numberOfMessages` parameter and then send a +message to @scala[`ctx.self`]@java[`ctx.getSelf`] before continuing unstashing more. That means that other +new messages may arrive in-between and those must be stashed to keep the original order of messages. It +becomes more complicated, so better keep the number of stashed messages low. diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefExceptions.scala b/akka-stream/src/main/scala/akka/stream/StreamRefExceptions.scala deleted file mode 100644 index e7d60a4898..0000000000 --- a/akka-stream/src/main/scala/akka/stream/StreamRefExceptions.scala +++ /dev/null @@ -1,6 +0,0 @@ -/** - * Copyright (C) 2018 Lightbend Inc. - */ -package akka.stream - -import akka.actor.ActorRef diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala index c1f87777e9..b880b33cba 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala @@ -5,16 +5,22 @@ package akka.testkit.typed import java.util.concurrent.ConcurrentLinkedQueue -import akka.actor.typed.{ ActorRef, Behavior, PostStop, Props, Signal } -import akka.annotation.{ ApiMayChange, InternalApi } -import akka.event.LoggingAdapter - import scala.annotation.tailrec import scala.collection.immutable +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import scala.language.existentials import scala.util.control.Exception.Catcher import scala.util.control.NonFatal -import scala.concurrent.duration.{ Duration, FiniteDuration } -import scala.language.existentials + +import akka.actor.typed.internal.ControlledExecutor +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.PostStop +import akka.actor.typed.Props +import akka.actor.typed.Signal +import akka.annotation.ApiMayChange +import akka.annotation.InternalApi /** * All tracked effects must extend implement this type. It is deliberately @@ -191,6 +197,10 @@ class BehaviorTestkit[T] private (_name: String, _initialBehavior: Behavior[T]) def run(msg: T): Unit = { try { current = Behavior.canonicalize(Behavior.interpretMessage(current, ctx, msg), current, ctx) + ctx.executionContext match { + case controlled: ControlledExecutor ⇒ controlled.runAll() + case _ ⇒ + } } catch handleException }