Typed Stash: Create via factory method on Behaviors (#27200)

* changing to trait in javadsl
This commit is contained in:
Christopher Batey 2019-07-05 09:28:07 +01:00 committed by Patrik Nordwall
parent 72680e93bf
commit 1dfe55fcc3
19 changed files with 362 additions and 324 deletions

View file

@ -37,7 +37,7 @@ public class StashDocTest extends JUnitSuite {
public static class DataAccess {
static interface Command {}
interface Command {}
public static class Save implements Command {
public final String payload;
@ -78,28 +78,33 @@ public class StashDocTest extends JUnitSuite {
}
private final ActorContext<Command> context;
private final StashBuffer<Command> buffer = StashBuffer.create(100);
private final StashBuffer<Command> buffer;
private final String id;
private final DB db;
private DataAccess(ActorContext<Command> context, String id, DB db) {
private DataAccess(
ActorContext<Command> context, StashBuffer<Command> buffer, String id, DB db) {
this.context = context;
this.buffer = buffer;
this.id = id;
this.db = db;
}
public static Behavior<Command> create(String id, DB db) {
return Behaviors.setup(
ctx -> {
ctx.pipeToSelf(
db.load(id),
(value, cause) -> {
if (cause == null) return new InitialState(value);
else return new DBError(asRuntimeException(cause));
});
ctx ->
Behaviors.withStash(
100,
stash -> {
ctx.pipeToSelf(
db.load(id),
(value, cause) -> {
if (cause == null) return new InitialState(value);
else return new DBError(asRuntimeException(cause));
});
return new DataAccess(ctx, id, db).init();
});
return new DataAccess(ctx, stash, id, db).init();
}));
}
private Behavior<Command> init() {
@ -108,7 +113,7 @@ public class StashDocTest extends JUnitSuite {
InitialState.class,
message -> {
// now we are ready to handle stashed messages if any
return buffer.unstashAll(context, active(message.value));
return buffer.unstashAll(active(message.value));
})
.onMessage(
DBError.class,
@ -153,7 +158,7 @@ public class StashDocTest extends JUnitSuite {
SaveSuccess.class,
message -> {
replyTo.tell(Done.getInstance());
return buffer.unstashAll(context, active(state));
return buffer.unstashAll(active(state));
})
.onMessage(
DBError.class,

View file

@ -18,7 +18,7 @@ class StashBufferSpec extends WordSpec with Matchers {
"A StashBuffer" must {
"answer empty correctly" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.isEmpty should ===(true)
buffer.nonEmpty should ===(false)
buffer.stash("m1")
@ -27,7 +27,7 @@ class StashBufferSpec extends WordSpec with Matchers {
}
"append and drop" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.size should ===(0)
buffer.stash("m1")
buffer.size should ===(1)
@ -36,12 +36,12 @@ class StashBufferSpec extends WordSpec with Matchers {
val m1 = buffer.head
m1 should ===("m1")
buffer.size should ===(2)
buffer.unstash(context, Behaviors.ignore, 1, identity)
buffer.unstash(Behaviors.ignore, 1, identity)
buffer.size should ===(1)
m1 should ===("m1")
val m2 = buffer.head
m2 should ===("m2")
buffer.unstash(context, Behaviors.ignore, 1, identity)
buffer.unstash(Behaviors.ignore, 1, identity)
buffer.size should ===(0)
intercept[NoSuchElementException] {
buffer.head
@ -50,7 +50,7 @@ class StashBufferSpec extends WordSpec with Matchers {
}
"enforce capacity" in {
val buffer = StashBuffer[String](3)
val buffer = StashBuffer[String](context, 3)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
@ -65,21 +65,21 @@ class StashBufferSpec extends WordSpec with Matchers {
}
"process elements in the right order" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
val sb1 = new StringBuilder()
buffer.foreach(sb1.append(_))
sb1.toString() should ===("m1m2m3")
buffer.unstash(context, Behaviors.ignore, 1, identity)
buffer.unstash(Behaviors.ignore, 1, identity)
val sb2 = new StringBuilder()
buffer.foreach(sb2.append(_))
sb2.toString() should ===("m2m3")
}
"unstash to returned behaviors" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
@ -96,13 +96,13 @@ class StashBufferSpec extends WordSpec with Matchers {
}
}
buffer.unstashAll(context, behavior(""))
buffer.unstashAll(behavior(""))
valueInbox.expectMessage("m1m2m3")
buffer.isEmpty should ===(true)
}
"undefer returned behaviors when unstashing" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
@ -119,13 +119,13 @@ class StashBufferSpec extends WordSpec with Matchers {
}
}
buffer.unstashAll(context, behavior(""))
buffer.unstashAll(behavior(""))
valueInbox.expectMessage("m1m2m3")
buffer.isEmpty should ===(true)
}
"be able to stash while unstashing" in {
val buffer = StashBuffer[String](10)
val buffer = StashBuffer[String](context, 10)
buffer.stash("m1")
buffer.stash("m2")
buffer.stash("m3")
@ -148,16 +148,21 @@ class StashBufferSpec extends WordSpec with Matchers {
// It's only supposed to unstash the messages that are in the buffer when
// the call is made, not unstash new messages added to the buffer while
// unstashing.
val b2 = buffer.unstashAll(context, behavior(""))
val b2 = buffer.unstashAll(behavior(""))
valueInbox.expectMessage("m1m3")
buffer.size should ===(1)
buffer.head should ===("m2")
buffer.unstashAll(context, b2)
buffer.unstashAll(b2)
buffer.size should ===(1)
buffer.head should ===("m2")
}
}
"fail quick on invalid start behavior" in {
val stash = StashBuffer[String](context, 10)
stash.stash("one")
intercept[IllegalArgumentException](stash.unstashAll(Behaviors.unhandled))
}
}
}

View file

@ -28,108 +28,107 @@ object AbstractStashSpec {
final case class GetStashSize(replyTo: ActorRef[Int]) extends Command
val immutableStash: Behavior[Command] =
Behaviors.setup[Command] { _ =>
val buffer = StashBuffer[Command](capacity = 10)
def active(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (_, cmd) =>
cmd match {
case message: Msg =>
active(processed :+ message.s)
case GetProcessed(replyTo) =>
replyTo ! processed
Behaviors.same
case Stash =>
stashing(processed)
case GetStashSize(replyTo) =>
replyTo ! 0
Behaviors.same
case UnstashAll =>
Behaviors.unhandled
case Unstash =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in active")
Behaviors.setup[Command] { ctx =>
Behaviors.withStash(10) { buffer =>
def active(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (_, cmd) =>
cmd match {
case message: Msg =>
active(processed :+ message.s)
case GetProcessed(replyTo) =>
replyTo ! processed
Behaviors.same
case Stash =>
stashing(processed)
case GetStashSize(replyTo) =>
replyTo ! 0
Behaviors.same
case UnstashAll =>
Behaviors.unhandled
case Unstash =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in active")
}
}
}
def stashing(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (context, cmd) =>
cmd match {
case message: Msg =>
buffer.stash(message)
Behaviors.same
case g: GetProcessed =>
buffer.stash(g)
Behaviors.same
case GetStashSize(replyTo) =>
replyTo ! buffer.size
Behaviors.same
case UnstashAll =>
buffer.unstashAll(context, active(processed))
case Unstash =>
context.log.debug(s"Unstash ${buffer.size}")
if (buffer.isEmpty)
active(processed)
else {
context.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(context, unstashing(processed), numberOfMessages, Unstashed)
}
case Stash =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in stashing")
def stashing(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (context, cmd) =>
cmd match {
case message: Msg =>
buffer.stash(message)
Behaviors.same
case g: GetProcessed =>
buffer.stash(g)
Behaviors.same
case GetStashSize(replyTo) =>
replyTo ! buffer.size
Behaviors.same
case UnstashAll =>
buffer.unstashAll(active(processed))
case Unstash =>
context.log.debug(s"Unstash ${buffer.size}")
if (buffer.isEmpty)
active(processed)
else {
context.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(unstashing(processed), numberOfMessages, Unstashed)
}
case Stash =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in stashing")
}
}
}
def unstashing(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (context, cmd) =>
cmd match {
case Unstashed(message: Msg) =>
context.log.debug(s"unstashed $message")
unstashing(processed :+ message.s)
case Unstashed(GetProcessed(replyTo)) =>
context.log.debug(s"unstashed GetProcessed")
replyTo ! processed
Behaviors.same
case message: Msg =>
context.log.debug(s"got $message in unstashing")
buffer.stash(message)
Behaviors.same
case g: GetProcessed =>
context.log.debug(s"got GetProcessed in unstashing")
buffer.stash(g)
Behaviors.same
case Stash =>
stashing(processed)
case Unstash =>
if (buffer.isEmpty) {
context.log.debug(s"unstashing done")
active(processed)
} else {
context.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(context, unstashing(processed), numberOfMessages, Unstashed)
}
case GetStashSize(replyTo) =>
replyTo ! buffer.size
Behaviors.same
case UnstashAll =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in unstashing")
def unstashing(processed: Vector[String]): Behavior[Command] =
Behaviors.receive { (context, cmd) =>
cmd match {
case Unstashed(message: Msg) =>
context.log.debug(s"unstashed $message")
unstashing(processed :+ message.s)
case Unstashed(GetProcessed(replyTo)) =>
context.log.debug(s"unstashed GetProcessed")
replyTo ! processed
Behaviors.same
case message: Msg =>
context.log.debug(s"got $message in unstashing")
buffer.stash(message)
Behaviors.same
case g: GetProcessed =>
context.log.debug(s"got GetProcessed in unstashing")
buffer.stash(g)
Behaviors.same
case Stash =>
stashing(processed)
case Unstash =>
if (buffer.isEmpty) {
context.log.debug(s"unstashing done")
active(processed)
} else {
context.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(unstashing(processed), numberOfMessages, Unstashed)
}
case GetStashSize(replyTo) =>
replyTo ! buffer.size
Behaviors.same
case UnstashAll =>
Behaviors.unhandled
case u: Unstashed =>
throw new IllegalStateException(s"Unexpected $u in unstashing")
}
}
}
active(Vector.empty)
active(Vector.empty)
}
}
class MutableStash(context: ActorContext[Command]) extends AbstractBehavior[Command] {
class MutableStash(context: ActorContext[Command], buffer: StashBuffer[Command]) extends AbstractBehavior[Command] {
private val buffer = StashBuffer.apply[Command](capacity = 10)
private var stashing = false
private var processed = Vector.empty[String]
@ -155,7 +154,7 @@ object AbstractStashSpec {
this
case UnstashAll =>
stashing = false
buffer.unstashAll(context, this)
buffer.unstashAll(this)
case Unstash =>
if (buffer.isEmpty) {
stashing = false
@ -164,7 +163,7 @@ object AbstractStashSpec {
context.self ! Unstash // continue unstashing until buffer is empty
val numberOfMessages = 2
context.log.debug(s"Unstash $numberOfMessages of ${buffer.size}, starting with ${buffer.head}")
buffer.unstash(context, this, numberOfMessages, Unstashed)
buffer.unstash(this, numberOfMessages, Unstashed)
}
case Unstashed(message: Msg) =>
context.log.debug(s"unstashed $message")
@ -192,7 +191,10 @@ class ImmutableStashSpec extends AbstractStashSpec {
class MutableStashSpec extends AbstractStashSpec {
import AbstractStashSpec._
def testQualifier: String = "mutable behavior"
def behaviorUnderTest: Behavior[Command] = Behaviors.setup(context => new MutableStash(context))
def behaviorUnderTest: Behavior[Command] =
Behaviors.withStash(10) { stash =>
Behaviors.setup(context => new MutableStash(context, stash))
}
}
abstract class AbstractStashSpec extends ScalaTestWithActorTestKit with WordSpecLike {
@ -267,52 +269,54 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
Behaviors.same
}
private def stashingBehavior(probe: ActorRef[String], withSlowStoppingChild: Option[CountDownLatch] = None) = {
private def stashingBehavior(
probe: ActorRef[String],
withSlowStoppingChild: Option[CountDownLatch] = None): Behavior[String] = {
Behaviors.setup[String] { ctx =>
withSlowStoppingChild.foreach(latch => ctx.spawnAnonymous(slowStoppingChild(latch)))
val stash = StashBuffer[String](10)
Behaviors.withStash(10) { stash =>
def unstashing(n: Int): Behavior[String] =
Behaviors
.receiveMessage[String] {
case "stash" =>
probe.ref ! s"unstashing-$n"
unstashing(n + 1)
case "stash-fail" =>
probe.ref ! s"stash-fail-$n"
throw TestException("unstash-fail")
case "get-current" =>
probe.ref ! s"current-$n"
Behaviors.same
case "get-stash-size" =>
probe.ref ! s"stash-size-${stash.size}"
Behaviors.same
case "unstash" =>
// when testing resume
stash.unstashAll(unstashing(n))
}
.receiveSignal {
case (_, PreRestart) =>
probe.ref ! s"pre-restart-$n"
Behaviors.same
case (_, PostStop) =>
probe.ref ! s"post-stop-$n"
Behaviors.same
}
def unstashing(n: Int): Behavior[String] =
Behaviors
.receiveMessage[String] {
case "stash" =>
probe.ref ! s"unstashing-$n"
unstashing(n + 1)
case "stash-fail" =>
probe.ref ! s"stash-fail-$n"
throw TestException("unstash-fail")
case "get-current" =>
probe.ref ! s"current-$n"
Behaviors.same
case "get-stash-size" =>
probe.ref ! s"stash-size-${stash.size}"
Behaviors.same
case "unstash" =>
// when testing resume
stash.unstashAll(ctx, unstashing(n))
}
.receiveSignal {
case (_, PreRestart) =>
probe.ref ! s"pre-restart-$n"
Behaviors.same
case (_, PostStop) =>
probe.ref ! s"post-stop-$n"
Behaviors.same
}
Behaviors.receiveMessage[String] {
case msg if msg.startsWith("stash") =>
stash.stash(msg)
Behaviors.same
case "unstash" =>
stash.unstashAll(ctx, unstashing(0))
case "get-current" =>
probe.ref ! s"current-00"
Behaviors.same
case "get-stash-size" =>
probe.ref ! s"stash-size-${stash.size}"
Behaviors.same
Behaviors.receiveMessage[String] {
case msg if msg.startsWith("stash") =>
stash.stash(msg)
Behaviors.same
case "unstash" =>
stash.unstashAll(unstashing(0))
case "get-current" =>
probe.ref ! s"current-00"
Behaviors.same
case "get-stash-size" =>
probe.ref ! s"stash-size-${stash.size}"
Behaviors.same
}
}
}
}
@ -326,11 +330,11 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
val probe = TestProbe[String]()
// unstashing is inside setup
val ref = spawn(Behaviors.receive[String] {
case (ctx, "unstash") =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.unstashAll(ctx, Behaviors.same)
case (_, "unstash") =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.unstashAll(Behaviors.same)
}
case (_, msg) =>
probe.ref ! msg
Behaviors.same
@ -344,14 +348,15 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
val probe = TestProbe[String]()
// unstashing is inside setup
val ref = spawn(Behaviors.receivePartial[String] {
case (ctx, "unstash") =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.stash("two")
stash.unstashAll(ctx, Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
})
case (_, "unstash") =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.stash("two")
stash.unstashAll(Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
})
}
})
ref ! "unstash"
@ -371,10 +376,10 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
Behaviors
.supervise(Behaviors.receivePartial[String] {
case (ctx, "unstash") =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.unstashAll(ctx, Behaviors.same)
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.unstashAll(Behaviors.same)
}
case (_, msg) =>
probe.ref ! msg
Behaviors.same
@ -394,13 +399,14 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
Behaviors
.supervise(Behaviors.receivePartial[String] {
case (ctx, "unstash") =>
val stash = StashBuffer[String](10)
stash.stash("one")
stash.stash("two")
stash.unstashAll(ctx, Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
})
Behaviors.withStash(10) { stash =>
stash.stash("one")
stash.stash("two")
stash.unstashAll(Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
})
}
})
.onFailure[TestException](SupervisorStrategy.stop))
@ -558,20 +564,22 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
"be possible in combination with setup" in {
val probe = TestProbe[String]()
val ref = spawn(Behaviors.setup[String] { _ =>
val stash = StashBuffer[String](10)
stash.stash("one")
val ref = spawn(Behaviors.setup[String] { ctx =>
Behaviors.withStash(10) { stash =>
stash.stash("one")
// unstashing is inside setup
Behaviors.receiveMessage {
case "unstash" =>
Behaviors.setup[String] { ctx =>
stash.unstashAll(ctx, Behaviors.same)
}
case msg =>
probe.ref ! msg
Behaviors.same
// unstashing is inside setup
Behaviors.receiveMessage {
case "unstash" =>
Behaviors.setup[String] { ctx =>
stash.unstashAll(Behaviors.same)
}
case msg =>
probe.ref ! msg
Behaviors.same
}
}
})
ref ! "unstash"
@ -580,8 +588,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
"deal with unhandled the same way as normal unhandled" in {
val probe = TestProbe[String]()
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
val ref = spawn(Behaviors.withStash[String](10) { stash =>
stash.stash("unhandled")
stash.stash("handled")
stash.stash("handled")
@ -598,7 +605,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, unstashing(1))
stash.unstashAll(unstashing(1))
}
})
@ -613,21 +620,14 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage("handled 4")
}
"fail quick on invalid start behavior" in {
val stash = StashBuffer[String](10)
stash.stash("one")
intercept[IllegalArgumentException](stash.unstashAll(null, Behaviors.unhandled))
}
"deal with initial stop" in {
val probe = TestProbe[Any]
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
val ref = spawn(Behaviors.withStash[String](10) { stash =>
stash.stash("one")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.stopped)
stash.unstashAll(Behaviors.stopped)
}
})
@ -639,14 +639,13 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
val probe = TestProbe[Any]
import akka.actor.typed.scaladsl.adapter._
untypedSys.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter])
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
val ref = spawn(Behaviors.withStash[String](10) { stash =>
stash.stash("one")
stash.stash("two")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.receiveMessage {
stash.unstashAll(Behaviors.receiveMessage {
case unstashed =>
probe.ref ! unstashed
Behaviors.stopped
@ -663,14 +662,13 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
"work with initial same" in {
val probe = TestProbe[Any]
val ref = spawn(Behaviors.setup[String] { ctx =>
val stash = StashBuffer[String](10)
val ref = spawn(Behaviors.withStash[String](10) { stash =>
stash.stash("one")
stash.stash("two")
Behaviors.receiveMessage {
case "unstash" =>
stash.unstashAll(ctx, Behaviors.same)
stash.unstashAll(Behaviors.same)
case msg =>
probe.ref ! msg
Behaviors.same

View file

@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit
import akka.actor.ActorSystemImpl
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.Matchers

View file

@ -17,8 +17,6 @@ import org.scalatest.WordSpecLike
object StashDocSpec {
// #stashing
import akka.actor.typed.scaladsl.StashBuffer
trait DB {
def save(id: String, value: String): Future[Done]
def load(id: String): Future[String]
@ -33,15 +31,13 @@ object StashDocSpec {
private final case class DBError(cause: Throwable) extends Command
def behavior(id: String, db: DB): Behavior[Command] =
Behaviors.setup[Command] { context =>
val buffer = StashBuffer[Command](capacity = 100)
def init(): Behavior[Command] =
Behaviors.receive[Command] { (context, message) =>
message match {
Behaviors.withStash(100) { buffer =>
Behaviors.setup[Command] { context =>
def init(): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case InitialState(value) =>
// now we are ready to handle stashed messages if any
buffer.unstashAll(context, active(value))
buffer.unstashAll(active(value))
case DBError(cause) =>
throw cause
case other =>
@ -49,43 +45,41 @@ object StashDocSpec {
buffer.stash(other)
Behaviors.same
}
}
def active(state: String): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Get(replyTo) =>
replyTo ! state
Behaviors.same
case Save(value, replyTo) =>
context.pipeToSelf(db.save(id, value)) {
case Success(_) => SaveSuccess
case Failure(cause) => DBError(cause)
}
saving(value, replyTo)
def active(state: String): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Get(replyTo) =>
replyTo ! state
Behaviors.same
case Save(value, replyTo) =>
context.pipeToSelf(db.save(id, value)) {
case Success(_) => SaveSuccess
case Failure(cause) => DBError(cause)
}
saving(value, replyTo)
}
}
}
def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] =
Behaviors.receive[Command] { (context, message) =>
message match {
def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case SaveSuccess =>
replyTo ! Done
buffer.unstashAll(context, active(state))
buffer.unstashAll(active(state))
case DBError(cause) =>
throw cause
case other =>
buffer.stash(other)
Behaviors.same
}
context.pipeToSelf(db.load(id)) {
case Success(value) => InitialState(value)
case Failure(cause) => DBError(cause)
}
context.pipeToSelf(db.load(id)) {
case Success(value) => InitialState(value)
case Failure(cause) => DBError(cause)
init()
}
init()
}
}
// #stashing

View file

@ -16,8 +16,9 @@ import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
import akka.actor.typed.javadsl
import akka.actor.typed.scaladsl
import akka.annotation.InternalApi
import akka.util.ConstantFun
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.util.{ unused, ConstantFun }
/**
* INTERNAL API
@ -27,14 +28,15 @@ import akka.util.ConstantFun
def apply(f: T => Unit): Unit = f(message)
}
def apply[T](capacity: Int): StashBufferImpl[T] =
new StashBufferImpl(capacity, null, null)
def apply[T](ctx: ActorContext[T], capacity: Int): StashBufferImpl[T] =
new StashBufferImpl(ctx, capacity, null, null)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class StashBufferImpl[T] private (
ctx: ActorContext[T],
val capacity: Int,
private var _first: StashBufferImpl.Node[T],
private var _last: StashBufferImpl.Node[T])
@ -60,7 +62,7 @@ import akka.util.ConstantFun
s"Couldn't add [${message.getClass.getName}] " +
s"because stash with capacity [$capacity] is full")
val node = new Node(null, message)
val node = createNode(message, ctx)
if (isEmpty) {
_first = node
_last = node
@ -68,12 +70,19 @@ import akka.util.ConstantFun
_last.next = node
_last = node
}
_size += 1
this
}
private def dropHead(): T = {
val message = head
@InternalStableApi
private def createNode(message: T, @unused ctx: scaladsl.ActorContext[T]): Node[T] = {
new Node(null, message)
}
@InternalStableApi
private def dropHeadForUnstash(): Node[T] = {
val message = rawHead
_first = _first.next
_size -= 1
if (isEmpty)
@ -82,6 +91,10 @@ import akka.util.ConstantFun
message
}
private def rawHead: Node[T] =
if (nonEmpty) _first
else throw new NoSuchElementException("head of empty buffer")
override def head: T =
if (nonEmpty) _first.message
else throw new NoSuchElementException("head of empty buffer")
@ -96,23 +109,23 @@ import akka.util.ConstantFun
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T])
override def unstashAll(behavior: Behavior[T]): Behavior[T] = {
val behav = unstash(behavior, size, ConstantFun.scalaIdentityFunction[T])
stashCleared(ctx)
behav
}
override def unstashAll(ctx: javadsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstashAll(ctx.asScala, behavior)
override def unstash(
ctx: scaladsl.ActorContext[T],
behavior: Behavior[T],
numberOfMessages: Int,
wrap: T => T): Behavior[T] = {
override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T] = {
if (isEmpty)
behavior // optimization
else {
val iter = new Iterator[T] {
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
override def next(): T = wrap(StashBufferImpl.this.dropHead())
override def next(): T = {
val next = StashBufferImpl.this.dropHeadForUnstash()
unstashed(ctx, next)
wrap(next.message)
}
}.take(numberOfMessages)
interpretUnstashedMessages(behavior, ctx, iter)
}
@ -178,15 +191,18 @@ import akka.util.ConstantFun
scalaCtx.system.deadLetters ! DeadLetter(msg, untypedDeadLetters, ctx.asScala.self.toUntyped))
}
override def unstash(
ctx: javadsl.ActorContext[T],
behavior: Behavior[T],
numberOfMessages: Int,
wrap: JFunction[T, T]): Behavior[T] =
unstash(ctx.asScala, behavior, numberOfMessages, x => wrap.apply(x))
override def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] =
unstash(behavior, numberOfMessages, x => wrap.apply(x))
override def toString: String =
s"StashBuffer($size/$capacity)"
@InternalStableApi
private[akka] def unstashed(@unused ctx: ActorContext[T], @unused node: Node[T]): Unit = ()
@InternalStableApi
private def stashCleared(@unused ctx: ActorContext[T]): Unit = ()
}
/**

View file

@ -330,7 +330,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
val stashCapacity =
if (strategy.stashCapacity >= 0) strategy.stashCapacity
else ctx.asScala.system.settings.RestartStashCapacity
restartingInProgress = OptionVal.Some((StashBuffer[Any](stashCapacity), childrenToStop))
restartingInProgress = OptionVal.Some(
(StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop))
strategy match {
case backoff: Backoff =>
@ -364,7 +365,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
case OptionVal.None => newBehavior
case OptionVal.Some((stashBuffer, _)) =>
restartingInProgress = OptionVal.None
stashBuffer.unstashAll(ctx.asScala, newBehavior.unsafeCast)
stashBuffer.unstashAll(newBehavior.unsafeCast)
}
nextBehavior.narrow
} catch handleException(ctx, signalRestart = {

View file

@ -8,9 +8,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.StashBuffer
import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors, StashOverflowException }
import akka.annotation.InternalApi
/**
@ -33,17 +31,22 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav
import GuardianStartupBehavior.Start
private val stash = StashBuffer[Any](1000)
private var tempStash: List[Any] = Nil
override def onMessage(msg: Any): Behavior[Any] =
msg match {
case Start =>
// ctx is not available initially so we cannot use it until here
Behaviors.setup(ctx =>
stash
.unstashAll(ctx, Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any])))
Behaviors.withStash[Any](1000) { stash =>
tempStash.reverse.foreach(stash.stash)
tempStash = null
stash.unstashAll(Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any]))
}
case other =>
stash.stash(other)
tempStash = other :: tempStash
if (tempStash.size > 1000) {
throw new StashOverflowException("Guardian Behavior did not receive start and buffer is full.")
}
this
}

View file

@ -49,14 +49,14 @@ private final class InitialGroupRouterImpl[T](
// messages to a router
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
private val stash = StashBuffer[T](capacity = 10000)
private val stash = StashBuffer[T](ctx, capacity = 10000)
def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) =>
// we don't need to watch, because receptionist already does that
routingLogic.routeesUpdated(update)
val activeGroupRouter = new GroupRouterImpl[T](ctx, serviceKey, routingLogic, update.isEmpty)
stash.unstashAll(ctx, activeGroupRouter)
stash.unstashAll(activeGroupRouter)
case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._
if (!stash.isFull) stash.stash(msg)

View file

@ -8,7 +8,13 @@ import java.util.Collections
import java.util.function.{ Supplier, Function => JFunction }
import akka.actor.typed._
import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
import akka.actor.typed.internal.{
BehaviorImpl,
StashBufferImpl,
Supervisor,
TimerSchedulerImpl,
WithMdcBehaviorInterceptor
}
import akka.japi.function.{ Effect, Function2 => JapiFunction2 }
import akka.japi.pf.PFBuilder
import akka.util.unused
@ -40,6 +46,14 @@ object Behaviors {
def setup[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] =
BehaviorImpl.DeferredBehavior(ctx => factory.apply(ctx.asJava))
/**
* Support for stashing messages to unstash at a later timej.
*/
def withStash[T](capacity: Int, factory: java.util.function.Function[StashBuffer[T], Behavior[T]]): Behavior[T] =
setup(ctx => {
factory(StashBufferImpl[T](ctx.asScala, capacity))
})
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to

View file

@ -8,22 +8,9 @@ import java.util.function.Consumer
import java.util.function.{ Function => JFunction }
import akka.actor.typed.Behavior
import akka.actor.typed.internal.StashBufferImpl
import akka.actor.typed.scaladsl
import akka.annotation.DoNotInherit
object StashBuffer {
/**
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def create[T](capacity: Int): StashBuffer[T] =
StashBufferImpl[T](capacity)
}
/**
* A non thread safe mutable message buffer that can be used to buffer messages inside actors
* and then unstash them.
@ -32,7 +19,7 @@ object StashBuffer {
*
* Not for user extension.
*/
@DoNotInherit abstract class StashBuffer[T] {
@DoNotInherit trait StashBuffer[T] {
/**
* Check if the message buffer is empty.
@ -103,7 +90,7 @@ object StashBuffer {
*
* The `behavior` passed to `unstashAll` must not be `unhandled`.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
def unstashAll(behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
@ -126,7 +113,7 @@ object StashBuffer {
*
* The `behavior` passed to `unstash` must not be `unhandled`.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T]
def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T]
}

View file

@ -29,6 +29,15 @@ object Behaviors {
def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] =
BehaviorImpl.DeferredBehavior(factory)
/**
* Support for stashing messages to unstash at a later timej.
*/
def withStash[T](capacity: Int)(factory: StashBuffer[T] => Behavior[T]): Behavior[T] =
setup(ctx => {
val stash = StashBuffer[T](ctx, capacity)
factory(stash)
})
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to

View file

@ -6,18 +6,22 @@ package akka.actor.typed.scaladsl
import akka.actor.typed.Behavior
import akka.actor.typed.internal.StashBufferImpl
import akka.annotation.DoNotInherit
import akka.annotation.{ DoNotInherit, InternalApi }
object StashBuffer {
/**
* INTERNAL API
*/
@InternalApi private[akka] object StashBuffer {
/**
* INTERNAL API
* Create an empty message buffer.
*
* @param capacity the buffer can hold at most this number of messages
* @return an empty message buffer
*/
def apply[T](capacity: Int): StashBuffer[T] =
StashBufferImpl[T](capacity)
@InternalApi private[akka] def apply[T](ctx: ActorContext[T], capacity: Int): StashBuffer[T] =
StashBufferImpl[T](ctx, capacity)
}
/**
@ -97,7 +101,7 @@ object StashBuffer {
*
* The initial `behavior` passed to `unstashAll` must not be `unhandled`.
*/
def unstashAll(ctx: ActorContext[T], behavior: Behavior[T]): Behavior[T]
def unstashAll(behavior: Behavior[T]): Behavior[T]
/**
* Process `numberOfMessages` of the stashed messages with the `behavior`
@ -120,7 +124,7 @@ object StashBuffer {
*
* The `behavior` passed to `unstash` must not be `unhandled`.
*/
def unstash(ctx: ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T]
def unstash(behavior: Behavior[T], numberOfMessages: Int, wrap: T => T): Behavior[T]
}

View file

@ -12,7 +12,7 @@ import scala.collection.immutable
* INTERNAL API
*
* Compatibility wrapper for `scala.concurrent.Future` to be able to compile the same code
* against Scala 2.12, 2.13
* against Scala 2.12, 2.13
*
* Remove these classes as soon as support for Scala 2.12 is dropped!
*/

View file

@ -5,8 +5,8 @@
package akka.actor
import scala.collection.immutable
import akka.AkkaException
import akka.annotation.InternalStableApi
import akka.dispatch.{
DequeBasedMessageQueueSemantics,
Envelope,
@ -230,6 +230,7 @@ private[akka] trait StashSupport {
* @param filterPredicate only stashed messages selected by this predicate are
* prepended to the mailbox.
*/
@InternalStableApi
private[akka] def unstashAll(filterPredicate: Any => Boolean): Unit = {
try {
val i = theStash.reverseIterator.filter(envelope => filterPredicate(envelope.message))
@ -244,6 +245,7 @@ private[akka] trait StashSupport {
*
* Clears the stash and and returns all envelopes that have not been unstashed.
*/
@InternalStableApi
private[akka] def clearStash(): Vector[Envelope] = {
val stashed = theStash
theStash = Vector.empty[Envelope]

View file

@ -392,6 +392,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.
* `Behavior.orElse` has been removed because it wasn't safe together with `narrow`.
* `StashBuffer`s are now created with `Behaviors.withStash` rather than instantiating directly
#### Akka Typed Stream API changes

View file

@ -84,7 +84,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
// stashState outside supervise because StashState should survive restarts due to persist failures
val stashState = new StashState(settings)
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
val actualSignalHandler: PartialFunction[(State, Signal), Unit] = signalHandler.orElse {
// default signal handler is always the fallback

View file

@ -72,7 +72,7 @@ private[akka] trait StashManagement[C, E, S] {
stashState.decrementUnstashAllProgress()
buffer.unstash(setup.context, behavior, 1, ConstantFun.scalaIdentityFunction)
buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction)
} else behavior
}
@ -123,10 +123,10 @@ private[akka] trait StashManagement[C, E, S] {
/** INTERNAL API: stash buffer state in order to survive restart of internal behavior */
@InternalApi
private[akka] class StashState(settings: EventSourcedSettings) {
private[akka] class StashState(ctx: ActorContext[InternalProtocol], settings: EventSourcedSettings) {
private var _internalStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(settings.stashCapacity)
private var _userStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(settings.stashCapacity)
private var _internalStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(ctx, settings.stashCapacity)
private var _userStashBuffer: StashBuffer[InternalProtocol] = StashBuffer(ctx, settings.stashCapacity)
private var unstashAllInProgress = 0
def internalStashBuffer: StashBuffer[InternalProtocol] = _internalStashBuffer
@ -134,8 +134,8 @@ private[akka] class StashState(settings: EventSourcedSettings) {
def userStashBuffer: StashBuffer[InternalProtocol] = _userStashBuffer
def clearStashBuffers(): Unit = {
_internalStashBuffer = StashBuffer(settings.stashCapacity)
_userStashBuffer = StashBuffer(settings.stashCapacity)
_internalStashBuffer = StashBuffer(ctx, settings.stashCapacity)
_userStashBuffer = StashBuffer(ctx, settings.stashCapacity)
unstashAllInProgress = 0
}

View file

@ -38,8 +38,8 @@ class StashStateSpec extends ScalaTestWithActorTestKit with WordSpecLike {
def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = {
val settings = dummySettings()
Behaviors.setup[InternalProtocol] { _ =>
val stashState = new StashState(settings)
Behaviors.setup[InternalProtocol] { ctx =>
val stashState = new StashState(ctx, settings)
Behaviors
.receiveMessagePartial[InternalProtocol] {
case RecoveryPermitGranted =>