From 1dfe55fcc37510d3be774aeaab6dc3312b93fd06 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 5 Jul 2019 09:28:07 +0100 Subject: [PATCH] Typed Stash: Create via factory method on Behaviors (#27200) * changing to trait in javadsl --- .../java/jdocs/akka/typed/StashDocTest.java | 33 +- .../typed/scaladsl/StashBufferSpec.scala | 35 +- .../akka/actor/typed/scaladsl/StashSpec.scala | 382 +++++++++--------- .../adapter/GuardianStartupSpec.scala | 1 - .../scala/docs/akka/typed/StashDocSpec.scala | 58 ++- .../typed/internal/StashBufferImpl.scala | 64 +-- .../actor/typed/internal/Supervision.scala | 5 +- .../adapter/GuardianStartupBehavior.scala | 19 +- .../internal/routing/GroupRouterImpl.scala | 4 +- .../akka/actor/typed/javadsl/Behaviors.scala | 16 +- .../actor/typed/javadsl/StashBuffer.scala | 19 +- .../akka/actor/typed/scaladsl/Behaviors.scala | 9 + .../actor/typed/scaladsl/StashBuffer.scala | 16 +- .../main/scala-2.13/akka/compat/Future.scala | 2 +- .../src/main/scala/akka/actor/Stash.scala | 4 +- .../project/migration-guide-2.5.x-2.6.x.md | 1 + .../internal/EventSourcedBehaviorImpl.scala | 2 +- .../typed/internal/StashManagement.scala | 12 +- .../typed/internal/StashStateSpec.scala | 4 +- 19 files changed, 362 insertions(+), 324 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java index 4590fed7a7..b34619fbab 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StashDocTest.java @@ -37,7 +37,7 @@ public class StashDocTest extends JUnitSuite { public static class DataAccess { - static interface Command {} + interface Command {} public static class Save implements Command { public final String payload; @@ -78,28 +78,33 @@ public class StashDocTest extends JUnitSuite { } private final ActorContext context; - private final StashBuffer buffer = StashBuffer.create(100); + private final StashBuffer buffer; private final String id; private final DB db; - private DataAccess(ActorContext context, String id, DB db) { + private DataAccess( + ActorContext context, StashBuffer buffer, String id, DB db) { this.context = context; + this.buffer = buffer; this.id = id; this.db = db; } public static Behavior create(String id, DB db) { return Behaviors.setup( - ctx -> { - ctx.pipeToSelf( - db.load(id), - (value, cause) -> { - if (cause == null) return new InitialState(value); - else return new DBError(asRuntimeException(cause)); - }); + ctx -> + Behaviors.withStash( + 100, + stash -> { + ctx.pipeToSelf( + db.load(id), + (value, cause) -> { + if (cause == null) return new InitialState(value); + else return new DBError(asRuntimeException(cause)); + }); - return new DataAccess(ctx, id, db).init(); - }); + return new DataAccess(ctx, stash, id, db).init(); + })); } private Behavior init() { @@ -108,7 +113,7 @@ public class StashDocTest extends JUnitSuite { InitialState.class, message -> { // now we are ready to handle stashed messages if any - return buffer.unstashAll(context, active(message.value)); + return buffer.unstashAll(active(message.value)); }) .onMessage( DBError.class, @@ -153,7 +158,7 @@ public class StashDocTest extends JUnitSuite { SaveSuccess.class, message -> { replyTo.tell(Done.getInstance()); - return buffer.unstashAll(context, active(state)); + return buffer.unstashAll(active(state)); }) .onMessage( DBError.class, diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala index 39221dcec5..27bd3ac4a8 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashBufferSpec.scala @@ -18,7 +18,7 @@ class StashBufferSpec extends WordSpec with Matchers { "A StashBuffer" must { "answer empty correctly" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.isEmpty should ===(true) buffer.nonEmpty should ===(false) buffer.stash("m1") @@ -27,7 +27,7 @@ class StashBufferSpec extends WordSpec with Matchers { } "append and drop" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.size should ===(0) buffer.stash("m1") buffer.size should ===(1) @@ -36,12 +36,12 @@ class StashBufferSpec extends WordSpec with Matchers { val m1 = buffer.head m1 should ===("m1") buffer.size should ===(2) - buffer.unstash(context, Behaviors.ignore, 1, identity) + buffer.unstash(Behaviors.ignore, 1, identity) buffer.size should ===(1) m1 should ===("m1") val m2 = buffer.head m2 should ===("m2") - buffer.unstash(context, Behaviors.ignore, 1, identity) + buffer.unstash(Behaviors.ignore, 1, identity) buffer.size should ===(0) intercept[NoSuchElementException] { buffer.head @@ -50,7 +50,7 @@ class StashBufferSpec extends WordSpec with Matchers { } "enforce capacity" in { - val buffer = StashBuffer[String](3) + val buffer = StashBuffer[String](context, 3) buffer.stash("m1") buffer.stash("m2") buffer.stash("m3") @@ -65,21 +65,21 @@ class StashBufferSpec extends WordSpec with Matchers { } "process elements in the right order" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.stash("m1") buffer.stash("m2") buffer.stash("m3") val sb1 = new StringBuilder() buffer.foreach(sb1.append(_)) sb1.toString() should ===("m1m2m3") - buffer.unstash(context, Behaviors.ignore, 1, identity) + buffer.unstash(Behaviors.ignore, 1, identity) val sb2 = new StringBuilder() buffer.foreach(sb2.append(_)) sb2.toString() should ===("m2m3") } "unstash to returned behaviors" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.stash("m1") buffer.stash("m2") buffer.stash("m3") @@ -96,13 +96,13 @@ class StashBufferSpec extends WordSpec with Matchers { } } - buffer.unstashAll(context, behavior("")) + buffer.unstashAll(behavior("")) valueInbox.expectMessage("m1m2m3") buffer.isEmpty should ===(true) } "undefer returned behaviors when unstashing" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.stash("m1") buffer.stash("m2") buffer.stash("m3") @@ -119,13 +119,13 @@ class StashBufferSpec extends WordSpec with Matchers { } } - buffer.unstashAll(context, behavior("")) + buffer.unstashAll(behavior("")) valueInbox.expectMessage("m1m2m3") buffer.isEmpty should ===(true) } "be able to stash while unstashing" in { - val buffer = StashBuffer[String](10) + val buffer = StashBuffer[String](context, 10) buffer.stash("m1") buffer.stash("m2") buffer.stash("m3") @@ -148,16 +148,21 @@ class StashBufferSpec extends WordSpec with Matchers { // It's only supposed to unstash the messages that are in the buffer when // the call is made, not unstash new messages added to the buffer while // unstashing. - val b2 = buffer.unstashAll(context, behavior("")) + val b2 = buffer.unstashAll(behavior("")) valueInbox.expectMessage("m1m3") buffer.size should ===(1) buffer.head should ===("m2") - buffer.unstashAll(context, b2) + buffer.unstashAll(b2) buffer.size should ===(1) buffer.head should ===("m2") } - } + "fail quick on invalid start behavior" in { + val stash = StashBuffer[String](context, 10) + stash.stash("one") + intercept[IllegalArgumentException](stash.unstashAll(Behaviors.unhandled)) + } + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala index 5ac9f33f99..751c50ff6f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -28,108 +28,107 @@ object AbstractStashSpec { final case class GetStashSize(replyTo: ActorRef[Int]) extends Command val immutableStash: Behavior[Command] = - Behaviors.setup[Command] { _ => - val buffer = StashBuffer[Command](capacity = 10) - - def active(processed: Vector[String]): Behavior[Command] = - Behaviors.receive { (_, cmd) => - cmd match { - case message: Msg => - active(processed :+ message.s) - case GetProcessed(replyTo) => - replyTo ! processed - Behaviors.same - case Stash => - stashing(processed) - case GetStashSize(replyTo) => - replyTo ! 0 - Behaviors.same - case UnstashAll => - Behaviors.unhandled - case Unstash => - Behaviors.unhandled - case u: Unstashed => - throw new IllegalStateException(s"Unexpected $u in active") + Behaviors.setup[Command] { ctx => + Behaviors.withStash(10) { buffer => + def active(processed: Vector[String]): Behavior[Command] = + Behaviors.receive { (_, cmd) => + cmd match { + case message: Msg => + active(processed :+ message.s) + case GetProcessed(replyTo) => + replyTo ! processed + Behaviors.same + case Stash => + stashing(processed) + case GetStashSize(replyTo) => + replyTo ! 0 + Behaviors.same + case UnstashAll => + Behaviors.unhandled + case Unstash => + Behaviors.unhandled + case u: Unstashed => + throw new IllegalStateException(s"Unexpected $u in active") + } } - } - def stashing(processed: Vector[String]): Behavior[Command] = - Behaviors.receive { (context, cmd) => - cmd match { - case message: Msg => - buffer.stash(message) - Behaviors.same - case g: GetProcessed => - buffer.stash(g) - Behaviors.same - case GetStashSize(replyTo) => - replyTo ! buffer.size - Behaviors.same - case UnstashAll => - buffer.unstashAll(context, active(processed)) - case Unstash => - context.log.debug(s"Unstash ${buffer.size}") - if (buffer.isEmpty) - active(processed) - else { - context.self ! Unstash // continue unstashing until buffer is empty - val numberOfMessages = 2 - context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") - buffer.unstash(context, unstashing(processed), numberOfMessages, Unstashed) - } - case Stash => - Behaviors.unhandled - case u: Unstashed => - throw new IllegalStateException(s"Unexpected $u in stashing") + def stashing(processed: Vector[String]): Behavior[Command] = + Behaviors.receive { (context, cmd) => + cmd match { + case message: Msg => + buffer.stash(message) + Behaviors.same + case g: GetProcessed => + buffer.stash(g) + Behaviors.same + case GetStashSize(replyTo) => + replyTo ! buffer.size + Behaviors.same + case UnstashAll => + buffer.unstashAll(active(processed)) + case Unstash => + context.log.debug(s"Unstash ${buffer.size}") + if (buffer.isEmpty) + active(processed) + else { + context.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(unstashing(processed), numberOfMessages, Unstashed) + } + case Stash => + Behaviors.unhandled + case u: Unstashed => + throw new IllegalStateException(s"Unexpected $u in stashing") + } } - } - def unstashing(processed: Vector[String]): Behavior[Command] = - Behaviors.receive { (context, cmd) => - cmd match { - case Unstashed(message: Msg) => - context.log.debug(s"unstashed $message") - unstashing(processed :+ message.s) - case Unstashed(GetProcessed(replyTo)) => - context.log.debug(s"unstashed GetProcessed") - replyTo ! processed - Behaviors.same - case message: Msg => - context.log.debug(s"got $message in unstashing") - buffer.stash(message) - Behaviors.same - case g: GetProcessed => - context.log.debug(s"got GetProcessed in unstashing") - buffer.stash(g) - Behaviors.same - case Stash => - stashing(processed) - case Unstash => - if (buffer.isEmpty) { - context.log.debug(s"unstashing done") - active(processed) - } else { - context.self ! Unstash // continue unstashing until buffer is empty - val numberOfMessages = 2 - context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") - buffer.unstash(context, unstashing(processed), numberOfMessages, Unstashed) - } - case GetStashSize(replyTo) => - replyTo ! buffer.size - Behaviors.same - case UnstashAll => - Behaviors.unhandled - case u: Unstashed => - throw new IllegalStateException(s"Unexpected $u in unstashing") + def unstashing(processed: Vector[String]): Behavior[Command] = + Behaviors.receive { (context, cmd) => + cmd match { + case Unstashed(message: Msg) => + context.log.debug(s"unstashed $message") + unstashing(processed :+ message.s) + case Unstashed(GetProcessed(replyTo)) => + context.log.debug(s"unstashed GetProcessed") + replyTo ! processed + Behaviors.same + case message: Msg => + context.log.debug(s"got $message in unstashing") + buffer.stash(message) + Behaviors.same + case g: GetProcessed => + context.log.debug(s"got GetProcessed in unstashing") + buffer.stash(g) + Behaviors.same + case Stash => + stashing(processed) + case Unstash => + if (buffer.isEmpty) { + context.log.debug(s"unstashing done") + active(processed) + } else { + context.self ! Unstash // continue unstashing until buffer is empty + val numberOfMessages = 2 + context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") + buffer.unstash(unstashing(processed), numberOfMessages, Unstashed) + } + case GetStashSize(replyTo) => + replyTo ! buffer.size + Behaviors.same + case UnstashAll => + Behaviors.unhandled + case u: Unstashed => + throw new IllegalStateException(s"Unexpected $u in unstashing") + } } - } - active(Vector.empty) + active(Vector.empty) + } } - class MutableStash(context: ActorContext[Command]) extends AbstractBehavior[Command] { + class MutableStash(context: ActorContext[Command], buffer: StashBuffer[Command]) extends AbstractBehavior[Command] { - private val buffer = StashBuffer.apply[Command](capacity = 10) private var stashing = false private var processed = Vector.empty[String] @@ -155,7 +154,7 @@ object AbstractStashSpec { this case UnstashAll => stashing = false - buffer.unstashAll(context, this) + buffer.unstashAll(this) case Unstash => if (buffer.isEmpty) { stashing = false @@ -164,7 +163,7 @@ object AbstractStashSpec { context.self ! Unstash // continue unstashing until buffer is empty val numberOfMessages = 2 context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}") - buffer.unstash(context, this, numberOfMessages, Unstashed) + buffer.unstash(this, numberOfMessages, Unstashed) } case Unstashed(message: Msg) => context.log.debug(s"unstashed $message") @@ -192,7 +191,10 @@ class ImmutableStashSpec extends AbstractStashSpec { class MutableStashSpec extends AbstractStashSpec { import AbstractStashSpec._ def testQualifier: String = "mutable behavior" - def behaviorUnderTest: Behavior[Command] = Behaviors.setup(context => new MutableStash(context)) + def behaviorUnderTest: Behavior[Command] = + Behaviors.withStash(10) { stash => + Behaviors.setup(context => new MutableStash(context, stash)) + } } abstract class AbstractStashSpec extends ScalaTestWithActorTestKit with WordSpecLike { @@ -267,52 +269,54 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" Behaviors.same } - private def stashingBehavior(probe: ActorRef[String], withSlowStoppingChild: Option[CountDownLatch] = None) = { + private def stashingBehavior( + probe: ActorRef[String], + withSlowStoppingChild: Option[CountDownLatch] = None): Behavior[String] = { Behaviors.setup[String] { ctx => withSlowStoppingChild.foreach(latch => ctx.spawnAnonymous(slowStoppingChild(latch))) - val stash = StashBuffer[String](10) + Behaviors.withStash(10) { stash => + def unstashing(n: Int): Behavior[String] = + Behaviors + .receiveMessage[String] { + case "stash" => + probe.ref ! s"unstashing-$n" + unstashing(n + 1) + case "stash-fail" => + probe.ref ! s"stash-fail-$n" + throw TestException("unstash-fail") + case "get-current" => + probe.ref ! s"current-$n" + Behaviors.same + case "get-stash-size" => + probe.ref ! s"stash-size-${stash.size}" + Behaviors.same + case "unstash" => + // when testing resume + stash.unstashAll(unstashing(n)) + } + .receiveSignal { + case (_, PreRestart) => + probe.ref ! s"pre-restart-$n" + Behaviors.same + case (_, PostStop) => + probe.ref ! s"post-stop-$n" + Behaviors.same + } - def unstashing(n: Int): Behavior[String] = - Behaviors - .receiveMessage[String] { - case "stash" => - probe.ref ! s"unstashing-$n" - unstashing(n + 1) - case "stash-fail" => - probe.ref ! s"stash-fail-$n" - throw TestException("unstash-fail") - case "get-current" => - probe.ref ! s"current-$n" - Behaviors.same - case "get-stash-size" => - probe.ref ! s"stash-size-${stash.size}" - Behaviors.same - case "unstash" => - // when testing resume - stash.unstashAll(ctx, unstashing(n)) - } - .receiveSignal { - case (_, PreRestart) => - probe.ref ! s"pre-restart-$n" - Behaviors.same - case (_, PostStop) => - probe.ref ! s"post-stop-$n" - Behaviors.same - } - - Behaviors.receiveMessage[String] { - case msg if msg.startsWith("stash") => - stash.stash(msg) - Behaviors.same - case "unstash" => - stash.unstashAll(ctx, unstashing(0)) - case "get-current" => - probe.ref ! s"current-00" - Behaviors.same - case "get-stash-size" => - probe.ref ! s"stash-size-${stash.size}" - Behaviors.same + Behaviors.receiveMessage[String] { + case msg if msg.startsWith("stash") => + stash.stash(msg) + Behaviors.same + case "unstash" => + stash.unstashAll(unstashing(0)) + case "get-current" => + probe.ref ! s"current-00" + Behaviors.same + case "get-stash-size" => + probe.ref ! s"stash-size-${stash.size}" + Behaviors.same + } } } } @@ -326,11 +330,11 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" val probe = TestProbe[String]() // unstashing is inside setup val ref = spawn(Behaviors.receive[String] { - case (ctx, "unstash") => - val stash = StashBuffer[String](10) - stash.stash("one") - stash.unstashAll(ctx, Behaviors.same) - + case (_, "unstash") => + Behaviors.withStash(10) { stash => + stash.stash("one") + stash.unstashAll(Behaviors.same) + } case (_, msg) => probe.ref ! msg Behaviors.same @@ -344,14 +348,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" val probe = TestProbe[String]() // unstashing is inside setup val ref = spawn(Behaviors.receivePartial[String] { - case (ctx, "unstash") => - val stash = StashBuffer[String](10) - stash.stash("one") - stash.stash("two") - stash.unstashAll(ctx, Behaviors.receiveMessage { msg => - probe.ref ! msg - Behaviors.same - }) + case (_, "unstash") => + Behaviors.withStash(10) { stash => + stash.stash("one") + stash.stash("two") + stash.unstashAll(Behaviors.receiveMessage { msg => + probe.ref ! msg + Behaviors.same + }) + } }) ref ! "unstash" @@ -371,10 +376,10 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" Behaviors .supervise(Behaviors.receivePartial[String] { case (ctx, "unstash") => - val stash = StashBuffer[String](10) - stash.stash("one") - stash.unstashAll(ctx, Behaviors.same) - + Behaviors.withStash(10) { stash => + stash.stash("one") + stash.unstashAll(Behaviors.same) + } case (_, msg) => probe.ref ! msg Behaviors.same @@ -394,13 +399,14 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" Behaviors .supervise(Behaviors.receivePartial[String] { case (ctx, "unstash") => - val stash = StashBuffer[String](10) - stash.stash("one") - stash.stash("two") - stash.unstashAll(ctx, Behaviors.receiveMessage { msg => - probe.ref ! msg - Behaviors.same - }) + Behaviors.withStash(10) { stash => + stash.stash("one") + stash.stash("two") + stash.unstashAll(Behaviors.receiveMessage { msg => + probe.ref ! msg + Behaviors.same + }) + } }) .onFailure[TestException](SupervisorStrategy.stop)) @@ -558,20 +564,22 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" "be possible in combination with setup" in { val probe = TestProbe[String]() - val ref = spawn(Behaviors.setup[String] { _ => - val stash = StashBuffer[String](10) - stash.stash("one") + val ref = spawn(Behaviors.setup[String] { ctx => + Behaviors.withStash(10) { stash => + stash.stash("one") - // unstashing is inside setup - Behaviors.receiveMessage { - case "unstash" => - Behaviors.setup[String] { ctx => - stash.unstashAll(ctx, Behaviors.same) - } - case msg => - probe.ref ! msg - Behaviors.same + // unstashing is inside setup + Behaviors.receiveMessage { + case "unstash" => + Behaviors.setup[String] { ctx => + stash.unstashAll(Behaviors.same) + } + case msg => + probe.ref ! msg + Behaviors.same + } } + }) ref ! "unstash" @@ -580,8 +588,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" "deal with unhandled the same way as normal unhandled" in { val probe = TestProbe[String]() - val ref = spawn(Behaviors.setup[String] { ctx => - val stash = StashBuffer[String](10) + val ref = spawn(Behaviors.withStash[String](10) { stash => stash.stash("unhandled") stash.stash("handled") stash.stash("handled") @@ -598,7 +605,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" Behaviors.receiveMessage { case "unstash" => - stash.unstashAll(ctx, unstashing(1)) + stash.unstashAll(unstashing(1)) } }) @@ -613,21 +620,14 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage("handled 4") } - "fail quick on invalid start behavior" in { - val stash = StashBuffer[String](10) - stash.stash("one") - intercept[IllegalArgumentException](stash.unstashAll(null, Behaviors.unhandled)) - } - "deal with initial stop" in { val probe = TestProbe[Any] - val ref = spawn(Behaviors.setup[String] { ctx => - val stash = StashBuffer[String](10) + val ref = spawn(Behaviors.withStash[String](10) { stash => stash.stash("one") Behaviors.receiveMessage { case "unstash" => - stash.unstashAll(ctx, Behaviors.stopped) + stash.unstashAll(Behaviors.stopped) } }) @@ -639,14 +639,13 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" val probe = TestProbe[Any] import akka.actor.typed.scaladsl.adapter._ untypedSys.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter]) - val ref = spawn(Behaviors.setup[String] { ctx => - val stash = StashBuffer[String](10) + val ref = spawn(Behaviors.withStash[String](10) { stash => stash.stash("one") stash.stash("two") Behaviors.receiveMessage { case "unstash" => - stash.unstashAll(ctx, Behaviors.receiveMessage { + stash.unstashAll(Behaviors.receiveMessage { case unstashed => probe.ref ! unstashed Behaviors.stopped @@ -663,14 +662,13 @@ class UnstashingSpec extends ScalaTestWithActorTestKit(""" "work with initial same" in { val probe = TestProbe[Any] - val ref = spawn(Behaviors.setup[String] { ctx => - val stash = StashBuffer[String](10) + val ref = spawn(Behaviors.withStash[String](10) { stash => stash.stash("one") stash.stash("two") Behaviors.receiveMessage { case "unstash" => - stash.unstashAll(ctx, Behaviors.same) + stash.unstashAll(Behaviors.same) case msg => probe.ref ! msg Behaviors.same diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/GuardianStartupSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/GuardianStartupSpec.scala index c53542539e..c00c4d168d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/GuardianStartupSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/GuardianStartupSpec.scala @@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit import akka.actor.ActorSystemImpl import akka.actor.testkit.typed.scaladsl.ActorTestKit -import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.Behaviors import org.scalatest.Matchers diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala index 92770f8b16..acff845c4e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/StashDocSpec.scala @@ -17,8 +17,6 @@ import org.scalatest.WordSpecLike object StashDocSpec { // #stashing - import akka.actor.typed.scaladsl.StashBuffer - trait DB { def save(id: String, value: String): Future[Done] def load(id: String): Future[String] @@ -33,15 +31,13 @@ object StashDocSpec { private final case class DBError(cause: Throwable) extends Command def behavior(id: String, db: DB): Behavior[Command] = - Behaviors.setup[Command] { context => - val buffer = StashBuffer[Command](capacity = 100) - - def init(): Behavior[Command] = - Behaviors.receive[Command] { (context, message) => - message match { + Behaviors.withStash(100) { buffer => + Behaviors.setup[Command] { context => + def init(): Behavior[Command] = + Behaviors.receiveMessage[Command] { case InitialState(value) => // now we are ready to handle stashed messages if any - buffer.unstashAll(context, active(value)) + buffer.unstashAll(active(value)) case DBError(cause) => throw cause case other => @@ -49,43 +45,41 @@ object StashDocSpec { buffer.stash(other) Behaviors.same } - } - def active(state: String): Behavior[Command] = - Behaviors.receive { (context, message) => - message match { - case Get(replyTo) => - replyTo ! state - Behaviors.same - case Save(value, replyTo) => - context.pipeToSelf(db.save(id, value)) { - case Success(_) => SaveSuccess - case Failure(cause) => DBError(cause) - } - saving(value, replyTo) + def active(state: String): Behavior[Command] = + Behaviors.receive { (context, message) => + message match { + case Get(replyTo) => + replyTo ! state + Behaviors.same + case Save(value, replyTo) => + context.pipeToSelf(db.save(id, value)) { + case Success(_) => SaveSuccess + case Failure(cause) => DBError(cause) + } + saving(value, replyTo) + } } - } - def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = - Behaviors.receive[Command] { (context, message) => - message match { + def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = + Behaviors.receiveMessage[Command] { case SaveSuccess => replyTo ! Done - buffer.unstashAll(context, active(state)) + buffer.unstashAll(active(state)) case DBError(cause) => throw cause case other => buffer.stash(other) Behaviors.same } + + context.pipeToSelf(db.load(id)) { + case Success(value) => InitialState(value) + case Failure(cause) => DBError(cause) } - context.pipeToSelf(db.load(id)) { - case Success(value) => InitialState(value) - case Failure(cause) => DBError(cause) + init() } - - init() } } // #stashing diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index 4e4c8d9a20..3423a172c3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -16,8 +16,9 @@ import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext import akka.actor.typed.javadsl import akka.actor.typed.scaladsl -import akka.annotation.InternalApi -import akka.util.ConstantFun +import akka.actor.typed.scaladsl.ActorContext +import akka.annotation.{ InternalApi, InternalStableApi } +import akka.util.{ unused, ConstantFun } /** * INTERNAL API @@ -27,14 +28,15 @@ import akka.util.ConstantFun def apply(f: T => Unit): Unit = f(message) } - def apply[T](capacity: Int): StashBufferImpl[T] = - new StashBufferImpl(capacity, null, null) + def apply[T](ctx: ActorContext[T], capacity: Int): StashBufferImpl[T] = + new StashBufferImpl(ctx, capacity, null, null) } /** * INTERNAL API */ @InternalApi private[akka] final class StashBufferImpl[T] private ( + ctx: ActorContext[T], val capacity: Int, private var _first: StashBufferImpl.Node[T], private var _last: StashBufferImpl.Node[T]) @@ -60,7 +62,7 @@ import akka.util.ConstantFun s"Couldn't add [${message.getClass.getName}] " + s"because stash with capacity [$capacity] is full") - val node = new Node(null, message) + val node = createNode(message, ctx) if (isEmpty) { _first = node _last = node @@ -68,12 +70,19 @@ import akka.util.ConstantFun _last.next = node _last = node } + _size += 1 this } - private def dropHead(): T = { - val message = head + @InternalStableApi + private def createNode(message: T, @unused ctx: scaladsl.ActorContext[T]): Node[T] = { + new Node(null, message) + } + + @InternalStableApi + private def dropHeadForUnstash(): Node[T] = { + val message = rawHead _first = _first.next _size -= 1 if (isEmpty) @@ -82,6 +91,10 @@ import akka.util.ConstantFun message } + private def rawHead: Node[T] = + if (nonEmpty) _first + else throw new NoSuchElementException("head of empty buffer") + override def head: T = if (nonEmpty) _first.message else throw new NoSuchElementException("head of empty buffer") @@ -96,23 +109,23 @@ import akka.util.ConstantFun override def forEach(f: Consumer[T]): Unit = foreach(f.accept) - override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = - unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) + override def unstashAll(behavior: Behavior[T]): Behavior[T] = { + val behav = unstash(behavior, size, ConstantFun.scalaIdentityFunction[T]) + stashCleared(ctx) + behav + } - override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = - unstashAll(ctx.asScala, behavior) - - override def unstash( - ctx: scaladsl.ActorContext[T], - behavior: Behavior[T], - numberOfMessages: Int, - wrap: T => T): Behavior[T] = { + override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T] = { if (isEmpty) behavior // optimization else { val iter = new Iterator[T] { override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): T = wrap(StashBufferImpl.this.dropHead()) + override def next(): T = { + val next = StashBufferImpl.this.dropHeadForUnstash() + unstashed(ctx, next) + wrap(next.message) + } }.take(numberOfMessages) interpretUnstashedMessages(behavior, ctx, iter) } @@ -178,15 +191,18 @@ import akka.util.ConstantFun scalaCtx.system.deadLetters ! DeadLetter(msg, untypedDeadLetters, ctx.asScala.self.toUntyped)) } - override def unstash( - ctx: javadsl.ActorContext[T], - behavior: Behavior[T], - numberOfMessages: Int, - wrap: JFunction[T, T]): Behavior[T] = - unstash(ctx.asScala, behavior, numberOfMessages, x => wrap.apply(x)) + override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] = + unstash(behavior, numberOfMessages, x => wrap.apply(x)) override def toString: String = s"StashBuffer($size/$capacity)" + + @InternalStableApi + private[akka] def unstashed(@unused ctx: ActorContext[T], @unused node: Node[T]): Unit = () + + @InternalStableApi + private def stashCleared(@unused ctx: ActorContext[T]): Unit = () + } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 7937a4a846..f4cff625c5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -330,7 +330,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior val stashCapacity = if (strategy.stashCapacity >= 0) strategy.stashCapacity else ctx.asScala.system.settings.RestartStashCapacity - restartingInProgress = OptionVal.Some((StashBuffer[Any](stashCapacity), childrenToStop)) + restartingInProgress = OptionVal.Some( + (StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop)) strategy match { case backoff: Backoff => @@ -364,7 +365,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior case OptionVal.None => newBehavior case OptionVal.Some((stashBuffer, _)) => restartingInProgress = OptionVal.None - stashBuffer.unstashAll(ctx.asScala, newBehavior.unsafeCast) + stashBuffer.unstashAll(newBehavior.unsafeCast) } nextBehavior.narrow } catch handleException(ctx, signalRestart = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala index cb294e9266..8dd7b59317 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala @@ -8,9 +8,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext -import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.StashBuffer +import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors, StashOverflowException } import akka.annotation.InternalApi /** @@ -33,17 +31,22 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav import GuardianStartupBehavior.Start - private val stash = StashBuffer[Any](1000) + private var tempStash: List[Any] = Nil override def onMessage(msg: Any): Behavior[Any] = msg match { case Start => // ctx is not available initially so we cannot use it until here - Behaviors.setup(ctx => - stash - .unstashAll(ctx, Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any]))) + Behaviors.withStash[Any](1000) { stash => + tempStash.reverse.foreach(stash.stash) + tempStash = null + stash.unstashAll(Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any])) + } case other => - stash.stash(other) + tempStash = other :: tempStash + if (tempStash.size > 1000) { + throw new StashOverflowException("Guardian Behavior did not receive start and buffer is full.") + } this } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala index b8a73e5ab9..7215c28e05 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala @@ -49,14 +49,14 @@ private final class InitialGroupRouterImpl[T]( // messages to a router ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing]) - private val stash = StashBuffer[T](capacity = 10000) + private val stash = StashBuffer[T](ctx, capacity = 10000) def onMessage(msg: T): Behavior[T] = msg match { case serviceKey.Listing(update) => // we don't need to watch, because receptionist already does that routingLogic.routeesUpdated(update) val activeGroupRouter = new GroupRouterImpl[T](ctx, serviceKey, routingLogic, update.isEmpty) - stash.unstashAll(ctx, activeGroupRouter) + stash.unstashAll(activeGroupRouter) case msg: T @unchecked => import akka.actor.typed.scaladsl.adapter._ if (!stash.isFull) stash.stash(msg) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index e3f6916faf..91570f7548 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -8,7 +8,13 @@ import java.util.Collections import java.util.function.{ Supplier, Function => JFunction } import akka.actor.typed._ -import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor } +import akka.actor.typed.internal.{ + BehaviorImpl, + StashBufferImpl, + Supervisor, + TimerSchedulerImpl, + WithMdcBehaviorInterceptor +} import akka.japi.function.{ Effect, Function2 => JapiFunction2 } import akka.japi.pf.PFBuilder import akka.util.unused @@ -40,6 +46,14 @@ object Behaviors { def setup[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] = BehaviorImpl.DeferredBehavior(ctx => factory.apply(ctx.asJava)) + /** + * Support for stashing messages to unstash at a later timej. + */ + def withStash[T](capacity: Int, factory: java.util.function.Function[StashBuffer[T], Behavior[T]]): Behavior[T] = + setup(ctx => { + factory(StashBufferImpl[T](ctx.asScala, capacity)) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala index b8763a73fc..768b6813e6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/StashBuffer.scala @@ -8,22 +8,9 @@ import java.util.function.Consumer import java.util.function.{ Function => JFunction } import akka.actor.typed.Behavior -import akka.actor.typed.internal.StashBufferImpl import akka.actor.typed.scaladsl import akka.annotation.DoNotInherit -object StashBuffer { - - /** - * Create an empty message buffer. - * - * @param capacity the buffer can hold at most this number of messages - * @return an empty message buffer - */ - def create[T](capacity: Int): StashBuffer[T] = - StashBufferImpl[T](capacity) -} - /** * A non thread safe mutable message buffer that can be used to buffer messages inside actors * and then unstash them. @@ -32,7 +19,7 @@ object StashBuffer { * * Not for user extension. */ -@DoNotInherit abstract class StashBuffer[T] { +@DoNotInherit trait StashBuffer[T] { /** * Check if the message buffer is empty. @@ -103,7 +90,7 @@ object StashBuffer { * * The `behavior` passed to `unstashAll` must not be `unhandled`. */ - def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + def unstashAll(behavior: Behavior[T]): Behavior[T] /** * Process `numberOfMessages` of the stashed messages with the `behavior` @@ -126,7 +113,7 @@ object StashBuffer { * * The `behavior` passed to `unstash` must not be `unhandled`. */ - def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] + def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index f86e53783b..6cce00e0f4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -29,6 +29,15 @@ object Behaviors { def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] = BehaviorImpl.DeferredBehavior(factory) + /** + * Support for stashing messages to unstash at a later timej. + */ + def withStash[T](capacity: Int)(factory: StashBuffer[T] => Behavior[T]): Behavior[T] = + setup(ctx => { + val stash = StashBuffer[T](ctx, capacity) + factory(stash) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala index a142a2fe02..9f36ac21c5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/StashBuffer.scala @@ -6,18 +6,22 @@ package akka.actor.typed.scaladsl import akka.actor.typed.Behavior import akka.actor.typed.internal.StashBufferImpl -import akka.annotation.DoNotInherit +import akka.annotation.{ DoNotInherit, InternalApi } -object StashBuffer { +/** + * INTERNAL API + */ +@InternalApi private[akka] object StashBuffer { /** + * INTERNAL API * Create an empty message buffer. * * @param capacity the buffer can hold at most this number of messages * @return an empty message buffer */ - def apply[T](capacity: Int): StashBuffer[T] = - StashBufferImpl[T](capacity) + @InternalApi private[akka] def apply[T](ctx: ActorContext[T], capacity: Int): StashBuffer[T] = + StashBufferImpl[T](ctx, capacity) } /** @@ -97,7 +101,7 @@ object StashBuffer { * * The initial `behavior` passed to `unstashAll` must not be `unhandled`. */ - def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T] + def unstashAll(behavior: Behavior[T]): Behavior[T] /** * Process `numberOfMessages` of the stashed messages with the `behavior` @@ -120,7 +124,7 @@ object StashBuffer { * * The `behavior` passed to `unstash` must not be `unhandled`. */ - def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T] + def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T] } diff --git a/akka-actor/src/main/scala-2.13/akka/compat/Future.scala b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala index 8144c5a674..996b37a951 100644 --- a/akka-actor/src/main/scala-2.13/akka/compat/Future.scala +++ b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala @@ -12,7 +12,7 @@ import scala.collection.immutable * INTERNAL API * * Compatibility wrapper for `scala.concurrent.Future` to be able to compile the same code - * against Scala 2.12, 2.13 + * against Scala 2.12, 2.13 * * Remove these classes as soon as support for Scala 2.12 is dropped! */ diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 4fed3d955e..21307a1993 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -5,8 +5,8 @@ package akka.actor import scala.collection.immutable - import akka.AkkaException +import akka.annotation.InternalStableApi import akka.dispatch.{ DequeBasedMessageQueueSemantics, Envelope, @@ -230,6 +230,7 @@ private[akka] trait StashSupport { * @param filterPredicate only stashed messages selected by this predicate are * prepended to the mailbox. */ + @InternalStableApi private[akka] def unstashAll(filterPredicate: Any => Boolean): Unit = { try { val i = theStash.reverseIterator.filter(envelope => filterPredicate(envelope.message)) @@ -244,6 +245,7 @@ private[akka] trait StashSupport { * * Clears the stash and and returns all envelopes that have not been unstashed. */ + @InternalStableApi private[akka] def clearStash(): Vector[Envelope] = { val stashed = theStash theStash = Vector.empty[Envelope] diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 8d7fa471f1..ad5122f177 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -392,6 +392,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter]. `interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter. * `Behavior.orElse` has been removed because it wasn't safe together with `narrow`. +* `StashBuffer`s are now created with `Behaviors.withStash` rather than instantiating directly #### Akka Typed Stream API changes diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index d36df67542..c68e08ca2b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -84,7 +84,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse("")) // stashState outside supervise because StashState should survive restarts due to persist failures - val stashState = new StashState(settings) + val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings) val actualSignalHandler: PartialFunction[(State, Signal), Unit] = signalHandler.orElse { // default signal handler is always the fallback diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala index 7c659bf17b..56e8a661cc 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala @@ -72,7 +72,7 @@ private[akka] trait StashManagement[C, E, S] { stashState.decrementUnstashAllProgress() - buffer.unstash(setup.context, behavior, 1, ConstantFun.scalaIdentityFunction) + buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction) } else behavior } @@ -123,10 +123,10 @@ private[akka] trait StashManagement[C, E, S] { /** INTERNAL API: stash buffer state in order to survive restart of internal behavior */ @InternalApi -private[akka] class StashState(settings: EventSourcedSettings) { +private[akka] class StashState(ctx: ActorContext[InternalProtocol], settings: EventSourcedSettings) { - private var _internalStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(settings.stashCapacity) - private var _userStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(settings.stashCapacity) + private var _internalStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(ctx, settings.stashCapacity) + private var _userStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(ctx, settings.stashCapacity) private var unstashAllInProgress = 0 def internalStashBuffer: StashBuffer[InternalProtocol] = _internalStashBuffer @@ -134,8 +134,8 @@ private[akka] class StashState(settings: EventSourcedSettings) { def userStashBuffer: StashBuffer[InternalProtocol] = _userStashBuffer def clearStashBuffers(): Unit = { - _internalStashBuffer = StashBuffer(settings.stashCapacity) - _userStashBuffer = StashBuffer(settings.stashCapacity) + _internalStashBuffer = StashBuffer(ctx, settings.stashCapacity) + _userStashBuffer = StashBuffer(ctx, settings.stashCapacity) unstashAllInProgress = 0 } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala index a6c45e9f81..3351488af9 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashStateSpec.scala @@ -38,8 +38,8 @@ class StashStateSpec extends ScalaTestWithActorTestKit with WordSpecLike { def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = { val settings = dummySettings() - Behaviors.setup[InternalProtocol] { _ => - val stashState = new StashState(settings) + Behaviors.setup[InternalProtocol] { ctx => + val stashState = new StashState(ctx, settings) Behaviors .receiveMessagePartial[InternalProtocol] { case RecoveryPermitGranted =>