Typed: Renaming deferred to setup #24548
This commit is contained in:
parent
cdf36c21f5
commit
14bb878da1
48 changed files with 124 additions and 122 deletions
|
|
@ -38,7 +38,7 @@ public class ActorCompile {
|
|||
Behavior<MyMsg> actor5 = ignore();
|
||||
Behavior<MyMsg> actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5);
|
||||
Behavior<MyMsgA> actor7 = actor6.narrow();
|
||||
Behavior<MyMsg> actor8 = deferred(ctx -> {
|
||||
Behavior<MyMsg> actor8 = setup(ctx -> {
|
||||
final ActorRef<MyMsg> self = ctx.getSelf();
|
||||
return monitor(self, ignore());
|
||||
});
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ public class ActorContextAskTest extends JUnitSuite {
|
|||
|
||||
final TestProbe<Object> probe = new TestProbe<>(Adapter.toTyped(system));
|
||||
|
||||
final Behavior<Object> snitch = Behaviors.deferred((ActorContext<Object> ctx) -> {
|
||||
final Behavior<Object> snitch = Behaviors.setup((ActorContext<Object> ctx) -> {
|
||||
ctx.ask(Pong.class,
|
||||
pingPong,
|
||||
new Timeout(3, TimeUnit.SECONDS),
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ public class FaultToleranceDocTest extends JUnitSuite {
|
|||
})
|
||||
.build();
|
||||
|
||||
Behavior<Message> middleManagementBehavior = Behaviors.deferred((ctx) -> {
|
||||
Behavior<Message> middleManagementBehavior = Behaviors.setup((ctx) -> {
|
||||
ctx.getLog().info("Middle management starting up");
|
||||
final ActorRef<Message> child = ctx.spawn(failingChildBehavior, "child");
|
||||
// we want to know when the child terminates, but since we do not handle
|
||||
|
|
@ -49,7 +49,7 @@ public class FaultToleranceDocTest extends JUnitSuite {
|
|||
}).build();
|
||||
});
|
||||
|
||||
Behavior<Message> bossBehavior = Behaviors.deferred((ctx) -> {
|
||||
Behavior<Message> bossBehavior = Behaviors.setup((ctx) -> {
|
||||
ctx.getLog().info("Boss starting up");
|
||||
final ActorRef<Message> middleManagement = ctx.spawn(middleManagementBehavior, "middle-management");
|
||||
ctx.watch(middleManagement);
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
*/
|
||||
package jdocs.akka.typed;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
|
|
@ -376,7 +375,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
public static Behavior<DaveProtocol> daveBehavior(final ActorRef<HalCommand> hal) {
|
||||
return Behaviors.deferred((ActorContext<DaveProtocol> ctx) -> {
|
||||
return Behaviors.setup((ActorContext<DaveProtocol> ctx) -> {
|
||||
|
||||
// asking someone requires a timeout, if the timeout hits without response
|
||||
// the ask is failed with a TimeoutException
|
||||
|
|
@ -507,7 +506,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
|||
|
||||
// actor behavior
|
||||
public Behavior<HomeCommand> homeBehavior() {
|
||||
return Behaviors.deferred((ctx) -> {
|
||||
return Behaviors.setup((ctx) -> {
|
||||
final ActorRef<GetKeys> keyCabinet = ctx.spawn(keyCabinetBehavior, "key-cabinet");
|
||||
final ActorRef<GetWallet> drawer = ctx.spawn(drawerBehavior, "drawer");
|
||||
|
||||
|
|
|
|||
|
|
@ -212,7 +212,7 @@ public class IntroTest {
|
|||
public static void runChatRoom() throws Exception {
|
||||
|
||||
//#chatroom-main
|
||||
Behavior<Void> main = Behaviors.deferred(ctx -> {
|
||||
Behavior<Void> main = Behaviors.setup(ctx -> {
|
||||
ActorRef<ChatRoom.RoomCommand> chatRoom =
|
||||
ctx.spawn(ChatRoom.behavior(), "chatRoom");
|
||||
ActorRef<ChatRoom.SessionEvent> gabbler =
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ public class StashDocTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
Behavior<Command> behavior() {
|
||||
return Behaviors.deferred(ctx -> {
|
||||
return Behaviors.setup(ctx -> {
|
||||
db.load(id)
|
||||
.whenComplete((value, cause) -> {
|
||||
if (cause == null)
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ public class TypedWatchingUntypedTest extends JUnitSuite {
|
|||
public static class Pong implements Command { }
|
||||
|
||||
public static Behavior<Command> behavior() {
|
||||
return akka.actor.typed.javadsl.Behaviors.deferred(context -> {
|
||||
return akka.actor.typed.javadsl.Behaviors.setup(context -> {
|
||||
akka.actor.ActorRef second = Adapter.actorOf(context, Untyped.props(), "second");
|
||||
|
||||
Adapter.watch(context, second);
|
||||
|
|
|
|||
|
|
@ -848,13 +848,13 @@ class WidenedActorContextSpec extends ActorContextSpec {
|
|||
class DeferredActorContextSpec extends ActorContextSpec {
|
||||
override def suite = "deferred"
|
||||
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
|
||||
Behaviors.deferred(_ ⇒ subject(ctx.self, ignorePostStop))
|
||||
Behaviors.setup(_ ⇒ subject(ctx.self, ignorePostStop))
|
||||
}
|
||||
|
||||
class NestedDeferredActorContextSpec extends ActorContextSpec {
|
||||
override def suite = "nexted-deferred"
|
||||
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
|
||||
Behaviors.deferred(_ ⇒ Behaviors.deferred(_ ⇒ subject(ctx.self, ignorePostStop)))
|
||||
Behaviors.setup(_ ⇒ Behaviors.setup(_ ⇒ subject(ctx.self, ignorePostStop)))
|
||||
}
|
||||
|
||||
class TapActorContextSpec extends ActorContextSpec {
|
||||
|
|
|
|||
|
|
@ -494,7 +494,7 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec {
|
|||
|
||||
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
|
||||
val inbox = TestInbox[Done]("deferredListener")
|
||||
(SActor.deferred(_ ⇒ {
|
||||
(SActor.setup(_ ⇒ {
|
||||
inbox.ref ! Done
|
||||
super.behavior(monitor)._1
|
||||
}), inbox)
|
||||
|
|
@ -594,7 +594,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec {
|
|||
|
||||
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
|
||||
val inbox = TestInbox[Done]("deferredListener")
|
||||
(JActor.deferred(df(_ ⇒ {
|
||||
(JActor.setup(df(_ ⇒ {
|
||||
inbox.ref ! Done
|
||||
super.behavior(monitor)._1
|
||||
})), inbox)
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
"Deferred behavior" must {
|
||||
"must create underlying" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
target(probe.ref)
|
||||
}
|
||||
|
|
@ -46,8 +46,8 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
|
||||
"must stop when exception from factory" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { ctx ⇒
|
||||
val child = ctx.spawnAnonymous(Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { ctx ⇒
|
||||
val child = ctx.spawnAnonymous(Behaviors.setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
throw new RuntimeException("simulated exc from factory") with NoStackTrace
|
||||
})
|
||||
|
|
@ -65,8 +65,8 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
|
||||
"must stop when deferred result it Stopped" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { ctx ⇒
|
||||
val child = ctx.spawnAnonymous(Behaviors.deferred[Command](_ ⇒ Behaviors.stopped))
|
||||
val behv = Behaviors.setup[Command] { ctx ⇒
|
||||
val child = ctx.spawnAnonymous(Behaviors.setup[Command](_ ⇒ Behaviors.stopped))
|
||||
ctx.watch(child)
|
||||
Behaviors.immutable[Command]((_, _) ⇒ Behaviors.same).onSignal {
|
||||
case (_, Terminated(`child`)) ⇒
|
||||
|
|
@ -80,8 +80,8 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
|
||||
"must create underlying when nested" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { _ ⇒
|
||||
Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { _ ⇒
|
||||
Behaviors.setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
target(probe.ref)
|
||||
}
|
||||
|
|
@ -92,7 +92,7 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
|
||||
"must un-defer underlying when wrapped by widen" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
target(probe.ref)
|
||||
}.widen[Command] {
|
||||
|
|
@ -110,7 +110,7 @@ class DeferredSpec extends TestKit with TypedAkkaSpec {
|
|||
// monitor is implemented with tap, so this is testing both
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val monitorProbe = TestProbe[Command]("monitor")
|
||||
val behv = Behaviors.monitor(monitorProbe.ref, Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.monitor(monitorProbe.ref, Behaviors.setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
target(probe.ref)
|
||||
})
|
||||
|
|
@ -131,7 +131,7 @@ class DeferredStubbedSpec extends TypedAkkaSpec {
|
|||
|
||||
"must create underlying deferred behavior immediately" in {
|
||||
val inbox = TestInbox[Event]("evt")
|
||||
val behv = Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { _ ⇒
|
||||
inbox.ref ! Started
|
||||
target(inbox.ref)
|
||||
}
|
||||
|
|
@ -143,7 +143,7 @@ class DeferredStubbedSpec extends TypedAkkaSpec {
|
|||
"must stop when exception from factory" in {
|
||||
val inbox = TestInbox[Event]("evt")
|
||||
val exc = new RuntimeException("simulated exc from factory") with NoStackTrace
|
||||
val behv = Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.setup[Command] { _ ⇒
|
||||
inbox.ref ! Started
|
||||
throw exc
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ object StepWise {
|
|||
}
|
||||
|
||||
def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) ⇒ Steps[T, _]): Behavior[T] =
|
||||
deferred[Any] { ctx ⇒
|
||||
setup[Any] { ctx ⇒
|
||||
run(ctx, f(ctx.asInstanceOf[scaladsl.ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ())
|
||||
}.narrow
|
||||
|
||||
|
|
|
|||
|
|
@ -229,7 +229,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
|
||||
"create underlying deferred behavior immediately" in {
|
||||
val inbox = TestInbox[Event]("evt")
|
||||
val behv = supervise(deferred[Command] { _ ⇒
|
||||
val behv = supervise(setup[Command] { _ ⇒
|
||||
inbox.ref ! Started
|
||||
targetBehavior(inbox.ref)
|
||||
}).onFailure[Exc1](SupervisorStrategy.restart)
|
||||
|
|
@ -270,7 +270,7 @@ class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseStri
|
|||
class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) {
|
||||
val probe = TestProbe[AnyRef]("evt")
|
||||
val failCounter = new AtomicInteger(0)
|
||||
def behv = supervise(deferred[Command] { _ ⇒
|
||||
def behv = supervise(setup[Command] { _ ⇒
|
||||
val count = failCounter.getAndIncrement()
|
||||
if (count < failCount) {
|
||||
probe.ref ! StartFailed
|
||||
|
|
@ -284,7 +284,7 @@ class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseStri
|
|||
|
||||
class FailingUnhandledTestSetup(strategy: SupervisorStrategy) {
|
||||
val probe = TestProbe[AnyRef]("evt")
|
||||
def behv = supervise(deferred[Command] { _ ⇒
|
||||
def behv = supervise(setup[Command] { _ ⇒
|
||||
probe.ref ! StartFailed
|
||||
throw new TE("construction failed")
|
||||
}).onFailure[IllegalArgumentException](strategy)
|
||||
|
|
@ -449,7 +449,7 @@ class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseStri
|
|||
val strategy = SupervisorStrategy
|
||||
.restartWithBackoff(minBackoff, 10.seconds, 0.0)
|
||||
.withResetBackoffAfter(10.seconds)
|
||||
val behv = Behaviors.supervise(Behaviors.deferred[Command] { _ ⇒
|
||||
val behv = Behaviors.supervise(Behaviors.setup[Command] { _ ⇒
|
||||
startedProbe.ref ! Started
|
||||
targetBehavior(probe.ref)
|
||||
}).onFailure[Exception](strategy)
|
||||
|
|
@ -520,7 +520,7 @@ class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseStri
|
|||
|
||||
"create underlying deferred behavior immediately" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = supervise(deferred[Command] { _ ⇒
|
||||
val behv = supervise(setup[Command] { _ ⇒
|
||||
probe.ref ! Started
|
||||
targetBehavior(probe.ref)
|
||||
}).onFailure[Exception](SupervisorStrategy.restart)
|
||||
|
|
@ -632,8 +632,8 @@ class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseStri
|
|||
"work with nested supervisions and defers" in {
|
||||
val strategy = SupervisorStrategy.restartWithLimit(3, 1.second)
|
||||
val probe = TestProbe[AnyRef]("p")
|
||||
val beh = supervise[String](deferred(ctx ⇒
|
||||
supervise[String](deferred { ctx ⇒
|
||||
val beh = supervise[String](setup(ctx ⇒
|
||||
supervise[String](setup { ctx ⇒
|
||||
probe.ref ! Started
|
||||
scaladsl.Behaviors.empty[String]
|
||||
}).onFailure[RuntimeException](strategy)
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
|
|||
case class Failed(t: Terminated) // we need to wrap it as it is handled specially
|
||||
val probe = TestProbe[Any]()
|
||||
val ex = new TestException("boom")
|
||||
val parent = spawn(Behaviors.deferred[Any] { ctx ⇒
|
||||
val parent = spawn(Behaviors.setup[Any] { ctx ⇒
|
||||
val child = ctx.spawn(Behaviors.immutable[Any]((ctx, msg) ⇒
|
||||
throw ex
|
||||
), "child")
|
||||
|
|
@ -99,8 +99,8 @@ class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
|
|||
case class Failed(t: Terminated) // we need to wrap it as it is handled specially
|
||||
val probe = TestProbe[Any]()
|
||||
val ex = new TestException("boom")
|
||||
val grossoBosso = spawn(Behaviors.deferred[Any] { ctx ⇒
|
||||
val middleManagement = ctx.spawn(Behaviors.deferred[Any] { ctx ⇒
|
||||
val grossoBosso = spawn(Behaviors.setup[Any] { ctx ⇒
|
||||
val middleManagement = ctx.spawn(Behaviors.setup[Any] { ctx ⇒
|
||||
val sixPackJoe = ctx.spawn(Behaviors.immutable[Any]((ctx, msg) ⇒
|
||||
throw ex
|
||||
), "joe")
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class ActorContextAskSpec extends TestKit(ActorContextAskSpec.config) with Typed
|
|||
|
||||
val probe = TestProbe[AnyRef]()
|
||||
|
||||
val snitch = Behaviors.deferred[Pong] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[Pong] { (ctx) ⇒
|
||||
|
||||
// Timeout comes from TypedAkkaSpec
|
||||
|
||||
|
|
@ -86,7 +86,7 @@ class ActorContextAskSpec extends TestKit(ActorContextAskSpec.config) with Typed
|
|||
}
|
||||
))
|
||||
|
||||
val snitch = Behaviors.deferred[AnyRef] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[AnyRef] { (ctx) ⇒
|
||||
ctx.ask(pingPong)(Ping) {
|
||||
case Success(msg) ⇒ throw new NotImplementedError(msg.toString)
|
||||
case Failure(x) ⇒ x
|
||||
|
|
@ -114,7 +114,7 @@ class ActorContextAskSpec extends TestKit(ActorContextAskSpec.config) with Typed
|
|||
|
||||
"deal with timeouts in ask" in {
|
||||
val probe = TestProbe[AnyRef]()
|
||||
val snitch = Behaviors.deferred[AnyRef] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[AnyRef] { (ctx) ⇒
|
||||
|
||||
ctx.ask[String, String](system.deadLetters)(ref ⇒ "boo") {
|
||||
case Success(m) ⇒ m
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class ActorLoggingSpec extends TestKit(ConfigFactory.parseString(
|
|||
|
||||
"be conveniently available from the ctx" in {
|
||||
val actor = EventFilter.info("Started", source = "akka://ActorLoggingSpec/user/the-actor", occurrences = 1).intercept {
|
||||
spawn(Behaviors.deferred[String] { ctx ⇒
|
||||
spawn(Behaviors.setup[String] { ctx ⇒
|
||||
ctx.log.info("Started")
|
||||
|
||||
Behaviors.immutable { (ctx, msg) ⇒
|
||||
|
|
@ -47,7 +47,7 @@ class ActorLoggingSpec extends TestKit(ConfigFactory.parseString(
|
|||
EventFilter.custom({
|
||||
case event: LogEventWithMarker if event.marker == marker ⇒ true
|
||||
}, occurrences = 5).intercept(
|
||||
spawn(Behaviors.deferred[Any] { ctx ⇒
|
||||
spawn(Behaviors.setup[Any] { ctx ⇒
|
||||
ctx.log.debug(marker, "whatever")
|
||||
ctx.log.info(marker, "whatever")
|
||||
ctx.log.warning(marker, "whatever")
|
||||
|
|
@ -62,7 +62,7 @@ class ActorLoggingSpec extends TestKit(ConfigFactory.parseString(
|
|||
EventFilter.custom({
|
||||
case event: LogEventWithCause if event.cause == cause ⇒ true
|
||||
}, occurrences = 2).intercept(
|
||||
spawn(Behaviors.deferred[Any] { ctx ⇒
|
||||
spawn(Behaviors.setup[Any] { ctx ⇒
|
||||
ctx.log.warning(cause, "whatever")
|
||||
ctx.log.warning(marker, cause, "whatever")
|
||||
Behaviors.stopped
|
||||
|
|
@ -77,7 +77,7 @@ class ActorLoggingSpec extends TestKit(ConfigFactory.parseString(
|
|||
EventFilter.custom({
|
||||
case _ ⇒ true // any is fine, we're just after the right count of statements reaching the listener
|
||||
}, occurrences = 72).intercept {
|
||||
spawn(Behaviors.deferred[String] { ctx ⇒
|
||||
spawn(Behaviors.setup[String] { ctx ⇒
|
||||
ctx.log.debug("message")
|
||||
ctx.log.debug("{}", "arg1")
|
||||
ctx.log.debug("{} {}", "arg1", "arg2")
|
||||
|
|
@ -180,7 +180,7 @@ class ActorLoggingSpec extends TestKit(ConfigFactory.parseString(
|
|||
)
|
||||
else Map("txId" -> msg.transactionId)
|
||||
},
|
||||
Behaviors.deferred { ctx ⇒
|
||||
Behaviors.setup { ctx ⇒
|
||||
ctx.log.info("Starting")
|
||||
Behaviors.immutable { (ctx, msg) ⇒
|
||||
ctx.log.info("Got message!")
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ final class GracefulStopSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
val probe = TestProbe[String]("probe")
|
||||
|
||||
val behavior =
|
||||
Behaviors.deferred[akka.NotUsed] { context ⇒
|
||||
Behaviors.setup[akka.NotUsed] { context ⇒
|
||||
val c1 = context.spawn[NotUsed](Behaviors.onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
probe.ref ! "child-done"
|
||||
|
|
@ -50,7 +50,7 @@ final class GracefulStopSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
val probe = TestProbe[Done]("probe")
|
||||
|
||||
val behavior =
|
||||
Behaviors.deferred[akka.NotUsed] { context ⇒
|
||||
Behaviors.setup[akka.NotUsed] { context ⇒
|
||||
// do not spawn any children
|
||||
Behaviors.stopped {
|
||||
Behaviors.onSignal {
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAk
|
|||
|
||||
val probe = TestProbe[AnotherPong]()
|
||||
|
||||
val snitch = Behaviors.deferred[AnotherPong] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[AnotherPong] { (ctx) ⇒
|
||||
|
||||
val replyTo = ctx.messageAdapter[Response](_ ⇒
|
||||
AnotherPong(ctx.self.path.name, Thread.currentThread().getName))
|
||||
|
|
@ -109,7 +109,7 @@ class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAk
|
|||
|
||||
val probe = TestProbe[Wrapped]()
|
||||
|
||||
val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒
|
||||
|
||||
ctx.messageAdapter[Response](pong ⇒ Wrapped(qualifier = "wrong", pong)) // this is replaced
|
||||
val replyTo1: ActorRef[Response] = ctx.messageAdapter(pong ⇒ Wrapped(qualifier = "1", pong))
|
||||
|
|
@ -154,7 +154,7 @@ class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAk
|
|||
|
||||
val probe = TestProbe[Wrapped]()
|
||||
|
||||
val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒
|
||||
|
||||
val replyTo1 = ctx.messageAdapter[Pong1](pong ⇒ Wrapped(qualifier = "1", pong))
|
||||
pingPong ! Ping1(replyTo1)
|
||||
|
|
@ -191,7 +191,7 @@ class MessageAdapterSpec extends TestKit(MessageAdapterSpec.config) with TypedAk
|
|||
|
||||
val probe = TestProbe[Any]()
|
||||
|
||||
val snitch = Behaviors.deferred[Wrapped] { (ctx) ⇒
|
||||
val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒
|
||||
|
||||
var count = 0
|
||||
val replyTo = ctx.messageAdapter[Pong] { pong ⇒
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ final class OnSignalSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
"must correctly install the signal handler" in {
|
||||
val probe = TestProbe[Done]("probe")
|
||||
val behavior =
|
||||
Behaviors.deferred[Nothing] { context ⇒
|
||||
Behaviors.setup[Nothing] { context ⇒
|
||||
val stoppedChild = context.spawn(Behaviors.stopped, "stopped-child")
|
||||
context.watch(stoppedChild)
|
||||
Behaviors.onSignal[Nothing] {
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ class StashBufferSpec extends WordSpec with Matchers {
|
|||
valueInbox.ref ! state
|
||||
Behaviors.same
|
||||
} else {
|
||||
Behaviors.deferred[String](_ ⇒ behavior(state + msg))
|
||||
Behaviors.setup[String](_ ⇒ behavior(state + msg))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ object StashSpec {
|
|||
final case class GetStashSize(replyTo: ActorRef[Int]) extends Command
|
||||
|
||||
val immutableStash: Behavior[Command] =
|
||||
Behaviors.deferred[Command] { _ ⇒
|
||||
Behaviors.setup[Command] { _ ⇒
|
||||
val buffer = StashBuffer[Command](capacity = 10)
|
||||
|
||||
def active(processed: Vector[String]): Behavior[Command] =
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ class AdapterSpec extends AkkaSpec {
|
|||
for { _ ← 0 to 10 } {
|
||||
var system: akka.actor.typed.ActorSystem[NotUsed] = null
|
||||
try {
|
||||
system = ActorSystem.create(Behaviors.deferred[NotUsed](_ ⇒ Behavior.stopped[NotUsed]), "AdapterSpec-stopping-guardian")
|
||||
system = ActorSystem.create(Behaviors.setup[NotUsed](_ ⇒ Behavior.stopped[NotUsed]), "AdapterSpec-stopping-guardian")
|
||||
} finally if (system != null) shutdown(system.toUntyped)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class FaultToleranceDocSpec extends TestKit(ConfigFactory.parseString(
|
|||
}
|
||||
}
|
||||
|
||||
val middleManagementBehavior = Behaviors.deferred[Message] { ctx ⇒
|
||||
val middleManagementBehavior = Behaviors.setup[Message] { ctx ⇒
|
||||
ctx.log.info("Middle management starting up")
|
||||
val child = ctx.spawn(worker, "child")
|
||||
ctx.watch(child)
|
||||
|
|
@ -44,7 +44,7 @@ class FaultToleranceDocSpec extends TestKit(ConfigFactory.parseString(
|
|||
}
|
||||
}
|
||||
|
||||
val bossBehavior = Behaviors.supervise(Behaviors.deferred[Message] { ctx ⇒
|
||||
val bossBehavior = Behaviors.supervise(Behaviors.setup[Message] { ctx ⇒
|
||||
ctx.log.info("Boss starting up")
|
||||
val middleManagment = ctx.spawn(middleManagementBehavior, "middle-management")
|
||||
ctx.watch(middleManagment)
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
private final case class WrappedBackendResponse(response: Backend.Response) extends Command
|
||||
|
||||
def translator(backend: ActorRef[Backend.Request]): Behavior[Command] =
|
||||
Behaviors.deferred[Command] { ctx ⇒
|
||||
Behaviors.setup[Command] { ctx ⇒
|
||||
val backendResponseMapper: ActorRef[Backend.Response] =
|
||||
ctx.messageAdapter(rsp ⇒ WrappedBackendResponse(rsp))
|
||||
|
||||
|
|
@ -224,7 +224,7 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
// this is a part of the protocol that is internal to the actor itself
|
||||
case class AdaptedResponse(message: String) extends DaveMessage
|
||||
|
||||
def daveBehavior(hal: ActorRef[HalCommand]) = Behaviors.deferred[DaveMessage] { ctx ⇒
|
||||
def daveBehavior(hal: ActorRef[HalCommand]) = Behaviors.setup[DaveMessage] { ctx ⇒
|
||||
|
||||
// asking someone requires a timeout, if the timeout hits without response
|
||||
// the ask is failed with a TimeoutException
|
||||
|
|
@ -321,7 +321,7 @@ class InteractionPatternsSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
// we don't _really_ care about the actor protocol here as nobody will send us
|
||||
// messages except for responses to our queries, so we just accept any kind of message
|
||||
// but narrow that to more limited types then we interact
|
||||
Behaviors.deferred[AnyRef] { ctx ⇒
|
||||
Behaviors.setup[AnyRef] { ctx ⇒
|
||||
var wallet: Option[Wallet] = None
|
||||
var keys: Option[Keys] = None
|
||||
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
|
||||
//#chatroom-main
|
||||
val main: Behavior[NotUsed] =
|
||||
Behaviors.deferred { ctx ⇒
|
||||
Behaviors.setup { ctx ⇒
|
||||
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
|
||||
val gabblerRef = ctx.spawn(gabbler, "gabbler")
|
||||
ctx.watch(gabblerRef)
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ class MutableIntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
|
||||
//#chatroom-main
|
||||
val main: Behavior[String] =
|
||||
Behaviors.deferred { ctx ⇒
|
||||
Behaviors.setup { ctx ⇒
|
||||
val chatRoom = ctx.spawn(ChatRoom.behavior(), "chatroom")
|
||||
val gabblerRef = ctx.spawn(gabbler, "gabbler")
|
||||
ctx.watch(gabblerRef)
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ object StashDocSpec {
|
|||
private final case class DBError(cause: Throwable) extends Command
|
||||
|
||||
def behavior(id: String, db: DB): Behavior[Command] =
|
||||
Behaviors.deferred[Command] { ctx ⇒
|
||||
Behaviors.setup[Command] { ctx ⇒
|
||||
|
||||
val buffer = StashBuffer[Command](capacity = 100)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ object TypedWatchingUntypedSpec {
|
|||
case object Pong extends Command
|
||||
|
||||
val behavior: Behavior[Command] =
|
||||
Behaviors.deferred { context ⇒
|
||||
Behaviors.setup { context ⇒
|
||||
// context.spawn is an implicit extension method
|
||||
val untyped = context.actorOf(Untyped.props(), "second")
|
||||
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ object Behavior {
|
|||
@InternalApi
|
||||
private[akka] final case class DeferredBehavior[T](factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] {
|
||||
|
||||
/** "undefer" the deferred behavior */
|
||||
/** start the deferred behavior */
|
||||
@throws(classOf[Exception])
|
||||
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
|
||||
|
||||
|
|
@ -227,7 +227,7 @@ object Behavior {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Return special behaviors as is, undefer deferred, if behavior is "non-special" apply the wrap function `f` to get
|
||||
* Return special behaviors as is, start deferred, if behavior is "non-special" apply the wrap function `f` to get
|
||||
* and return the result from that. Useful for cases where a [[Behavior]] implementation that is decorating another
|
||||
* behavior has processed a message and needs to re-wrap the resulting behavior with itself.
|
||||
*/
|
||||
|
|
@ -238,14 +238,18 @@ object Behavior {
|
|||
case SameBehavior | `currentBehavior` ⇒ same
|
||||
case UnhandledBehavior ⇒ unhandled
|
||||
case StoppedBehavior ⇒ stopped
|
||||
case deferred: DeferredBehavior[T] ⇒ wrap(currentBehavior, undefer(deferred, ctx), ctx)(f)
|
||||
case deferred: DeferredBehavior[T] ⇒ wrap(currentBehavior, start(deferred, ctx), ctx)(f)
|
||||
case other ⇒ f(other)
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts deferred behavior and nested deferred behaviors until a non deferred behavior is reached
|
||||
* and that is then returned.
|
||||
*/
|
||||
@tailrec
|
||||
def undefer[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
def start[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
behavior match {
|
||||
case innerDeferred: DeferredBehavior[T] ⇒ undefer(innerDeferred(ctx), ctx)
|
||||
case innerDeferred: DeferredBehavior[T] ⇒ start(innerDeferred(ctx), ctx)
|
||||
case _ ⇒ behavior
|
||||
}
|
||||
}
|
||||
|
|
@ -302,7 +306,7 @@ object Behavior {
|
|||
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
|
||||
case _: UntypedBehavior[_] ⇒
|
||||
throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " +
|
||||
"Actor.deferred, Actor.supervise or similar")
|
||||
"Behaviors.setup, Behaviors.supervise or similar")
|
||||
case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
|
||||
case IgnoreBehavior ⇒ SameBehavior.asInstanceOf[Behavior[T]]
|
||||
case s: StoppedBehavior[T] ⇒ s
|
||||
|
|
@ -312,7 +316,7 @@ object Behavior {
|
|||
case signal: Signal ⇒ ext.receiveSignal(ctx, signal)
|
||||
case m ⇒ ext.receiveMessage(ctx, m.asInstanceOf[T])
|
||||
}
|
||||
undefer(possiblyDeferredResult, ctx)
|
||||
start(possiblyDeferredResult, ctx)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -323,7 +327,7 @@ object Behavior {
|
|||
*/
|
||||
@InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: ActorContext[T], messages: Iterator[T]): Behavior[T] = {
|
||||
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
|
||||
val b2 = Behavior.undefer(b, ctx)
|
||||
val b2 = Behavior.start(b, ctx)
|
||||
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
|
||||
else {
|
||||
val nextB = messages.next() match {
|
||||
|
|
@ -336,7 +340,7 @@ object Behavior {
|
|||
}
|
||||
}
|
||||
|
||||
interpretOne(Behavior.undefer(behavior, ctx))
|
||||
interpretOne(Behavior.start(behavior, ctx))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import scala.reflect.ClassTag
|
|||
case d: DeferredBehavior[T] ⇒
|
||||
DeferredBehavior[U] { ctx ⇒
|
||||
val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]]
|
||||
val b = Behavior.validateAsInitial(Behavior.undefer(d, c))
|
||||
val b = Behavior.validateAsInitial(Behavior.start(d, c))
|
||||
Widened(b, matcher)
|
||||
}
|
||||
case _ ⇒
|
||||
|
|
@ -109,7 +109,7 @@ import scala.reflect.ClassTag
|
|||
case d: DeferredBehavior[T] ⇒
|
||||
DeferredBehavior[T] { ctx ⇒
|
||||
val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]]
|
||||
val b = Behavior.validateAsInitial(Behavior.undefer(d, c))
|
||||
val b = Behavior.validateAsInitial(Behavior.start(d, c))
|
||||
Intercept(beforeMessage, beforeSignal, afterMessage, afterSignal, b, toStringPrefix)
|
||||
}
|
||||
case _ ⇒
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import scala.util.control.NonFatal
|
|||
*/
|
||||
@InternalApi private[akka] object Supervisor {
|
||||
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] =
|
||||
Behaviors.deferred[T] { ctx ⇒
|
||||
Behaviors.setup[T] { ctx ⇒
|
||||
val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]]
|
||||
val supervisor: Supervisor[T, Thr] = strategy match {
|
||||
case Restart(-1, _, loggingEnabled) ⇒
|
||||
|
|
@ -112,7 +112,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
def init(ctx: ActorContext[T]) =
|
||||
// no handling of errors for Resume as that could lead to infinite restart-loop
|
||||
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
|
||||
wrap(Behavior.validateAsInitial(Behavior.start(behavior, ctx)), afterException = false)
|
||||
|
||||
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = {
|
||||
case NonFatal(ex: Thr) ⇒
|
||||
|
|
@ -132,7 +132,7 @@ import scala.util.control.NonFatal
|
|||
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
|
||||
|
||||
def init(ctx: ActorContext[T]): Supervisor[T, Thr] =
|
||||
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), false)
|
||||
wrap(Behavior.validateAsInitial(Behavior.start(behavior, ctx)), false)
|
||||
|
||||
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(ex: Thr) ⇒
|
||||
|
|
@ -154,7 +154,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
override def init(ctx: ActorContext[T]) =
|
||||
// no handling of errors for Restart as that could lead to infinite restart-loop
|
||||
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
|
||||
wrap(Behavior.validateAsInitial(Behavior.start(behavior, ctx)), afterException = false)
|
||||
|
||||
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = {
|
||||
case NonFatal(ex: Thr) ⇒
|
||||
|
|
@ -177,7 +177,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
override def init(ctx: ActorContext[T]) =
|
||||
try {
|
||||
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
|
||||
wrap(Behavior.validateAsInitial(Behavior.start(behavior, ctx)), afterException = false)
|
||||
} catch {
|
||||
case NonFatal(ex: Thr) ⇒
|
||||
log(ctx, ex)
|
||||
|
|
@ -252,7 +252,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
def init(ctx: ActorContext[Any]): Supervisor[Any, Thr] =
|
||||
try {
|
||||
val startedBehavior = Behavior.validateAsInitial(Behavior.undefer(initialBehavior, ctx))
|
||||
val startedBehavior = Behavior.validateAsInitial(Behavior.start(initialBehavior, ctx))
|
||||
new BackoffRestarter(initialBehavior, startedBehavior, strategy, restartCount, blackhole)
|
||||
} catch {
|
||||
case NonFatal(ex: Thr) ⇒
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import scala.reflect.ClassTag
|
|||
final case class TimerMsg(key: Any, generation: Int, owner: AnyRef)
|
||||
|
||||
def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = {
|
||||
scaladsl.Behaviors.deferred[T] { ctx ⇒
|
||||
scaladsl.Behaviors.setup[T] { ctx ⇒
|
||||
val timerScheduler = new TimerSchedulerImpl[T](ctx)
|
||||
val behavior = factory(timerScheduler)
|
||||
timerScheduler.intercept(behavior)
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ import scala.util.control.NonFatal
|
|||
protected def start(): Unit = {
|
||||
context.become(running)
|
||||
initializeContext()
|
||||
behavior = validateAsInitial(undefer(behavior, ctx))
|
||||
behavior = validateAsInitial(Behavior.start(behavior, ctx))
|
||||
if (!isAlive(behavior)) context.stop(self)
|
||||
}
|
||||
|
||||
|
|
@ -144,7 +144,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
override def postRestart(reason: Throwable): Unit = {
|
||||
initializeContext()
|
||||
behavior = validateAsInitial(undefer(behavior, ctx))
|
||||
behavior = validateAsInitial(Behavior.start(behavior, ctx))
|
||||
if (!isAlive(behavior)) context.stop(self)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
|
|||
type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
|
||||
|
||||
private[akka] def init[State](externalInterfaceFactory: ActorContext[AllCommands] ⇒ ExternalInterface[State]): Behavior[Command] =
|
||||
Behaviors.deferred[AllCommands] { ctx ⇒
|
||||
Behaviors.setup[AllCommands] { ctx ⇒
|
||||
val externalInterface = externalInterfaceFactory(ctx)
|
||||
behavior(
|
||||
TypedMultiMap.empty[AbstractServiceKey, KV],
|
||||
|
|
@ -89,7 +89,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
|
|||
* FIXME: replace by simple map in our state
|
||||
*/
|
||||
def watchWith(ctx: ActorContext[AllCommands], target: ActorRef[_], msg: AllCommands): Unit =
|
||||
ctx.spawnAnonymous[Nothing](Behaviors.deferred[Nothing] { innerCtx ⇒
|
||||
ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒
|
||||
innerCtx.watch(target)
|
||||
Behaviors.immutable[Nothing]((_, _) ⇒ Behaviors.same)
|
||||
.onSignal {
|
||||
|
|
|
|||
|
|
@ -29,17 +29,17 @@ object Behaviors {
|
|||
private def unitFunction[T] = _unitFunction.asInstanceOf[((SAC[T], Signal) ⇒ Unit)]
|
||||
|
||||
/**
|
||||
* `deferred` is a factory for a behavior. Creation of the behavior instance is deferred until
|
||||
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
|
||||
* the actor is started, as opposed to [[Behaviors#immutable]] that creates the behavior instance
|
||||
* immediately before the actor is running. The `factory` function pass the `ActorContext`
|
||||
* as parameter and that can for example be used for spawning child actors.
|
||||
*
|
||||
* `deferred` is typically used as the outer most behavior when spawning an actor, but it
|
||||
* `setup` is typically used as the outer most behavior when spawning an actor, but it
|
||||
* can also be returned as the next behavior when processing a message or signal. In that
|
||||
* case it will be "undeferred" immediately after it is returned, i.e. next message will be
|
||||
* processed by the undeferred behavior.
|
||||
* case it will be started immediately after it is returned, i.e. next message will be
|
||||
* processed by the started behavior.
|
||||
*/
|
||||
def deferred[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] =
|
||||
def setup[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] =
|
||||
Behavior.DeferredBehavior(ctx ⇒ factory.apply(ctx.asJava))
|
||||
|
||||
/**
|
||||
|
|
@ -55,7 +55,7 @@ object Behaviors {
|
|||
* @return the deferred behavior
|
||||
*/
|
||||
def mutable[T](factory: akka.japi.function.Function[ActorContext[T], MutableBehavior[T]]): Behavior[T] =
|
||||
deferred(factory)
|
||||
setup(factory)
|
||||
|
||||
/**
|
||||
* Mutable behavior can be implemented by extending this class and implement the
|
||||
|
|
|
|||
|
|
@ -40,17 +40,17 @@ object Behaviors {
|
|||
}
|
||||
|
||||
/**
|
||||
* `deferred` is a factory for a behavior. Creation of the behavior instance is deferred until
|
||||
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
|
||||
* the actor is started, as opposed to [[Behaviors.immutable]] that creates the behavior instance
|
||||
* immediately before the actor is running. The `factory` function pass the `ActorContext`
|
||||
* as parameter and that can for example be used for spawning child actors.
|
||||
*
|
||||
* `deferred` is typically used as the outer most behavior when spawning an actor, but it
|
||||
* `setup` is typically used as the outer most behavior when spawning an actor, but it
|
||||
* can also be returned as the next behavior when processing a message or signal. In that
|
||||
* case it will be "undeferred" immediately after it is returned, i.e. next message will be
|
||||
* processed by the undeferred behavior.
|
||||
* case it will be started immediately after it is returned, i.e. next message will be
|
||||
* processed by the started behavior.
|
||||
*/
|
||||
def deferred[T](factory: ActorContext[T] ⇒ Behavior[T]): Behavior[T] =
|
||||
def setup[T](factory: ActorContext[T] ⇒ Behavior[T]): Behavior[T] =
|
||||
Behavior.DeferredBehavior(factory)
|
||||
|
||||
/**
|
||||
|
|
@ -61,12 +61,11 @@ object Behaviors {
|
|||
* function. The reason for the deferred creation is to avoid sharing the same instance in
|
||||
* multiple actors, and to create a new instance when the actor is restarted.
|
||||
*
|
||||
* @param factory
|
||||
* behavior factory that takes the child actor’s context as argument
|
||||
* @param factory behavior factory that takes the child actor’s context as argument
|
||||
* @return the deferred behavior
|
||||
*/
|
||||
def mutable[T](factory: ActorContext[T] ⇒ MutableBehavior[T]): Behavior[T] =
|
||||
deferred(factory)
|
||||
setup(factory)
|
||||
|
||||
/**
|
||||
* Mutable behavior can be implemented by extending this class and implement the
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ import akka.actor.typed.Terminated
|
|||
|
||||
def behavior(settings: dd.ReplicatorSettings, underlyingReplicator: Option[akka.actor.ActorRef]): Behavior[SReplicator.Command] = {
|
||||
|
||||
Behaviors.deferred { ctx ⇒
|
||||
Behaviors.setup { ctx ⇒
|
||||
val untypedReplicator = underlyingReplicator match {
|
||||
case Some(ref) ⇒ ref
|
||||
case None ⇒
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ private[akka] object AdapterClusterImpl {
|
|||
private case object Up extends SeenState
|
||||
private case class Removed(previousStatus: MemberStatus) extends SeenState
|
||||
|
||||
private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Behaviors.deferred[ClusterStateSubscription] { ctx ⇒
|
||||
private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Behaviors.setup[ClusterStateSubscription] { ctx ⇒
|
||||
var seenState: SeenState = BeforeUp
|
||||
var upSubscribers: List[ActorRef[SelfUp]] = Nil
|
||||
var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ public class ReceptionistExampleTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
static Behavior<Ping> pingService() {
|
||||
return Behaviors.deferred((ctx) -> {
|
||||
return Behaviors.setup((ctx) -> {
|
||||
ctx.getSystem().receptionist()
|
||||
.tell(new Receptionist.Register<>(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters()));
|
||||
return Behaviors.immutable(Ping.class)
|
||||
|
|
@ -43,7 +43,7 @@ public class ReceptionistExampleTest extends JUnitSuite {
|
|||
|
||||
//#pinger
|
||||
static Behavior<Pong> pinger(ActorRef<Ping> pingService) {
|
||||
return Behaviors.deferred((ctx) -> {
|
||||
return Behaviors.setup((ctx) -> {
|
||||
pingService.tell(new Ping(ctx.getSelf()));
|
||||
return Behaviors.immutable(Pong.class)
|
||||
.onMessage(Pong.class, (c, msg) -> {
|
||||
|
|
@ -56,7 +56,7 @@ public class ReceptionistExampleTest extends JUnitSuite {
|
|||
|
||||
//#pinger-guardian
|
||||
static Behavior<Receptionist.Listing<Ping>> guardian() {
|
||||
return Behaviors.deferred((ctx) -> {
|
||||
return Behaviors.setup((ctx) -> {
|
||||
ctx.getSystem().receptionist()
|
||||
.tell(new Receptionist.Subscribe<>(PingServiceKey, ctx.getSelf()));
|
||||
ActorRef<Ping> ps = ctx.spawnAnonymous(pingService());
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ object ReplicatorSpec {
|
|||
val Key = GCounterKey("counter")
|
||||
|
||||
def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
|
||||
Behaviors.deferred[ClientCommand] { ctx ⇒
|
||||
Behaviors.setup[ClientCommand] { ctx ⇒
|
||||
|
||||
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
|
||||
ctx.messageAdapter(InternalUpdateResponse.apply)
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class RemoteContextAskSpec extends TestKit(RemoteContextAskSpec.config) with Typ
|
|||
// wait until the service is seen on the first node
|
||||
val remoteRef = node1Probe.expectMessageType[Receptionist.Listing[Ping]].serviceInstances.head
|
||||
|
||||
spawn(Behaviors.deferred[AnyRef] { (ctx) ⇒
|
||||
spawn(Behaviors.setup[AnyRef] { (ctx) ⇒
|
||||
implicit val timeout: Timeout = 3.seconds
|
||||
|
||||
ctx.ask(remoteRef)(Ping) {
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class RemoteDeployNotAllowedSpec extends TestKit(RemoteDeployNotAllowedSpec.conf
|
|||
// this should throw
|
||||
try {
|
||||
ctx.spawn(
|
||||
Behaviors.deferred[AnyRef] { ctx ⇒ Behaviors.empty },
|
||||
Behaviors.setup[AnyRef] { ctx ⇒ Behaviors.empty },
|
||||
name)
|
||||
} catch {
|
||||
case ex: Exception ⇒ probe.ref ! ex
|
||||
|
|
@ -72,7 +72,7 @@ class RemoteDeployNotAllowedSpec extends TestKit(RemoteDeployNotAllowedSpec.conf
|
|||
case SpawnAnonymous ⇒
|
||||
// this should throw
|
||||
try {
|
||||
ctx.spawnAnonymous(Behaviors.deferred[AnyRef] { ctx ⇒ Behaviors.empty })
|
||||
ctx.spawnAnonymous(Behaviors.setup[AnyRef] { ctx ⇒ Behaviors.empty })
|
||||
} catch {
|
||||
case ex: Exception ⇒ probe.ref ! ex
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import scala.collection.immutable.Set
|
|||
object RandomRouter {
|
||||
|
||||
def router[T](serviceKey: ServiceKey[T]): Behavior[T] =
|
||||
Behaviors.deferred[Any] { ctx ⇒
|
||||
Behaviors.setup[Any] { ctx ⇒
|
||||
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
|
||||
|
||||
def routingBehavior(routees: Vector[ActorRef[T]]): Behavior[Any] =
|
||||
|
|
@ -45,7 +45,7 @@ object RandomRouter {
|
|||
// same as above, but also subscribes to cluster reachability events and
|
||||
// avoids routees that are unreachable
|
||||
def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
|
||||
Behaviors.deferred[Any] { ctx ⇒
|
||||
Behaviors.setup[Any] { ctx ⇒
|
||||
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
|
||||
|
||||
val cluster = Cluster(ctx.system)
|
||||
|
|
@ -93,7 +93,7 @@ object PingPongExample {
|
|||
final case object Pong
|
||||
|
||||
val pingService: Behavior[Ping] =
|
||||
Behaviors.deferred { ctx ⇒
|
||||
Behaviors.setup { ctx ⇒
|
||||
ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self, ctx.system.deadLetters)
|
||||
Behaviors.immutable[Ping] { (_, msg) ⇒
|
||||
msg match {
|
||||
|
|
@ -106,7 +106,7 @@ object PingPongExample {
|
|||
//#ping-service
|
||||
|
||||
//#pinger
|
||||
def pinger(pingService: ActorRef[Ping]) = Behaviors.deferred[Pong.type] { ctx ⇒
|
||||
def pinger(pingService: ActorRef[Ping]) = Behaviors.setup[Pong.type] { ctx ⇒
|
||||
pingService ! Ping(ctx.self)
|
||||
Behaviors.immutable { (_, msg) ⇒
|
||||
println("I was ponged!!" + msg)
|
||||
|
|
@ -116,7 +116,7 @@ object PingPongExample {
|
|||
//#pinger
|
||||
|
||||
//#pinger-guardian
|
||||
val guardian: Behavior[Listing[Ping]] = Behaviors.deferred { ctx ⇒
|
||||
val guardian: Behavior[Listing[Ping]] = Behaviors.setup { ctx ⇒
|
||||
ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self)
|
||||
val ps = ctx.spawnAnonymous(pingService)
|
||||
ctx.watch(ps)
|
||||
|
|
@ -133,7 +133,7 @@ object PingPongExample {
|
|||
//#pinger-guardian
|
||||
|
||||
//#pinger-guardian-pinger-service
|
||||
val guardianJustPingService: Behavior[Listing[Ping]] = Behaviors.deferred { ctx ⇒
|
||||
val guardianJustPingService: Behavior[Listing[Ping]] = Behaviors.setup { ctx ⇒
|
||||
val ps = ctx.spawnAnonymous(pingService)
|
||||
ctx.watch(ps)
|
||||
Behaviors.immutablePartial[Listing[Ping]] {
|
||||
|
|
@ -149,7 +149,7 @@ object PingPongExample {
|
|||
//#pinger-guardian-pinger-service
|
||||
|
||||
//#pinger-guardian-just-pinger
|
||||
val guardianJustPinger: Behavior[Listing[Ping]] = Behaviors.deferred { ctx ⇒
|
||||
val guardianJustPinger: Behavior[Listing[Ping]] = Behaviors.setup { ctx ⇒
|
||||
ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self)
|
||||
Behaviors.immutablePartial[Listing[Ping]] {
|
||||
case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty ⇒
|
||||
|
|
|
|||
|
|
@ -252,10 +252,10 @@ particular one using the `immutable` behavior decorator. The
|
|||
provided `onSignal` function will be invoked for signals (subclasses of `Signal`)
|
||||
or the `onMessage` function for user messages.
|
||||
|
||||
This particular `main` Actor is created using `Behaviors.deferred`, which is like a factory for a behavior.
|
||||
This particular `main` Actor is created using `Behaviors.onStart`, which is like a factory for a behavior.
|
||||
Creation of the behavior instance is deferred until the actor is started, as opposed to `Behaviors.immutable`
|
||||
that creates the behavior instance immediately before the actor is running. The factory function in
|
||||
`deferred` pass the `ActorContext` as parameter and that can for example be used for spawning child actors.
|
||||
`onStart` is passed the `ActorContext` as parameter and that can for example be used for spawning child actors.
|
||||
This `main` Actor creates the chat room and the gabbler and the session between them is initiated, and when the
|
||||
gabbler is finished we will receive the `Terminated` event due to having
|
||||
called `ctx.watch` for it. This allows us to shut down the Actor system: when
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ their registration order, i.e. the last registered first.
|
|||
|
||||
A message adapter (and the returned `ActorRef`) has the same lifecycle as
|
||||
the receiving actor. It's recommended to register the adapters in a top level
|
||||
`Behaviors.deferred` or constructor of `MutableBehavior` but it's possible to
|
||||
`Behaviors.setup` or constructor of `MutableBehavior` but it's possible to
|
||||
register them later also if needed.
|
||||
|
||||
The adapter function is running in the receiving actor and can safely access state of it, but if it throws an exception the actor is stopped.
|
||||
|
|
|
|||
|
|
@ -231,6 +231,6 @@ Java
|
|||
|
||||
## Current limitations
|
||||
|
||||
* The `PersistentBehavior` can't be wrapped in other behaviors, such as `Behaviors.deferred`. See [#23694](https://github.com/akka/akka/issues/23694)
|
||||
* The `PersistentBehavior` can't be wrapped in other behaviors, such as `Behaviors.setup`. See [#23694](https://github.com/akka/akka/issues/23694)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -267,7 +267,7 @@ object PersistentActorCompileOnlyTest {
|
|||
|
||||
def isFullyHydrated(basket: Basket, ids: List[Id]) = basket.items.map(_.id) == ids
|
||||
|
||||
Behaviors.deferred { ctx: ActorContext[Command] ⇒
|
||||
Behaviors.setup { ctx: ActorContext[Command] ⇒
|
||||
// FIXME this doesn't work, wrapping not supported
|
||||
|
||||
var basket = Basket(Nil)
|
||||
|
|
|
|||
|
|
@ -362,7 +362,7 @@ class PersistentActorSpec extends TestKit(PersistentActorSpec.config) with Event
|
|||
|
||||
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
|
||||
val probe = TestProbe[String]()
|
||||
val w = Behaviors.deferred[Any] { (ctx) ⇒
|
||||
val w = Behaviors.setup[Any] { (ctx) ⇒
|
||||
ctx.watch(toWatch)
|
||||
Behaviors.immutable[Any] { (_, _) ⇒ Behaviors.same }
|
||||
.onSignal {
|
||||
|
|
|
|||
|
|
@ -177,7 +177,7 @@ class BehaviorTestkit[T] private (_name: String, _initialBehavior: Behavior[T])
|
|||
}
|
||||
}
|
||||
|
||||
private var current = Behavior.validateAsInitial(Behavior.undefer(_initialBehavior, ctx))
|
||||
private var current = Behavior.validateAsInitial(Behavior.start(_initialBehavior, ctx))
|
||||
|
||||
def currentBehavior: Behavior[T] = current
|
||||
def isAlive: Boolean = Behavior.isAlive(current)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue