* Removed `Behavior.same`, `Behavior.unhandled`, `Behavior.stopped`, `Behavior.empty`, and `Behavior.ignore` since they were redundant with corresponding in Behaviors * Also moved several of the internal things from Behavior to BehaviorImpl
This commit is contained in:
parent
6e6b157775
commit
5bb83899e7
40 changed files with 228 additions and 289 deletions
|
|
@ -233,7 +233,7 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
|
|||
|
||||
val n = if (name != "") s"${childName.next()}-$name" else childName.next()
|
||||
val p = (path / n).withUid(rnd().nextInt())
|
||||
val i = new BehaviorTestKitImpl[U](p, Behavior.ignore)
|
||||
val i = new BehaviorTestKitImpl[U](p, BehaviorImpl.ignore)
|
||||
_children += p.name -> i
|
||||
|
||||
new FunctionRef[U](p, (message, _) => {
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ abstract class BehaviorTestKit[T] {
|
|||
|
||||
/**
|
||||
* Returns the current behavior as it was returned from processing the previous message.
|
||||
* For example if [[Behavior.unhandled]] is returned it will be kept here, but not in
|
||||
* For example if [[Behaviors.unhandled]] is returned it will be kept here, but not in
|
||||
* [[currentBehavior]].
|
||||
*/
|
||||
def returnedBehavior: Behavior[T]
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ trait BehaviorTestKit[T] {
|
|||
|
||||
/**
|
||||
* Returns the current behavior as it was returned from processing the previous message.
|
||||
* For example if [[Behavior.unhandled]] is returned it will be kept here, but not in
|
||||
* For example if [[Behaviors.unhandled]] is returned it will be kept here, but not in
|
||||
* [[currentBehavior]].
|
||||
*/
|
||||
def returnedBehavior: Behavior[T]
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ object BehaviorTestKitSpec {
|
|||
case SpawnSession(replyTo, sessionHandler) =>
|
||||
val session = context.spawnAnonymous[String](Behaviors.receiveMessage { message =>
|
||||
sessionHandler ! message
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
})
|
||||
replyTo ! session
|
||||
Behaviors.same
|
||||
|
|
@ -257,8 +257,8 @@ class BehaviorTestKitSpec extends WordSpec with Matchers {
|
|||
"run behaviors with messages without canonicalization" in {
|
||||
val testkit = BehaviorTestKit[Father.Command](Father.init)
|
||||
testkit.run(SpawnAdapterWithName("adapter"))
|
||||
testkit.currentBehavior should not be Behavior.same
|
||||
testkit.returnedBehavior shouldBe Behavior.same
|
||||
testkit.currentBehavior should not be Behaviors.same
|
||||
testkit.returnedBehavior shouldBe Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.actor.typed;
|
||||
|
||||
import akka.Done;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
|
|
@ -18,7 +19,7 @@ public class ActorSystemTest extends JUnitSuite {
|
|||
@Test
|
||||
public void testGetWhenTerminated() throws Exception {
|
||||
final ActorSystem<Void> system =
|
||||
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem");
|
||||
ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedSystem");
|
||||
system.terminate();
|
||||
final CompletionStage<Done> cs = system.getWhenTerminated();
|
||||
cs.toCompletableFuture().get(2, SECONDS);
|
||||
|
|
@ -27,7 +28,7 @@ public class ActorSystemTest extends JUnitSuite {
|
|||
@Test
|
||||
public void testGetWhenTerminatedWithoutTermination() {
|
||||
final ActorSystem<Void> system =
|
||||
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination");
|
||||
ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedWithoutTermination");
|
||||
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.actor.typed;
|
||||
|
||||
import akka.actor.setup.ActorSystemSetup;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.Test;
|
||||
|
|
@ -54,7 +55,7 @@ public class ExtensionsTest extends JUnitSuite {
|
|||
"akka.actor.typed.extensions += \"akka.actor.typed.ExtensionsTest$MyExtension\"")
|
||||
.resolve();
|
||||
final ActorSystem<Object> system =
|
||||
ActorSystem.create(Behavior.empty(), "loadJavaExtensionsFromConfig", cfg);
|
||||
ActorSystem.create(Behaviors.empty(), "loadJavaExtensionsFromConfig", cfg);
|
||||
|
||||
try {
|
||||
// note that this is not the intended end user way to access it
|
||||
|
|
@ -71,7 +72,7 @@ public class ExtensionsTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
public void loadScalaExtension() {
|
||||
final ActorSystem<Object> system = ActorSystem.create(Behavior.empty(), "loadScalaExtension");
|
||||
final ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "loadScalaExtension");
|
||||
try {
|
||||
DummyExtension1 instance1 = DummyExtension1.get(system);
|
||||
DummyExtension1 instance2 = DummyExtension1.get(system);
|
||||
|
|
@ -86,7 +87,7 @@ public class ExtensionsTest extends JUnitSuite {
|
|||
public void overrideExtensionsViaActorSystemSetup() {
|
||||
final ActorSystem<Object> system =
|
||||
ActorSystem.create(
|
||||
Behavior.empty(),
|
||||
Behaviors.empty(),
|
||||
"overrideExtensionsViaActorSystemSetup",
|
||||
ActorSystemSetup.create(new MyExtensionSetup(sys -> new MyExtImplViaSetup())));
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,6 @@ public class EventStreamTest {
|
|||
|
||||
public static void compileOnlyTest(ActorSystem<?> actorSystem, ActorRef<SomeClass> actorRef) {
|
||||
actorSystem.eventStream().tell(Subscribe.of(SomeClass.class, actorRef));
|
||||
actorSystem.eventStream().tell(new Subscribe(SomeClass.class, actorRef));
|
||||
actorSystem.eventStream().tell(new Subscribe<>(SomeClass.class, actorRef));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -575,7 +575,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
|||
context.spawn(
|
||||
new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer),
|
||||
"leaving" + message.who);
|
||||
return Behavior.same();
|
||||
return Behaviors.same();
|
||||
})
|
||||
.build();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
.receiveSignal {
|
||||
case (_, signal) =>
|
||||
probe.ref ! GotChildSignal(signal)
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
}
|
||||
.decorate
|
||||
|
||||
|
|
@ -192,12 +192,12 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
.receivePartial[Command] {
|
||||
case (context, StopRef(ref)) =>
|
||||
context.stop(ref)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
.receiveSignal {
|
||||
case (_, signal) =>
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
}
|
||||
.decorate
|
||||
})
|
||||
|
|
@ -232,7 +232,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
.receiveSignal {
|
||||
case (_, signal) =>
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
}
|
||||
})
|
||||
.decorate
|
||||
|
|
@ -251,7 +251,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
case (_, Ping) =>
|
||||
counter += 1
|
||||
probe.ref ! counter
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, Fail) =>
|
||||
throw new TestException("Boom")
|
||||
}
|
||||
|
|
@ -277,7 +277,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
case (_, Ping) =>
|
||||
counter += 1
|
||||
probe.ref ! counter
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, Fail) =>
|
||||
throw new TestException("Boom")
|
||||
}
|
||||
|
|
@ -307,7 +307,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
probe.ref ! ReceivedSignal(PostStop)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
.decorate
|
||||
val actorToWatch = spawn(behavior)
|
||||
|
|
@ -317,12 +317,12 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
case (context, Ping) =>
|
||||
context.watch(actorToWatch)
|
||||
probe.ref ! Pong
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
.receiveSignal {
|
||||
case (_, signal) =>
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
.decorate)
|
||||
actorToWatch ! Ping
|
||||
|
|
@ -480,7 +480,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
|
|||
.receiveSignal {
|
||||
case (_, signal) =>
|
||||
probe.ref ! GotChildSignal(signal)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
.decorate
|
||||
val actor = spawn(
|
||||
|
|
|
|||
|
|
@ -148,15 +148,15 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
|||
throw new RuntimeException("Unsupported number")
|
||||
case _ => "test"
|
||||
}
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, "test") =>
|
||||
probe.ref ! "got-test"
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, "get-state") =>
|
||||
probe.ref ! "running"
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, _) =>
|
||||
Behavior.unhandled
|
||||
Behaviors.unhandled
|
||||
}
|
||||
|
||||
val ref = spawn(behv)
|
||||
|
|
|
|||
|
|
@ -240,7 +240,8 @@ object BehaviorSpec {
|
|||
"Unhandled" must {
|
||||
"must return Unhandled" in {
|
||||
val Setup(testKit, inbox, aux) = mkCtx()
|
||||
Behavior.interpretMessage(testKit.currentBehavior, testKit.context, Miss) should be(Behavior.UnhandledBehavior)
|
||||
val next = Behavior.interpretMessage(testKit.currentBehavior, testKit.context, Miss)
|
||||
Behavior.isUnhandled(next) should ===(true)
|
||||
inbox.receiveAll() should ===(Missed :: Nil)
|
||||
checkAux(Miss, aux)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,13 +7,14 @@ package akka.actor.typed
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
import akka.actor.BootstrapSetup
|
||||
import akka.actor.setup.ActorSystemSetup
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class DummyExtension1 extends Extension
|
||||
|
|
@ -123,7 +124,7 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
"handle extensions that fail to initialize" in {
|
||||
def create(): Unit = {
|
||||
ActorSystem[Any](
|
||||
Behavior.EmptyBehavior,
|
||||
Behaviors.empty[Any],
|
||||
"ExtensionsSpec04",
|
||||
ConfigFactory.parseString("""
|
||||
akka.actor.typed.extensions = ["akka.actor.typed.FailingToLoadExtension$"]
|
||||
|
|
@ -259,8 +260,8 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
case None => BootstrapSetup(ExtensionsSpec.config)
|
||||
}
|
||||
val sys = setup match {
|
||||
case None => ActorSystem[Any](Behavior.EmptyBehavior, name, bootstrap)
|
||||
case Some(s) => ActorSystem[Any](Behavior.EmptyBehavior, name, s.and(bootstrap))
|
||||
case None => ActorSystem[Any](Behaviors.empty[Any], name, bootstrap)
|
||||
case Some(s) => ActorSystem[Any](Behaviors.empty[Any], name, s.and(bootstrap))
|
||||
}
|
||||
|
||||
try f(sys)
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class OrElseSpec extends ScalaTestWithActorTestKit("""
|
|||
val probe = TestProbe[String]()
|
||||
spawn(Behaviors.setup[String] { ctx =>
|
||||
// arrange with a deathwatch triggering
|
||||
ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String]))
|
||||
ctx.watch(ctx.spawnAnonymous(Behaviors.stopped[String]))
|
||||
|
||||
Behaviors
|
||||
.receiveSignal[String] {
|
||||
|
|
@ -195,18 +195,18 @@ class OrElseSpec extends ScalaTestWithActorTestKit("""
|
|||
EventFilter[DeathPactException](occurrences = 1).intercept {
|
||||
spawn(Behaviors.setup[String] { ctx =>
|
||||
// arrange with a deathwatch triggering
|
||||
ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String]))
|
||||
ctx.watch(ctx.spawnAnonymous(Behaviors.stopped[String]))
|
||||
|
||||
Behaviors
|
||||
.receiveSignal[String] {
|
||||
case (_, Terminated(_)) =>
|
||||
probe.ref ! "first handler saw it"
|
||||
Behavior.unhandled
|
||||
Behaviors.unhandled
|
||||
}
|
||||
.orElse(Behaviors.receiveSignal {
|
||||
case (_, Terminated(_)) =>
|
||||
probe.ref ! "second handler saw it"
|
||||
Behavior.unhandled
|
||||
Behaviors.unhandled
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,8 +29,8 @@ object LocalReceptionistSpec {
|
|||
case object Stop extends ServiceA with ServiceB
|
||||
val stoppableBehavior = Behaviors.receive[Any] { (_, message) =>
|
||||
message match {
|
||||
case Stop => Behavior.stopped
|
||||
case _ => Behavior.same
|
||||
case Stop => Behaviors.stopped
|
||||
case _ => Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,8 +27,8 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
|
|||
def compileOnlyApiCoverage(): Unit = {
|
||||
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()
|
||||
|
||||
Routers.pool(10)(() => Behavior.empty[Any]).withRandomRouting()
|
||||
Routers.pool(10)(() => Behavior.empty[Any]).withRoundRobinRouting()
|
||||
Routers.pool(10)(() => Behaviors.empty[Any]).withRandomRouting()
|
||||
Routers.pool(10)(() => Behaviors.empty[Any]).withRoundRobinRouting()
|
||||
}
|
||||
|
||||
"The router pool" must {
|
||||
|
|
|
|||
|
|
@ -304,7 +304,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
Behaviors.receiveMessage[String] {
|
||||
case msg if msg.startsWith("stash") =>
|
||||
stash.stash(msg)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case "unstash" =>
|
||||
stash.unstashAll(ctx, unstashing(0))
|
||||
case "get-current" =>
|
||||
|
|
@ -329,7 +329,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
case (ctx, "unstash") =>
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.unstashAll(ctx, Behavior.same)
|
||||
stash.unstashAll(ctx, Behaviors.same)
|
||||
|
||||
case (_, msg) =>
|
||||
probe.ref ! msg
|
||||
|
|
@ -373,7 +373,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
case (ctx, "unstash") =>
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.unstashAll(ctx, Behavior.same)
|
||||
stash.unstashAll(ctx, Behaviors.same)
|
||||
|
||||
case (_, msg) =>
|
||||
probe.ref ! msg
|
||||
|
|
@ -570,7 +570,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
case msg =>
|
||||
probe.ref ! msg
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
})
|
||||
|
||||
|
|
@ -590,7 +590,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
|
||||
def unstashing(n: Int): Behavior[String] =
|
||||
Behaviors.receiveMessage {
|
||||
case "unhandled" => Behavior.unhandled
|
||||
case "unhandled" => Behaviors.unhandled
|
||||
case "handled" =>
|
||||
probe.ref ! s"handled $n"
|
||||
unstashing(n + 1)
|
||||
|
|
@ -616,7 +616,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
"fail quick on invalid start behavior" in {
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
intercept[IllegalArgumentException](stash.unstashAll(null, Behavior.unhandled))
|
||||
intercept[IllegalArgumentException](stash.unstashAll(null, Behaviors.unhandled))
|
||||
}
|
||||
|
||||
"deal with initial stop" in {
|
||||
|
|
@ -649,10 +649,10 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
stash.unstashAll(ctx, Behaviors.receiveMessage {
|
||||
case unstashed =>
|
||||
probe.ref ! unstashed
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
})
|
||||
case _ =>
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
})
|
||||
ref ! "unstash"
|
||||
|
|
@ -673,7 +673,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
|||
stash.unstashAll(ctx, Behaviors.same)
|
||||
case msg =>
|
||||
probe.ref ! msg
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
})
|
||||
ref ! "unstash"
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ class AdapterSpec extends AkkaSpec("""
|
|||
var system: akka.actor.typed.ActorSystem[NotUsed] = null
|
||||
try {
|
||||
system = ActorSystem.create(
|
||||
Behaviors.setup[NotUsed](_ => Behavior.stopped[NotUsed]),
|
||||
Behaviors.setup[NotUsed](_ => Behaviors.stopped[NotUsed]),
|
||||
"AdapterSpec-stopping-guardian")
|
||||
} finally if (system != null) shutdown(system.toUntyped)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -296,7 +296,7 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
|
|||
message match {
|
||||
case LeaveHome(who, respondTo) =>
|
||||
context.spawn(prepareToLeaveHome(who, respondTo, keyCabinet, drawer), s"leaving-$who")
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -323,10 +323,10 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
|
|||
case (Some(w), Some(k)) =>
|
||||
// we got both, "session" is completed!
|
||||
respondTo ! ReadyToLeaveHome(whoIsLeaving, w, k)
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
|
||||
case _ =>
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
|
|
|
|||
|
|
@ -4,35 +4,18 @@
|
|||
|
||||
package akka.actor.typed
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.typed.internal.{ BehaviorImpl, InterceptorImpl }
|
||||
import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior
|
||||
import akka.util.{ LineNumbers, OptionVal }
|
||||
import akka.annotation.{ DoNotInherit, InternalApi }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
||||
|
||||
import scala.annotation.switch
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object BehaviorTags {
|
||||
|
||||
// optimization - by keeping an identifier for each concrete subtype of behavior
|
||||
// without gaps we can do table switches instead of instance of checks when interpreting
|
||||
// note that these must be compile time constants for it to work
|
||||
final val ExtensibleBehavior = 1
|
||||
final val EmptyBehavior = 2
|
||||
final val IgnoreBehavior = 3
|
||||
final val UnhandledBehavior = 4
|
||||
final val DeferredBehavior = 5
|
||||
final val SameBehavior = 6
|
||||
final val FailedBehavior = 7
|
||||
final val StoppedBehavior = 8
|
||||
|
||||
}
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.typed.internal.BehaviorImpl
|
||||
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior
|
||||
import akka.actor.typed.internal.BehaviorTags
|
||||
import akka.actor.typed.internal.InterceptorImpl
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* The behavior of an actor defines how it reacts to the messages that it
|
||||
|
|
@ -83,7 +66,7 @@ abstract class Behavior[T](private[akka] val _tag: Int) { behavior =>
|
|||
*
|
||||
* @param that the fallback `Behavior`
|
||||
**/
|
||||
final def orElse(that: Behavior[T]): Behavior[T] = Behavior.DeferredBehavior[T] { ctx =>
|
||||
final def orElse(that: Behavior[T]): Behavior[T] = BehaviorImpl.DeferredBehavior[T] { ctx =>
|
||||
new OrElseBehavior[T](Behavior.start(this, ctx), Behavior.start(that, ctx))
|
||||
}
|
||||
}
|
||||
|
|
@ -159,150 +142,6 @@ object Behavior {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
* system to reuse the previous behavior. This is provided in order to
|
||||
* avoid the allocation overhead of recreating the current behavior where
|
||||
* that is not necessary.
|
||||
*/
|
||||
def same[T]: Behavior[T] = SameBehavior.unsafeCast[T]
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
* system to reuse the previous behavior, including the hint that the
|
||||
* message has not been handled. This hint may be used by composite
|
||||
* behaviors that delegate (partial) handling to other behaviors.
|
||||
*/
|
||||
def unhandled[T]: Behavior[T] = UnhandledBehavior.unsafeCast[T]
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* current behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
def stopped[T]: Behavior[T] = StoppedBehavior.unsafeCast[T]
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* given `postStop` behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] =
|
||||
new StoppedBehavior[T](OptionVal.Some((_: TypedActorContext[T]) => postStop()))
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
*/
|
||||
def empty[T]: Behavior[T] = EmptyBehavior.unsafeCast[T]
|
||||
|
||||
/**
|
||||
* A behavior that ignores every incoming message and returns “same”.
|
||||
*/
|
||||
def ignore[T]: Behavior[T] = IgnoreBehavior.unsafeCast[T]
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) {
|
||||
override def toString = "Empty"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object IgnoreBehavior extends Behavior[Any](BehaviorTags.IgnoreBehavior) {
|
||||
override def toString = "Ignore"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object UnhandledBehavior extends Behavior[Nothing](BehaviorTags.UnhandledBehavior) {
|
||||
override def toString = "Unhandled"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] def failed[T](cause: Throwable): Behavior[T] = new FailedBehavior(cause).asInstanceOf[Behavior[T]]
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] val unhandledSignal
|
||||
: PartialFunction[(TypedActorContext[Nothing], Signal), Behavior[Nothing]] = {
|
||||
case (_, _) => UnhandledBehavior
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Not placed in internal.BehaviorImpl because Behavior is sealed.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] abstract class DeferredBehavior[T] extends Behavior[T](BehaviorTags.DeferredBehavior) {
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T]
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
@InternalApi
|
||||
private[akka] object DeferredBehavior {
|
||||
def apply[T](factory: SAC[T] => Behavior[T]): Behavior[T] =
|
||||
new DeferredBehavior[T] {
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] = factory(ctx.asScala)
|
||||
override def toString: String = s"Deferred(${LineNumbers(factory)})"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object SameBehavior extends Behavior[Nothing](BehaviorTags.SameBehavior) {
|
||||
override def toString = "Same"
|
||||
}
|
||||
|
||||
private[akka] class FailedBehavior(val cause: Throwable) extends Behavior[Nothing](BehaviorTags.FailedBehavior) {
|
||||
override def toString: String = s"Failed($cause)"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)
|
||||
|
||||
/**
|
||||
* INTERNAL API: When the cell is stopping this behavior is used, so
|
||||
* that PostStop can be sent to previous behavior from `finishTerminate`.
|
||||
*/
|
||||
private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
|
||||
extends Behavior[T](BehaviorTags.StoppedBehavior) {
|
||||
|
||||
def onPostStop(ctx: TypedActorContext[T]): Unit = {
|
||||
postStop match {
|
||||
case OptionVal.Some(callback) => callback(ctx)
|
||||
case OptionVal.None =>
|
||||
}
|
||||
}
|
||||
|
||||
override def toString = "Stopped" + {
|
||||
postStop match {
|
||||
case OptionVal.Some(callback) => s"(${LineNumbers(callback)})"
|
||||
case _ => "()"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a possibly special behavior (same or unhandled) and a
|
||||
* “current” behavior (which defines the meaning of encountering a `same`
|
||||
|
|
@ -378,7 +217,7 @@ object Behavior {
|
|||
/**
|
||||
* Returns true if the given behavior is the special `unhandled` marker.
|
||||
*/
|
||||
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq UnhandledBehavior
|
||||
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq BehaviorImpl.UnhandledBehavior
|
||||
|
||||
/**
|
||||
* Returns true if the given behavior is the special `Unhandled` marker.
|
||||
|
|
@ -398,8 +237,8 @@ object Behavior {
|
|||
val result = interpret(behavior, ctx, signal, isSignal = true)
|
||||
// we need to throw here to allow supervision of deathpact exception
|
||||
signal match {
|
||||
case Terminated(ref) if result == UnhandledBehavior => throw DeathPactException(ref)
|
||||
case _ => result
|
||||
case Terminated(ref) if result == BehaviorImpl.UnhandledBehavior => throw DeathPactException(ref)
|
||||
case _ => result
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -420,7 +259,7 @@ object Behavior {
|
|||
case BehaviorTags.DeferredBehavior =>
|
||||
throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter")
|
||||
case BehaviorTags.IgnoreBehavior =>
|
||||
Behavior.same[T]
|
||||
BehaviorImpl.same[T]
|
||||
case BehaviorTags.StoppedBehavior =>
|
||||
val s = behavior.asInstanceOf[StoppedBehavior[T]]
|
||||
if (msg == PostStop) s.onPostStop(ctx)
|
||||
|
|
@ -428,7 +267,7 @@ object Behavior {
|
|||
case BehaviorTags.FailedBehavior =>
|
||||
behavior
|
||||
case BehaviorTags.EmptyBehavior =>
|
||||
Behavior.unhandled[T]
|
||||
BehaviorImpl.unhandled[T]
|
||||
case BehaviorTags.ExtensibleBehavior =>
|
||||
val ext = behavior.asInstanceOf[ExtensibleBehavior[T]]
|
||||
val possiblyDeferredResult =
|
||||
|
|
|
|||
|
|
@ -5,16 +5,36 @@
|
|||
package akka.actor.typed
|
||||
package internal
|
||||
|
||||
import akka.util.{ LineNumbers }
|
||||
import akka.util.LineNumbers
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.typed.{ TypedActorContext => AC }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object BehaviorTags {
|
||||
|
||||
// optimization - by keeping an identifier for each concrete subtype of behavior
|
||||
// without gaps we can do table switches instead of instance of checks when interpreting
|
||||
// note that these must be compile time constants for it to work
|
||||
final val ExtensibleBehavior = 1
|
||||
final val EmptyBehavior = 2
|
||||
final val IgnoreBehavior = 3
|
||||
final val UnhandledBehavior = 4
|
||||
final val DeferredBehavior = 5
|
||||
final val SameBehavior = 6
|
||||
final val FailedBehavior = 7
|
||||
final val StoppedBehavior = 8
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object BehaviorImpl {
|
||||
import Behavior._
|
||||
|
||||
implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
|
||||
def as[U]: AC[U] = ctx.asInstanceOf[AC[U]]
|
||||
|
|
@ -23,16 +43,91 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
def widened[O, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] =
|
||||
intercept(WidenedInterceptor(matcher))(behavior)
|
||||
|
||||
def same[T]: Behavior[T] = SameBehavior.unsafeCast[T]
|
||||
|
||||
def unhandled[T]: Behavior[T] = UnhandledBehavior.unsafeCast[T]
|
||||
|
||||
def stopped[T]: Behavior[T] = StoppedBehavior.unsafeCast[T]
|
||||
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] =
|
||||
new StoppedBehavior[T](OptionVal.Some((_: TypedActorContext[T]) => postStop()))
|
||||
|
||||
def empty[T]: Behavior[T] = EmptyBehavior.unsafeCast[T]
|
||||
|
||||
def ignore[T]: Behavior[T] = IgnoreBehavior.unsafeCast[T]
|
||||
|
||||
def failed[T](cause: Throwable): Behavior[T] = new FailedBehavior(cause).asInstanceOf[Behavior[T]]
|
||||
|
||||
val unhandledSignal: PartialFunction[(TypedActorContext[Nothing], Signal), Behavior[Nothing]] = {
|
||||
case (_, _) => UnhandledBehavior
|
||||
}
|
||||
|
||||
private object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) {
|
||||
override def toString = "Empty"
|
||||
}
|
||||
|
||||
private object IgnoreBehavior extends Behavior[Any](BehaviorTags.IgnoreBehavior) {
|
||||
override def toString = "Ignore"
|
||||
}
|
||||
|
||||
object UnhandledBehavior extends Behavior[Nothing](BehaviorTags.UnhandledBehavior) {
|
||||
override def toString = "Unhandled"
|
||||
}
|
||||
|
||||
object SameBehavior extends Behavior[Nothing](BehaviorTags.SameBehavior) {
|
||||
override def toString = "Same"
|
||||
}
|
||||
|
||||
class FailedBehavior(val cause: Throwable) extends Behavior[Nothing](BehaviorTags.FailedBehavior) {
|
||||
override def toString: String = s"Failed($cause)"
|
||||
}
|
||||
|
||||
object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)
|
||||
|
||||
/**
|
||||
* When the cell is stopping this behavior is used, so
|
||||
* that PostStop can be sent to previous behavior from `finishTerminate`.
|
||||
*/
|
||||
private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
|
||||
extends Behavior[T](BehaviorTags.StoppedBehavior) {
|
||||
|
||||
def onPostStop(ctx: TypedActorContext[T]): Unit = {
|
||||
postStop match {
|
||||
case OptionVal.Some(callback) => callback(ctx)
|
||||
case OptionVal.None =>
|
||||
}
|
||||
}
|
||||
|
||||
override def toString = "Stopped" + {
|
||||
postStop match {
|
||||
case OptionVal.Some(callback) => s"(${LineNumbers(callback)})"
|
||||
case _ => "()"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class DeferredBehavior[T] extends Behavior[T](BehaviorTags.DeferredBehavior) {
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T]
|
||||
}
|
||||
|
||||
object DeferredBehavior {
|
||||
def apply[T](factory: SAC[T] => Behavior[T]): Behavior[T] =
|
||||
new DeferredBehavior[T] {
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] = factory(ctx.asScala)
|
||||
override def toString: String = s"Deferred(${LineNumbers(factory)})"
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiveBehavior[T](
|
||||
val onMessage: (SAC[T], T) => Behavior[T],
|
||||
onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] =
|
||||
Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
extends ExtensibleBehavior[T] {
|
||||
|
||||
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse(
|
||||
(ctx.asScala, msg),
|
||||
Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
|
||||
override def receive(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg)
|
||||
|
||||
|
|
@ -47,7 +142,7 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
class ReceiveMessageBehavior[T](
|
||||
val onMessage: T => Behavior[T],
|
||||
onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] =
|
||||
Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
extends ExtensibleBehavior[T] {
|
||||
|
||||
override def receive(ctx: AC[T], msg: T) = onMessage(msg)
|
||||
|
|
@ -55,7 +150,7 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse(
|
||||
(ctx.asScala, msg),
|
||||
Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
|
||||
override def toString = s"ReceiveMessage(${LineNumbers(onMessage)})"
|
||||
}
|
||||
|
|
@ -87,7 +182,7 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
// since we don't know what kind of concrete Behavior `first` is, if it is intercepted etc.
|
||||
// the only way we can fallback to second behavior if Terminated wasn't handled is to
|
||||
// catch the DeathPact here and pretend like it was just `unhandled`
|
||||
Behavior.unhandled
|
||||
BehaviorImpl.unhandled
|
||||
}
|
||||
|
||||
result match {
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@
|
|||
package akka.actor.typed.internal
|
||||
|
||||
import akka.actor.typed
|
||||
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
|
||||
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.{ LogOptions, _ }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.LineNumbers
|
||||
|
|
@ -20,7 +20,7 @@ import akka.util.LineNumbers
|
|||
private[akka] object InterceptorImpl {
|
||||
|
||||
def apply[O, I](interceptor: BehaviorInterceptor[O, I], nestedBehavior: Behavior[I]): Behavior[O] = {
|
||||
Behavior.DeferredBehavior[O] { ctx =>
|
||||
BehaviorImpl.DeferredBehavior[O] { ctx =>
|
||||
val interceptorBehavior = new InterceptorImpl[O, I](interceptor, nestedBehavior)
|
||||
interceptorBehavior.preStart(ctx)
|
||||
}
|
||||
|
|
@ -89,7 +89,7 @@ private[akka] final class InterceptorImpl[O, I](
|
|||
|
||||
private def deduplicate(interceptedResult: Behavior[I], ctx: TypedActorContext[O]): Behavior[O] = {
|
||||
val started = Behavior.start(interceptedResult, ctx.asInstanceOf[TypedActorContext[I]])
|
||||
if (started == UnhandledBehavior || started == SameBehavior || !Behavior.isAlive(started)) {
|
||||
if (started == BehaviorImpl.UnhandledBehavior || started == BehaviorImpl.SameBehavior || !Behavior.isAlive(started)) {
|
||||
started.unsafeCast[O]
|
||||
} else {
|
||||
// returned behavior could be nested in setups, so we need to start before we deduplicate
|
||||
|
|
@ -203,7 +203,7 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction
|
|||
}
|
||||
|
||||
matcher.applyOrElse(msg, any2null) match {
|
||||
case null => Behavior.unhandled
|
||||
case null => Behaviors.unhandled
|
||||
case transformed => target(ctx, transformed)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ import akka.annotation.InternalApi
|
|||
signal match {
|
||||
case p: PoisonPill =>
|
||||
val next = target(ctx, p)
|
||||
if (Behavior.isUnhandled(next)) Behavior.stopped
|
||||
if (Behavior.isUnhandled(next)) BehaviorImpl.stopped
|
||||
else next
|
||||
case _ => target(ctx, signal)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ import akka.util.ConstantFun
|
|||
}
|
||||
|
||||
val actualNext =
|
||||
if (interpretResult == Behavior.same) b2
|
||||
if (interpretResult == BehaviorImpl.same) b2
|
||||
else if (Behavior.isUnhandled(interpretResult)) {
|
||||
ctx.asScala.onUnhandled(message)
|
||||
b2
|
||||
|
|
@ -158,7 +158,7 @@ import akka.util.ConstantFun
|
|||
val actualInitialBehavior =
|
||||
if (Behavior.isUnhandled(started))
|
||||
throw new IllegalArgumentException("Cannot unstash with unhandled as starting behavior")
|
||||
else if (started == Behavior.same) {
|
||||
else if (started == BehaviorImpl.same) {
|
||||
ctx.asScala.currentBehavior
|
||||
} else started
|
||||
|
||||
|
|
|
|||
|
|
@ -112,7 +112,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
|
|||
|
||||
protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
|
||||
Behavior.failed(t)
|
||||
BehaviorImpl.failed(t)
|
||||
}
|
||||
|
||||
// convenience if target not required to handle exception
|
||||
|
|
@ -130,7 +130,7 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh
|
|||
override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
|
||||
log(ctx, t)
|
||||
Behavior.failed(t)
|
||||
BehaviorImpl.failed(t)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -235,7 +235,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
if (current == restartCount) {
|
||||
restartCount = 0
|
||||
}
|
||||
Behavior.same
|
||||
BehaviorImpl.same
|
||||
} else {
|
||||
// ResetRestartCount from nested Backoff strategy
|
||||
target(ctx, msg.asInstanceOf[T])
|
||||
|
|
@ -301,7 +301,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
case _: Restart => throw t
|
||||
case _: Backoff =>
|
||||
log(ctx, t)
|
||||
Behavior.failed(t)
|
||||
BehaviorImpl.failed(t)
|
||||
}
|
||||
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -10,8 +10,8 @@ import java.lang.reflect.InvocationTargetException
|
|||
|
||||
import akka.actor.{ ActorInitializationException, ActorRefWithCell }
|
||||
import akka.{ actor => untyped }
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.Behavior.StoppedBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior
|
||||
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
|
||||
import akka.annotation.InternalApi
|
||||
import scala.annotation.tailrec
|
||||
|
|
@ -51,7 +51,6 @@ import akka.util.OptionVal
|
|||
@InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T], rethrowTypedFailure: Boolean)
|
||||
extends untyped.Actor
|
||||
with untyped.ActorLogging {
|
||||
import Behavior._
|
||||
|
||||
private var behavior: Behavior[T] = _initialBehavior
|
||||
def currentBehavior: Behavior[T] = behavior
|
||||
|
|
@ -146,7 +145,7 @@ import akka.util.OptionVal
|
|||
case BehaviorTags.UnhandledBehavior =>
|
||||
unhandled(msg)
|
||||
case BehaviorTags.FailedBehavior =>
|
||||
val f = b.asInstanceOf[FailedBehavior]
|
||||
val f = b.asInstanceOf[BehaviorImpl.FailedBehavior]
|
||||
// For the parent untyped supervisor to pick up the exception
|
||||
if (rethrowTypedFailure) throw TypedActorFailedException(f.cause)
|
||||
else context.stop(self)
|
||||
|
|
@ -234,23 +233,23 @@ import akka.util.OptionVal
|
|||
}
|
||||
|
||||
override def preStart(): Unit = {
|
||||
if (isAlive(behavior)) {
|
||||
behavior = validateAsInitial(Behavior.start(behavior, ctx))
|
||||
if (Behavior.isAlive(behavior)) {
|
||||
behavior = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
|
||||
}
|
||||
// either was stopped initially or became stopped on start
|
||||
if (!isAlive(behavior)) context.stop(self)
|
||||
if (!Behavior.isAlive(behavior)) context.stop(self)
|
||||
}
|
||||
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
ctx.cancelAllTimers()
|
||||
Behavior.interpretSignal(behavior, ctx, PreRestart)
|
||||
behavior = Behavior.stopped
|
||||
behavior = BehaviorImpl.stopped
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = {
|
||||
ctx.cancelAllTimers()
|
||||
behavior = validateAsInitial(Behavior.start(behavior, ctx))
|
||||
if (!isAlive(behavior)) context.stop(self)
|
||||
behavior = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
|
||||
if (!Behavior.isAlive(behavior)) context.stop(self)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
@ -260,7 +259,7 @@ import akka.util.OptionVal
|
|||
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
|
||||
case b => Behavior.interpretSignal(b, ctx, PostStop)
|
||||
}
|
||||
behavior = Behavior.stopped
|
||||
behavior = BehaviorImpl.stopped
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -286,6 +285,6 @@ import akka.util.OptionVal
|
|||
}
|
||||
// and then to the potential stop hook, which can have a call back or not
|
||||
stopBehavior.onPostStop(ctx)
|
||||
Behavior.empty
|
||||
BehaviorImpl.empty
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.actor.typed.internal.routing
|
|||
import akka.actor.typed._
|
||||
import akka.actor.typed.scaladsl.AbstractBehavior
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
|
|
@ -72,7 +73,7 @@ private final class PoolRouterImpl[T](
|
|||
this
|
||||
} else {
|
||||
ctx.log.info("Last pool child stopped, stopping pool [{}]", ctx.self.path)
|
||||
Behavior.stopped
|
||||
Behaviors.stopped
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import akka.actor.typed.Behavior
|
|||
import akka.actor.typed.ExtensibleBehavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.actor.typed.Behavior.unhandled
|
||||
import BehaviorBuilder._
|
||||
import akka.util.OptionVal
|
||||
|
||||
|
|
@ -201,7 +200,7 @@ private final class BuiltBehavior[T](messageHandlers: List[Case[T, T]], signalHa
|
|||
handler(ctx, msg)
|
||||
else receive(ctx, msg, tail)
|
||||
case Nil =>
|
||||
unhandled[T]
|
||||
Behaviors.unhandled[T]
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ object Behaviors {
|
|||
* processed by the started behavior.
|
||||
*/
|
||||
def setup[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] =
|
||||
Behavior.DeferredBehavior(ctx => factory.apply(ctx.asJava))
|
||||
BehaviorImpl.DeferredBehavior(ctx => factory.apply(ctx.asJava))
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
|
|
@ -46,7 +46,7 @@ object Behaviors {
|
|||
* avoid the allocation overhead of recreating the current behavior where
|
||||
* that is not necessary.
|
||||
*/
|
||||
def same[T]: Behavior[T] = Behavior.same
|
||||
def same[T]: Behavior[T] = BehaviorImpl.same
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
|
|
@ -54,7 +54,7 @@ object Behaviors {
|
|||
* message has not been handled. This hint may be used by composite
|
||||
* behaviors that delegate (partial) handling to other behaviors.
|
||||
*/
|
||||
def unhandled[T]: Behavior[T] = Behavior.unhandled
|
||||
def unhandled[T]: Behavior[T] = BehaviorImpl.unhandled
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
|
|
@ -65,7 +65,7 @@ object Behaviors {
|
|||
* current behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
def stopped[T]: Behavior[T] = Behavior.stopped
|
||||
def stopped[T]: Behavior[T] = BehaviorImpl.stopped
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
|
|
@ -76,17 +76,17 @@ object Behaviors {
|
|||
* current behavior and then the provided `postStop` callback will be invoked.
|
||||
* All other messages and signals will effectively be ignored.
|
||||
*/
|
||||
def stopped[T](postStop: Effect): Behavior[T] = Behavior.stopped(postStop.apply _)
|
||||
def stopped[T](postStop: Effect): Behavior[T] = BehaviorImpl.stopped(postStop.apply _)
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
*/
|
||||
def empty[T]: Behavior[T] = Behavior.empty
|
||||
def empty[T]: Behavior[T] = BehaviorImpl.empty
|
||||
|
||||
/**
|
||||
* A behavior that ignores every incoming message and returns “same”.
|
||||
*/
|
||||
def ignore[T]: Behavior[T] = Behavior.ignore
|
||||
def ignore[T]: Behavior[T] = BehaviorImpl.ignore
|
||||
|
||||
/**
|
||||
* Construct an actor behavior that can react to incoming messages but not to
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor.typed.javadsl
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
|
||||
import akka.actor.typed.internal.routing.GroupRouterBuilder
|
||||
import akka.actor.typed.internal.routing.PoolRouterBuilder
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
|
|
|
|||
|
|
@ -60,5 +60,5 @@ abstract class AbstractBehavior[T] extends ExtensibleBehavior[T] {
|
|||
|
||||
@throws(classOf[Exception])
|
||||
override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse(msg, { case _ => Behavior.unhandled }: PartialFunction[Signal, Behavior[T]])
|
||||
onSignal.applyOrElse(msg, { case _ => Behaviors.unhandled }: PartialFunction[Signal, Behavior[T]])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ object Behaviors {
|
|||
* processed by the started behavior.
|
||||
*/
|
||||
def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] =
|
||||
Behavior.DeferredBehavior(factory)
|
||||
BehaviorImpl.DeferredBehavior(factory)
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
|
|
@ -35,7 +35,7 @@ object Behaviors {
|
|||
* avoid the allocation overhead of recreating the current behavior where
|
||||
* that is not necessary.
|
||||
*/
|
||||
def same[T]: Behavior[T] = Behavior.same
|
||||
def same[T]: Behavior[T] = BehaviorImpl.same
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing in order to advise the
|
||||
|
|
@ -43,7 +43,7 @@ object Behaviors {
|
|||
* message has not been handled. This hint may be used by composite
|
||||
* behaviors that delegate (partial) handling to other behaviors.
|
||||
*/
|
||||
def unhandled[T]: Behavior[T] = Behavior.unhandled
|
||||
def unhandled[T]: Behavior[T] = BehaviorImpl.unhandled
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
|
|
@ -54,7 +54,7 @@ object Behaviors {
|
|||
* current behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
def stopped[T]: Behavior[T] = Behavior.stopped
|
||||
def stopped[T]: Behavior[T] = BehaviorImpl.stopped
|
||||
|
||||
/**
|
||||
* Return this behavior from message processing to signal that this actor
|
||||
|
|
@ -65,17 +65,17 @@ object Behaviors {
|
|||
* current behavior and then the provided `postStop` callback will be invoked.
|
||||
* All other messages and signals will effectively be ignored.
|
||||
*/
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] = Behavior.stopped(postStop)
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] = BehaviorImpl.stopped(postStop)
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
*/
|
||||
def empty[T]: Behavior[T] = Behavior.empty
|
||||
def empty[T]: Behavior[T] = BehaviorImpl.empty
|
||||
|
||||
/**
|
||||
* A behavior that ignores every incoming message and returns “same”.
|
||||
*/
|
||||
def ignore[T]: Behavior[T] = Behavior.ignore
|
||||
def ignore[T]: Behavior[T] = BehaviorImpl.ignore
|
||||
|
||||
/**
|
||||
* Construct an actor behavior that can react to both incoming messages and
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ object ShardingState {
|
|||
} else {
|
||||
replyTo ! CurrentShardRegionState(Set.empty)
|
||||
}
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -346,7 +346,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress)
|
||||
val notInCluster = allAddressesInState.diff(registry.nodes)
|
||||
|
||||
if (notInCluster.isEmpty) Behavior.same
|
||||
if (notInCluster.isEmpty) Behaviors.same
|
||||
else {
|
||||
if (ctx.log.isDebugEnabled)
|
||||
ctx.log.debug(
|
||||
|
|
@ -356,7 +356,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
nodesRemoved(notInCluster)
|
||||
}
|
||||
}
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
|
||||
case PruneTombstonesTick =>
|
||||
val prunedRegistry = registry.pruneTombstones()
|
||||
|
|
|
|||
|
|
@ -6,15 +6,15 @@ package akka.cluster.typed.internal
|
|||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.cluster.typed.internal.receptionist.ClusterReceptionist
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class AkkaClusterTypedSerializerSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
|
||||
val ref = spawn(Behavior.empty[String])
|
||||
val ref = spawn(Behaviors.empty[String])
|
||||
val untypedSystem = system.toUntyped
|
||||
val serializer = new AkkaClusterTypedSerializer(untypedSystem.asInstanceOf[ExtendedActorSystem])
|
||||
|
||||
|
|
|
|||
|
|
@ -324,6 +324,8 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
|
|||
prefer `Behaviors.withTimers`.
|
||||
* `TimerScheduler.startPeriodicTimer`, replaced by `startTimerWithFixedDelay` or `startTimerAtFixedRate`
|
||||
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
|
||||
* Removed `Behavior.same`, `Behavior.unhandled`, `Behavior.stopped`, `Behavior.empty`, and `Behavior.ignore` since
|
||||
they were redundant with corresponding @scala[scaladsl.Behaviors.x]@java[javadsl.Behaviors.x].
|
||||
|
||||
#### Akka Typed Stream API changes
|
||||
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
}
|
||||
} else {
|
||||
// snapshot timeout, but we're already in the events recovery phase
|
||||
Behavior.unhandled
|
||||
Behaviors.unhandled
|
||||
}
|
||||
|
||||
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
|
|||
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
|
||||
// during recovery, stash all incoming commands
|
||||
stashInternal(cmd)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import java.util.Optional
|
|||
import akka.actor.typed
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
|
||||
import akka.actor.typed.javadsl.ActorContext
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.typed.EventAdapter
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import scala.annotation.tailrec
|
|||
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.internal.InterceptorImpl
|
||||
import akka.actor.typed.internal.LoggerClass
|
||||
|
|
|
|||
|
|
@ -200,7 +200,7 @@ class RecoveryPermitterSpec extends ScalaTestWithActorTestKit(s"""
|
|||
case (_, StopActor) =>
|
||||
stopProbe.ref ! persistentActor
|
||||
ctx.stop(persistentActor)
|
||||
Behavior.same
|
||||
Behaviors.same
|
||||
case (_, message) =>
|
||||
persistentActor ! message
|
||||
Behaviors.same
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue