Complete adapter API for coexistence of typed and untyped actors, #22174

* Trying out what is working and what is missing with a test
* Add missing API and cleanup (public vs internal)
* Note that PropsAdapter will make it possible to use Cluster Sharding
  with typed entity actors
* add javadsl for the adapters, and full java tests for that
* Add narrow to ActorRef
This commit is contained in:
Patrik Nordwall 2017-04-05 08:08:13 +02:00
parent 6bbd658817
commit ebb5748d6a
25 changed files with 1110 additions and 195 deletions

View file

@ -702,7 +702,8 @@ private[akka] class ActorSystemImpl(
def actorOf(props: Props, name: String): ActorRef =
if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false)
else throw new UnsupportedOperationException("cannot create top-level actor from the outside on ActorSystem with custom user guardian")
else throw new UnsupportedOperationException(
s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
def actorOf(props: Props): ActorRef =
if (guardianProps.isEmpty) guardian.underlying.attachChild(props, systemService = false)

View file

@ -121,15 +121,14 @@ class IntroSpec extends TypedSpec {
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Stateful(
behavior = (_, _) Unhandled,
signal = { (ctx, sig)
onMessage = (_, _) Unhandled,
onSignal = (ctx, sig)
sig match {
case Terminated(ref)
Stopped
case _
Unhandled
}
}
)
}

View file

@ -0,0 +1,339 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.javadsl;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import akka.actor.ActorSystem;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.Signal;
import akka.typed.Terminated;
import akka.testkit.javadsl.TestKit;
import akka.actor.SupervisorStrategy;
import static akka.typed.javadsl.Actor.*;
public class AdapterTest extends JUnitSuite {
static akka.actor.Props untyped1() {
return akka.actor.Props.create(Untyped1.class, () -> new Untyped1());
}
static class Untyped1 extends akka.actor.AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("ping", s -> getSender().tell("pong", getSelf()))
.match(ThrowIt.class, t -> {
throw t;
})
.build();
}
}
static class Typed1 {
private final akka.actor.ActorRef ref;
private final akka.actor.ActorRef probe;
private Typed1(akka.actor.ActorRef ref, akka.actor.ActorRef probe) {
this.ref = ref;
this.probe = probe;
}
static Behavior<String> create(akka.actor.ActorRef ref, akka.actor.ActorRef probe) {
Typed1 logic = new Typed1(ref, probe);
return stateful(
(ctx, msg) -> logic.onMessage(ctx, msg),
(ctx, sig) -> logic.onSignal(ctx, sig));
}
Behavior<String> onMessage(ActorContext<String> ctx, String msg) {
if (msg.equals("send")) {
akka.actor.ActorRef replyTo = Adapter.toUntyped(ctx.getSelf());
ref.tell("ping", replyTo);
return same();
} else if (msg.equals("pong")) {
probe.tell("ok", akka.actor.ActorRef.noSender());
return same();
} else if (msg.equals("actorOf")) {
akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1());
child.tell("ping", Adapter.toUntyped(ctx.getSelf()));
return same();
} else if (msg.equals("watch")) {
Adapter.watch(ctx, ref);
return same();
} else if (msg.equals("supervise-stop")) {
akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1());
Adapter.watch(ctx, child);
child.tell(new ThrowIt3(), Adapter.toUntyped(ctx.getSelf()));
child.tell("ping", Adapter.toUntyped(ctx.getSelf()));
return same();
} else if (msg.equals("stop-child")) {
akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1());
Adapter.watch(ctx, child);
Adapter.stop(ctx, child);
return same();
} else {
return unhandled();
}
}
Behavior<String> onSignal(ActorContext<String> ctx, Signal sig) {
if (sig instanceof Terminated) {
probe.tell("terminated", akka.actor.ActorRef.noSender());
return same();
} else {
return unhandled();
}
}
}
static interface Typed2Msg {};
static final class Ping implements Typed2Msg {
public final ActorRef<String> replyTo;
public Ping(ActorRef<String> replyTo) {
this.replyTo = replyTo;
}
}
static final class StopIt implements Typed2Msg {}
static abstract class ThrowIt extends RuntimeException implements Typed2Msg {}
static class ThrowIt1 extends ThrowIt {}
static class ThrowIt2 extends ThrowIt {}
static class ThrowIt3 extends ThrowIt {}
static akka.actor.Props untyped2(ActorRef<Ping> ref, akka.actor.ActorRef probe) {
return akka.actor.Props.create(Untyped2.class, () -> new Untyped2(ref, probe));
}
static class Untyped2 extends akka.actor.AbstractActor {
private final ActorRef<Ping> ref;
private final akka.actor.ActorRef probe;
private final SupervisorStrategy strategy;
Untyped2(ActorRef<Ping> ref, akka.actor.ActorRef probe) {
this.ref = ref;
this.probe = probe;
this.strategy = strategy();
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("send", s -> {
ActorRef<String> replyTo = Adapter.toTyped(getSelf());
ref.tell(new Ping(replyTo));
})
.matchEquals("pong", s -> probe.tell("ok", getSelf()))
.matchEquals("spawn", s -> {
ActorRef<Typed2Msg> child = Adapter.spawnAnonymous(getContext(), typed2());
child.tell(new Ping(Adapter.toTyped(getSelf())));
})
.matchEquals("actorOf-props", s -> {
// this is how Cluster Sharding can be used
akka.actor.ActorRef child = getContext().actorOf(typed2Props());
child.tell(new Ping(Adapter.toTyped(getSelf())), akka.actor.ActorRef.noSender());
})
.matchEquals("watch", s -> Adapter.watch(getContext(), ref))
.match(akka.actor.Terminated.class, t -> probe.tell("terminated", getSelf()))
.matchEquals("supervise-stop", s -> testSupervice(new ThrowIt1()))
.matchEquals("supervise-resume", s -> testSupervice(new ThrowIt2()))
.matchEquals("supervise-restart", s -> testSupervice(new ThrowIt3()))
.matchEquals("stop-child", s -> {
ActorRef<Typed2Msg> child = Adapter.spawnAnonymous(getContext(), typed2());
Adapter.watch(getContext(), child);
Adapter.stop(getContext(), child);
})
.build();
}
private void testSupervice(ThrowIt t) {
ActorRef<Typed2Msg> child = Adapter.spawnAnonymous(getContext(), typed2());
Adapter.watch(getContext(), child);
child.tell(t);
child.tell(new Ping(Adapter.toTyped(getSelf())));
}
private SupervisorStrategy strategy() {
return new akka.actor.OneForOneStrategy(false, akka.japi.pf.DeciderBuilder
.match(ThrowIt1.class, e -> {
probe.tell("thrown-stop", getSelf());
return SupervisorStrategy.stop();
})
.match(ThrowIt2.class, e -> {
probe.tell("thrown-resume", getSelf());
return SupervisorStrategy.resume();
})
.match(ThrowIt3.class, e -> {
probe.tell("thrown-restart", getSelf());
// TODO Restart will not really restart the behavior
return SupervisorStrategy.restart();
})
.build());
}
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
}
static Behavior<Typed2Msg> typed2() {
return Actor.stateful((ctx, msg) -> {
if (msg instanceof Ping) {
ActorRef<String> replyTo = ((Ping) msg).replyTo;
replyTo.tell("pong");
return same();
} else if (msg instanceof StopIt) {
return stopped();
} else if (msg instanceof ThrowIt) {
throw (ThrowIt) msg;
} else {
return unhandled();
}
});
}
static akka.actor.Props typed2Props() {
return Adapter.props(() -> typed2());
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorSelectionTest",
AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
@Test
public void shouldSendMessageFromTypedToUntyped() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef untypedRef = system.actorOf(untyped1());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(untypedRef, probe.getRef()));
typedRef.tell("send");
probe.expectMsg("ok");
}
@Test
public void shouldSendMessageFromUntypedToTyped() {
TestKit probe = new TestKit(system);
ActorRef<Ping> typedRef = Adapter.spawnAnonymous(system, typed2()).narrow();
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(typedRef, probe.getRef()));
untypedRef.tell("send", akka.actor.ActorRef.noSender());
probe.expectMsg("ok");
}
@Test
public void shouldSpawnTypedChildFromUntypedParent() {
TestKit probe = new TestKit(system);
ActorRef<Ping> ignore = Adapter.spawnAnonymous(system, ignore());
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef()));
untypedRef.tell("spawn", akka.actor.ActorRef.noSender());
probe.expectMsg("ok");
}
@Test
public void shouldActorOfTypedChildViaPropsFromUntypedParent() {
TestKit probe = new TestKit(system);
ActorRef<Ping> ignore = Adapter.spawnAnonymous(system, ignore());
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef()));
untypedRef.tell("actorOf-props", akka.actor.ActorRef.noSender());
probe.expectMsg("ok");
}
@Test
public void shouldActorOfUntypedChildFromTypedParent() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef()));
typedRef.tell("actorOf");
probe.expectMsg("ok");
}
@Test
public void shouldWatchTypedFromUntyped() {
TestKit probe = new TestKit(system);
ActorRef<Typed2Msg> typedRef = Adapter.spawnAnonymous(system, typed2());
ActorRef<Ping> typedRef2 = typedRef.narrow();
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(typedRef2, probe.getRef()));
untypedRef.tell("watch", akka.actor.ActorRef.noSender());
typedRef.tell(new StopIt());
probe.expectMsg("terminated");
}
@Test
public void shouldWatchUntypedFromTyped() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef untypedRef = system.actorOf(untyped1());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(untypedRef, probe.getRef()));
typedRef.tell("watch");
untypedRef.tell(akka.actor.PoisonPill.getInstance() , akka.actor.ActorRef.noSender());
probe.expectMsg("terminated");
}
@Test
public void shouldSuperviseTypedChildFromUntypedParent() {
TestKit probe = new TestKit(system);
ActorRef<Ping> ignore = Adapter.spawnAnonymous(system, ignore());
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef()));
untypedRef.tell("supervise-stop", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-stop");
// ping => ok should not get through here
probe.expectMsg("terminated");
untypedRef.tell("supervise-resume", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-resume");
probe.expectMsg("ok");
untypedRef.tell("supervise-restart", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-restart");
probe.expectMsg("ok");
}
@Test
public void shouldSuperviseUntypedChildFromTypedParent() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef()));
int originalLogLevel = system.eventStream().logLevel();
try {
// supress the logging with stack trace
system.eventStream().setLogLevel(Integer.MIN_VALUE); // OFF
// only stop supervisorStrategy
typedRef.tell("supervise-stop");
probe.expectMsg("terminated");
} finally {
system.eventStream().setLogLevel(originalLogLevel);
}
probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); // no pong
}
@Test
public void shouldStopTypedChildFromUntypedParent() {
TestKit probe = new TestKit(system);
ActorRef<Ping> ignore = Adapter.spawnAnonymous(system, ignore());
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef()));
untypedRef.tell("stop-child", akka.actor.ActorRef.noSender());
probe.expectMsg("terminated");
}
@Test
public void shouldStopUntypedChildFromTypedParent() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef()));
typedRef.tell("stop-child");
probe.expectMsg("terminated");
}
}

View file

@ -65,8 +65,9 @@ class AskSpec extends TypedSpec with ScalaFutures {
/** See issue #19947 (MatchError with adapted ActorRef) */
def `must fail the future if the actor doesn't exist`(): Unit = {
val noSuchActor: ActorRef[Msg] = system match {
case adaptedSys: adapter.ActorSystemAdapter[_]
adapter.actorRefAdapter(adaptedSys.untyped.provider.resolveActorRef("/foo/bar"))
case adaptedSys: akka.typed.internal.adapter.ActorSystemAdapter[_]
import akka.typed.scaladsl.adapter._
adaptedSys.untyped.provider.resolveActorRef("/foo/bar")
case _
fail("this test must only run in an adapted actor system")
}

View file

@ -39,7 +39,7 @@ class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with S
/**
* Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter.
*/
class TypedSpec(val config: Config) extends TypedSpecSetup {
abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
import TypedSpec._
import AskPattern._
@ -48,8 +48,18 @@ class TypedSpec(val config: Config) extends TypedSpecSetup {
// extension point
def setTimeout: Timeout = Timeout(1.minute)
lazy val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
lazy val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
private var nativeSystemUsed = false
lazy val nativeSystem: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
nativeSystemUsed = true
sys
}
private var adaptedSystemUsed = false
lazy val adaptedSystem: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
adaptedSystemUsed = false
sys
}
trait NativeSystem {
def system = nativeSystem
@ -63,8 +73,10 @@ class TypedSpec(val config: Config) extends TypedSpecSetup {
implicit def scheduler = nativeSystem.scheduler
override def afterAll(): Unit = {
Await.result(nativeSystem ? (Terminate(_)), timeout.duration): Status
Await.result(adaptedSystem ? (Terminate(_)), timeout.duration): Status
if (nativeSystemUsed)
Await.result(nativeSystem ? (Terminate(_)), timeout.duration): Status
if (adaptedSystemUsed)
Await.result(adaptedSystem ? (Terminate(_)), timeout.duration): Status
}
// TODO remove after basing on ScalaTest 3 with async support
@ -197,8 +209,7 @@ class TypedSpecSpec extends TypedSpec {
sync(runTest("failure")(StepWise[String]((ctx, startWith)
startWith {
fail("expected")
}
)))
})))
}
}
}

View file

@ -0,0 +1,255 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl.adapter
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.{ actor untyped }
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.Terminated
import akka.typed.scaladsl.Actor._
import akka.{ actor untyped }
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.testkit._
object AdapterSpec {
val untyped1: untyped.Props = untyped.Props(new Untyped1)
class Untyped1 extends untyped.Actor {
def receive = {
case "ping" sender() ! "pong"
case t: ThrowIt throw t
}
}
def typed1(ref: untyped.ActorRef, probe: ActorRef[String]): Behavior[String] =
Stateful(
onMessage = (ctx, msg)
msg match {
case "send"
val replyTo = ctx.self.toUntyped
ref.tell("ping", replyTo)
Same
case "pong"
probe ! "ok"
Same
case "actorOf"
val child = ctx.actorOf(untyped1)
child.tell("ping", ctx.self.toUntyped)
Same
case "watch"
ctx.watch(ref)
Same
case "supervise-stop"
val child = ctx.actorOf(untyped1)
ctx.watch(child)
child ! ThrowIt3
child.tell("ping", ctx.self.toUntyped)
Same
case "stop-child"
val child = ctx.actorOf(untyped1)
ctx.watch(child)
ctx.stop(child)
Same
},
onSignal = (ctx, sig) sig match {
case Terminated(ref)
probe ! "terminated"
Same
case _ Unhandled
})
sealed trait Typed2Msg
final case class Ping(replyTo: ActorRef[String]) extends Typed2Msg
case object StopIt extends Typed2Msg
sealed trait ThrowIt extends RuntimeException with Typed2Msg with NoStackTrace
case object ThrowIt1 extends ThrowIt
case object ThrowIt2 extends ThrowIt
case object ThrowIt3 extends ThrowIt
def untyped2(ref: ActorRef[Ping], probe: ActorRef[String]): untyped.Props =
untyped.Props(new Untyped2(ref, probe))
class Untyped2(ref: ActorRef[Ping], probe: ActorRef[String]) extends untyped.Actor {
override val supervisorStrategy = untyped.OneForOneStrategy() {
({
case ThrowIt1
probe ! "thrown-stop"
untyped.SupervisorStrategy.Stop
case ThrowIt2
probe ! "thrown-resume"
untyped.SupervisorStrategy.Resume
case ThrowIt3
probe ! "thrown-restart"
// TODO Restart will not really restart the behavior
untyped.SupervisorStrategy.Restart
}: untyped.SupervisorStrategy.Decider).orElse(untyped.SupervisorStrategy.defaultDecider)
}
def receive = {
case "send" ref ! Ping(self) // implicit conversion
case "pong" probe ! "ok"
case "spawn"
val child = context.spawnAnonymous(typed2)
child ! Ping(self)
case "actorOf-props"
// this is how Cluster Sharding can be used
val child = context.actorOf(typed2Props)
child ! Ping(self)
case "watch"
context.watch(ref)
case untyped.Terminated(_)
probe ! "terminated"
case "supervise-stop"
testSupervice(ThrowIt1)
case "supervise-resume"
testSupervice(ThrowIt2)
case "supervise-restart"
testSupervice(ThrowIt3)
case "stop-child"
val child = context.spawnAnonymous(typed2)
context.watch(child)
context.stop(child)
}
private def testSupervice(t: ThrowIt): Unit = {
val child = context.spawnAnonymous(typed2)
context.watch(child)
child ! t
child ! Ping(self)
}
}
def typed2: Behavior[Typed2Msg] =
Stateful { (ctx, msg)
msg match {
case Ping(replyTo)
replyTo ! "pong"
Same
case StopIt
Stopped
case t: ThrowIt
throw t
}
}
def typed2Props: untyped.Props = PropsAdapter(typed2)
}
class AdapterSpec extends AkkaSpec {
import AdapterSpec._
"Adapted actors" must {
"send message from typed to untyped" in {
val probe = TestProbe()
val untypedRef = system.actorOf(untyped1)
val typedRef = system.spawnAnonymous(typed1(untypedRef, probe.ref))
typedRef ! "send"
probe.expectMsg("ok")
}
"send message from untyped to typed" in {
val probe = TestProbe()
val typedRef = system.spawnAnonymous(typed2)
val untypedRef = system.actorOf(untyped2(typedRef, probe.ref))
untypedRef ! "send"
probe.expectMsg("ok")
}
"spawn typed child from untyped parent" in {
val probe = TestProbe()
val ignore = system.spawnAnonymous(Ignore[Ping])
val untypedRef = system.actorOf(untyped2(ignore, probe.ref))
untypedRef ! "spawn"
probe.expectMsg("ok")
}
"actorOf typed child via Props from untyped parent" in {
val probe = TestProbe()
val ignore = system.spawnAnonymous(Ignore[Ping])
val untypedRef = system.actorOf(untyped2(ignore, probe.ref))
untypedRef ! "actorOf-props"
probe.expectMsg("ok")
}
"actorOf untyped child from typed parent" in {
val probe = TestProbe()
val ignore = system.actorOf(untyped.Props.empty)
val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref))
typedRef ! "actorOf"
probe.expectMsg("ok")
}
"watch typed from untyped" in {
val probe = TestProbe()
val typedRef = system.spawnAnonymous(typed2)
val untypedRef = system.actorOf(untyped2(typedRef, probe.ref))
untypedRef ! "watch"
typedRef ! StopIt
probe.expectMsg("terminated")
}
"watch untyped from typed" in {
val probe = TestProbe()
val untypedRef = system.actorOf(untyped1)
val typedRef = system.spawnAnonymous(typed1(untypedRef, probe.ref))
typedRef ! "watch"
untypedRef ! untyped.PoisonPill
probe.expectMsg("terminated")
}
"supervise typed child from untyped parent" in {
val probe = TestProbe()
val ignore = system.spawnAnonymous(Ignore[Ping])
val untypedRef = system.actorOf(untyped2(ignore, probe.ref))
untypedRef ! "supervise-stop"
probe.expectMsg("thrown-stop")
// ping => ok should not get through here
probe.expectMsg("terminated")
untypedRef ! "supervise-resume"
probe.expectMsg("thrown-resume")
probe.expectMsg("ok")
untypedRef ! "supervise-restart"
probe.expectMsg("thrown-restart")
probe.expectMsg("ok")
}
"supervise untyped child from typed parent" in {
val probe = TestProbe()
val ignore = system.actorOf(untyped.Props.empty)
val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref))
// only stop supervisorStrategy
typedRef ! "supervise-stop"
probe.expectMsg("terminated")
probe.expectNoMsg(100.millis) // no pong
}
"stop typed child from untyped parent" in {
val probe = TestProbe()
val ignore = system.spawnAnonymous(Ignore[Ping])
val untypedRef = system.actorOf(untyped2(ignore, probe.ref))
untypedRef ! "stop-child"
probe.expectMsg("terminated")
}
"stop untyped child from typed parent" in {
val probe = TestProbe()
val ignore = system.actorOf(untyped.Props.empty)
val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref))
typedRef ! "stop-child"
probe.expectMsg("terminated")
}
}
}

View file

@ -17,6 +17,9 @@ import akka.annotation.ApiMayChange
@DoNotInherit
@ApiMayChange
trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
// FIXME can we simplify this weird hierarchy of contexts, e.g. problem with createAdapter
override def getChild(name: String): Optional[ActorRef[Void]] =
child(name) match {
case Some(c) Optional.of(c.upcast[Void])

View file

@ -31,6 +31,11 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act
*/
def !(msg: T): Unit = tell(msg)
/**
* Narrow the type of this `ActorRef, which is always a safe operation.
*/
final def narrow[U <: T]: ActorRef[U] = this.asInstanceOf[ActorRef[U]]
/**
* Unsafe utility method for widening the type accepted by this ActorRef;
* provided to avoid having to use `asInstanceOf` on the full reference type,
@ -73,7 +78,8 @@ object ActorRef {
* Create an ActorRef from a Future, buffering up to the given number of
* messages in while the Future is not fulfilled.
*/
def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = new internal.FutureRef(FuturePath, bufferSize, f)
def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
new internal.FutureRef(FuturePath, bufferSize, f)
/**
* Create an ActorRef by providing a function that is invoked for sending

View file

@ -11,7 +11,7 @@ import akka.actor.setup.ActorSystemSetup
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.typed.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.util.Timeout
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
@ -180,10 +180,17 @@ object ActorSystem {
classLoader: Option[ClassLoader] = None,
executionContext: Option[ExecutionContext] = None,
actorSystemSettings: ActorSystemSetup = ActorSystemSetup.empty): ActorSystem[T] = {
// TODO I'm not sure how useful this mode is for end-users. It has the limitation that untyped top level
// actors can't be created, because we have a custom user guardian. I would imagine that if you have
// a system of both untyped and typed actors (e.g. adding some typed actors to an existing application)
// you would start an untyped.ActorSystem and spawn typed actors from that system or from untyped actors.
Behavior.validateAsInitial(guardianBehavior)
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianBehavior, guardianDeployment)), actorSystemSettings)
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext,
Some(PropsAdapter(() guardianBehavior, guardianDeployment)), actorSystemSettings)
untyped.start()
new ActorSystemAdapter(untyped)
}

View file

@ -1,61 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package adapter
import akka.{ actor a }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor
/**
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]].
*/
private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorContext[T] {
override def self = ActorRefAdapter(ctx.self)
override val system = ActorSystemAdapter(ctx.system)
override def mailboxCapacity = 1 << 29 // FIXME
override def children = ctx.children.map(ActorRefAdapter(_))
override def child(name: String) = ctx.child(name).map(ActorRefAdapter(_))
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawnAnonymous(behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawn(behavior, name, deployment)
override def stop(child: ActorRef[_]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef
val cell = ctx.asInstanceOf[akka.actor.ActorCell]
cell.removeFunctionRef(f)
case untyped
ctx.child(child.path.name) match {
case Some(`untyped`)
ctx.stop(untyped)
true
case _
false // none of our business
}
}
override def watch(other: ActorRef[_]) = ctx.watch(toUntyped(other))
override def unwatch(other: ActorRef[_]) = ctx.unwatch(toUntyped(other))
var receiveTimeoutMsg: T = null.asInstanceOf[T]
override def setReceiveTimeout(d: FiniteDuration, msg: T) = {
receiveTimeoutMsg = msg
ctx.setReceiveTimeout(d)
}
override def cancelReceiveTimeout(): Unit = {
receiveTimeoutMsg = null.asInstanceOf[T]
ctx.setReceiveTimeout(Duration.Undefined)
}
override def executionContext: ExecutionContextExecutor = ctx.dispatcher
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): a.Cancellable = {
import ctx.dispatcher
ctx.system.scheduler.scheduleOnce(delay, toUntyped(target), msg)
}
override def spawnAdapter[U](f: U T, name: String = ""): ActorRef[U] = {
val cell = ctx.asInstanceOf[akka.actor.ActorCell]
val ref = cell.addFunctionRef((_, msg) ctx.self ! f(msg.asInstanceOf[U]), name)
ActorRefAdapter[U](ref)
}
}

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package adapter
import akka.{ actor a }
private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef)
extends ActorRef[T](untyped.path) with internal.ActorRefImpl[T] {
override def tell(msg: T): Unit = untyped ! msg
override def isLocal: Boolean = true
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped, signal)
}
private[typed] object ActorRefAdapter {
def apply[T](untyped: a.ActorRef): ActorRef[T] = new ActorRefAdapter(untyped.asInstanceOf[a.InternalActorRef])
}

View file

@ -1,24 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package adapter
import akka.{ actor a }
private[typed] object PropsAdapter {
// FIXME dispatcher and queue size
def apply(b: Behavior[_], deploy: DeploymentConfig): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (b: AnyRef) :: Nil)
def apply[T](p: a.Props): Behavior[T] = {
assert(p.clazz == classOf[ActorAdapter[_]], "typed.Actor must have typed.Props")
p.args match {
case (initial: Behavior[_]) :: Nil
// FIXME queue size
initial.asInstanceOf[Behavior[T]]
case _ throw new AssertionError("typed.Actor args must be right")
}
}
}

View file

@ -1,43 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package object adapter {
import language.implicitConversions
import akka.dispatch.sysmsg
implicit class ActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal {
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment)))
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name))
}
implicit class ActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal {
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment)))
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name))
}
implicit def actorRefAdapter(ref: akka.actor.ActorRef): ActorRef[Any] = ActorRefAdapter(ref)
private[adapter] def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef =
ref match {
case adapter: ActorRefAdapter[_] adapter.untyped
case _ throw new UnsupportedOperationException(s"only adapted untyped ActorRefs permissible ($ref of class ${ref.getClass})")
}
private[adapter] def sendSystemMessage(untyped: akka.actor.InternalActorRef, signal: internal.SystemMessage): Unit =
signal match {
case internal.Create() throw new IllegalStateException("WAT? No, seriously.")
case internal.Terminate() untyped.stop()
case internal.Watch(watchee, watcher) untyped.sendSystemMessage(sysmsg.Watch(toUntyped(watchee), toUntyped(watcher)))
case internal.Unwatch(watchee, watcher) untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher)))
case internal.DeathWatchNotification(ref, cause) untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false))
case internal.NoMessage // just to suppress the warning
}
}

View file

@ -162,7 +162,7 @@ private[typed] class ActorSystemImpl[-T](
*/
private object eventStreamStub extends e.EventStream(null, false) {
override def subscribe(ref: a.ActorRef, ch: Class[_]): Boolean =
throw new UnsupportedOperationException("cannot use this eventstream for subscribing")
throw new UnsupportedOperationException("Cannot use this eventstream for subscribing")
override def publish(event: AnyRef): Unit = eventStream.publish(event)
}
/**
@ -190,7 +190,8 @@ private[typed] class ActorSystemImpl[-T](
private val terminateTriggered = new AtomicBoolean
private val theOneWhoWalksTheBubblesOfSpaceTime: ActorRefImpl[Nothing] =
new ActorRef[Nothing](rootPath) with ActorRefImpl[Nothing] {
override def tell(msg: Nothing): Unit = throw new UnsupportedOperationException("cannot send to theOneWhoWalksTheBubblesOfSpaceTime")
override def tell(msg: Nothing): Unit =
throw new UnsupportedOperationException("Cannot send to theOneWhoWalksTheBubblesOfSpaceTime")
override def sendSystem(signal: SystemMessage): Unit = signal match {
case Terminate()
if (terminateTriggered.compareAndSet(false, true))

View file

@ -2,12 +2,18 @@
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package internal
package adapter
import akka.{ actor a }
import akka.annotation.InternalApi
private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor {
/**
* INTERNAL API
*/
@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor {
import Behavior._
import ActorRefAdapter.toUntyped
var behavior: Behavior[T] = _initialBehavior

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package internal
package adapter
import akka.{ actor a }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor
import akka.annotation.InternalApi
/**
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]].
*/
@InternalApi private[typed] class ActorContextAdapter[T](val untyped: a.ActorContext) extends ActorContext[T] {
import ActorRefAdapter.sendSystemMessage
import ActorRefAdapter.toUntyped
override def self = ActorRefAdapter(untyped.self)
override val system = ActorSystemAdapter(untyped.system)
override def mailboxCapacity = 1 << 29 // FIXME
override def children = untyped.children.map(ActorRefAdapter(_))
override def child(name: String) = untyped.child(name).map(ActorRefAdapter(_))
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) =
ActorContextAdapter.spawnAnonymous(untyped, behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ActorContextAdapter.spawn(untyped, behavior, name, deployment)
override def stop(child: ActorRef[_]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef
val cell = untyped.asInstanceOf[akka.actor.ActorCell]
cell.removeFunctionRef(f)
case c
untyped.child(child.path.name) match {
case Some(`c`)
untyped.stop(c)
true
case _
false // none of our business
}
}
override def watch(other: ActorRef[_]) = { untyped.watch(toUntyped(other)) }
override def unwatch(other: ActorRef[_]) = { untyped.unwatch(toUntyped(other)) }
var receiveTimeoutMsg: T = null.asInstanceOf[T]
override def setReceiveTimeout(d: FiniteDuration, msg: T) = {
receiveTimeoutMsg = msg
untyped.setReceiveTimeout(d)
}
override def cancelReceiveTimeout(): Unit = {
receiveTimeoutMsg = null.asInstanceOf[T]
untyped.setReceiveTimeout(Duration.Undefined)
}
override def executionContext: ExecutionContextExecutor = untyped.dispatcher
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): a.Cancellable = {
import untyped.dispatcher
untyped.system.scheduler.scheduleOnce(delay, toUntyped(target), msg)
}
override def spawnAdapter[U](f: U T, name: String = ""): ActorRef[U] = {
val cell = untyped.asInstanceOf[akka.actor.ActorCell]
val ref = cell.addFunctionRef((_, msg) untyped.self ! f(msg.asInstanceOf[U]), name)
ActorRefAdapter[U](ref)
}
}
/**
* INTERNAL API
*/
@InternalApi private[typed] object ActorContextAdapter {
def toUntyped[U](ctx: ActorContext[_]): a.ActorContext =
ctx match {
case adapter: ActorContextAdapter[_] adapter.untyped
case _
throw new UnsupportedOperationException("only adapted untyped ActorContext permissible " +
s"($ctx of class ${ctx.getClass.getName})")
}
def toUntyped[U](ctx: scaladsl.ActorContext[_]): a.ActorContext =
ctx match {
case c: ActorContext[_] toUntyped(c)
case _
throw new UnsupportedOperationException("unknown ActorContext type " +
s"($ctx of class ${ctx.getClass.getName})")
}
def toUntyped[U](ctx: javadsl.ActorContext[_]): a.ActorContext =
ctx match {
case c: ActorContext[_] toUntyped(c)
case _
throw new UnsupportedOperationException("unknown ActorContext type " +
s"($ctx of class ${ctx.getClass.getName})")
}
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] = {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, deployment)))
}
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] = {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(ctx.actorOf(PropsAdapter(() behavior, deployment), name))
}
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package internal
package adapter
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.dispatch.sysmsg
/**
* INTERNAL API
*/
@InternalApi private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef)
extends ActorRef[T](untyped.path) with internal.ActorRefImpl[T] {
override def tell(msg: T): Unit = untyped ! msg
override def isLocal: Boolean = untyped.isLocal
override def sendSystem(signal: internal.SystemMessage): Unit =
ActorRefAdapter.sendSystemMessage(untyped, signal)
}
private[typed] object ActorRefAdapter {
def apply[T](untyped: a.ActorRef): ActorRef[T] = new ActorRefAdapter(untyped.asInstanceOf[a.InternalActorRef])
def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef =
ref match {
case adapter: ActorRefAdapter[_] adapter.untyped
case _
throw new UnsupportedOperationException("only adapted untyped ActorRefs permissible " +
s"($ref of class ${ref.getClass.getName})")
}
def sendSystemMessage(untyped: akka.actor.InternalActorRef, signal: internal.SystemMessage): Unit =
signal match {
case internal.Create() throw new IllegalStateException("WAT? No, seriously.")
case internal.Terminate() untyped.stop()
case internal.Watch(watchee, watcher) untyped.sendSystemMessage(
sysmsg.Watch(
toUntyped(watchee),
toUntyped(watcher)))
case internal.Unwatch(watchee, watcher) untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher)))
case internal.DeathWatchNotification(ref, cause) untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false))
case internal.NoMessage // just to suppress the warning
}
}

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package internal
package adapter
import akka.{ actor a, dispatch d }
@ -9,19 +10,21 @@ import akka.dispatch.sysmsg
import scala.concurrent.ExecutionContextExecutor
import akka.util.Timeout
import scala.concurrent.Future
import akka.annotation.InternalApi
/**
* Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
* INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
* Therefore it does not have a lot of vals, only the whenTerminated Future is cached after
* its transformation because redoing that every time will add extra objects that persist for
* a longer time; in all other cases the wrapper will just be spawned for a single call in
* most circumstances.
*/
private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
@InternalApi private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
extends ActorRef[T](a.RootActorPath(a.Address("akka", untyped.name)) / "user")
with ActorSystem[T] with internal.ActorRefImpl[T] {
import ActorSystemAdapter._
import ActorRefAdapter.sendSystemMessage
// Members declared in akka.typed.ActorRef
override def tell(msg: T): Unit = untyped.guardian ! msg
@ -33,10 +36,12 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
override def dispatchers: Dispatchers = new Dispatchers {
override def lookup(selector: DispatcherSelector): ExecutionContextExecutor =
selector match {
case DispatcherDefault(_) untyped.dispatcher
case DispatcherFromConfig(str, _) untyped.dispatchers.lookup(str)
case DispatcherFromExecutionContext(_, _) throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter")
case DispatcherFromExecutor(_, _) throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter")
case DispatcherDefault(_) untyped.dispatcher
case DispatcherFromConfig(str, _) untyped.dispatchers.lookup(str)
case DispatcherFromExecutionContext(_, _)
throw new UnsupportedOperationException("Cannot use DispatcherFromExecutionContext with ActorSystemAdapter")
case DispatcherFromExecutor(_, _)
throw new UnsupportedOperationException("Cannot use DispatcherFromExecutor with ActorSystemAdapter")
}
override def shutdown(): Unit = () // there was no shutdown in untyped Akka
}
@ -65,8 +70,8 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
untyped.whenTerminated.map(t Terminated(ActorRefAdapter(t.actor))(null))(sameThreadExecutionContext)
def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] = {
val ref = untyped.systemActorOf(PropsAdapter(behavior, deployment), name)
Future.successful(ref)
val ref = untyped.systemActorOf(PropsAdapter(() behavior, deployment), name)
Future.successful(ActorRefAdapter(ref))
}
}
@ -74,9 +79,24 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
private[typed] object ActorSystemAdapter {
def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl])
object ReceptionistExtension extends a.ExtensionKey[ReceptionistExtension]
def toUntyped[U](sys: ActorSystem[_]): a.ActorSystem =
sys match {
case adapter: ActorSystemAdapter[_] adapter.untyped
case _ throw new UnsupportedOperationException("only adapted untyped ActorSystem permissible " +
s"($sys of class ${sys.getClass.getName})")
}
object ReceptionistExtension extends a.ExtensionId[ReceptionistExtension] with a.ExtensionIdProvider {
override def get(system: a.ActorSystem): ReceptionistExtension = super.get(system)
override def lookup = ReceptionistExtension
override def createExtension(system: a.ExtendedActorSystem): ReceptionistExtension =
new ReceptionistExtension(system)
}
class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension {
val receptionist: ActorRef[patterns.Receptionist.Command] =
ActorRefAdapter(system.systemActorOf(PropsAdapter(patterns.Receptionist.behavior, EmptyDeploymentConfig), "receptionist"))
ActorRefAdapter(system.systemActorOf(
PropsAdapter(() patterns.Receptionist.behavior, EmptyDeploymentConfig),
"receptionist"))
}
}

View file

@ -2,11 +2,16 @@
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package internal
package adapter
import akka.{ event e }
import akka.annotation.InternalApi
class EventStreamAdapter(untyped: e.EventStream) extends EventStream {
/**
* INTERNAL API
*/
@InternalApi private[typed] class EventStreamAdapter(untyped: e.EventStream) extends EventStream {
def logLevel: e.Logging.LogLevel = untyped.logLevel
def publish[T](event: T): Unit = untyped.publish(event.asInstanceOf[AnyRef])
@ -16,19 +21,22 @@ class EventStreamAdapter(untyped: e.EventStream) extends EventStream {
def subscribe[T](subscriber: ActorRef[T], to: Class[T]): Boolean =
subscriber match {
case adapter: ActorRefAdapter[_] untyped.subscribe(adapter.untyped, to)
case _ throw new UnsupportedOperationException("cannot subscribe native typed ActorRef")
case _
throw new UnsupportedOperationException("Cannot subscribe native typed ActorRef")
}
def unsubscribe[T](subscriber: ActorRef[T]): Unit =
subscriber match {
case adapter: ActorRefAdapter[_] untyped.unsubscribe(adapter.untyped)
case _ throw new UnsupportedOperationException("cannot unsubscribe native typed ActorRef")
case _
throw new UnsupportedOperationException("Cannot unsubscribe native typed ActorRef")
}
def unsubscribe[T](subscriber: ActorRef[T], from: Class[T]): Boolean =
subscriber match {
case adapter: ActorRefAdapter[_] untyped.unsubscribe(adapter.untyped, from)
case _ throw new UnsupportedOperationException("cannot unsubscribe native typed ActorRef")
case _
throw new UnsupportedOperationException("Cannot unsubscribe native typed ActorRef")
}
}

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
package internal
package adapter
import akka.typed.Behavior
import akka.typed.EmptyDeploymentConfig
import akka.typed.DeploymentConfig
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] object PropsAdapter {
def apply[T](behavior: () Behavior[T], deploy: DeploymentConfig = EmptyDeploymentConfig): akka.actor.Props = {
// FIXME use DeploymentConfig, e.g. dispatcher
akka.actor.Props(new ActorAdapter(behavior()))
}
}

View file

@ -0,0 +1,115 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.javadsl
import akka.typed.Behavior
import akka.typed.DeploymentConfig
import akka.typed.EmptyDeploymentConfig
import akka.typed.ActorRef
import akka.typed.internal.adapter.ActorRefAdapter
import akka.typed.scaladsl.adapter._
import akka.typed.ActorSystem
import akka.typed.internal.adapter.ActorContextAdapter
import akka.japi.Creator
/**
* Java API: Adapters between typed and untyped actors and actor systems.
* The underlying `ActorSystem` is the untyped [[akka.actor.ActorSystem]]
* which runs Akka Typed [[akka.typed.Behavior]] on an emulation layer. In this
* system typed and untyped actors can coexist.
*
* These methods make it possible to create typed child actor from untyped
* parent actor, and the opposite untyped child from typed parent.
* `watch` is also supported in both directions.
*
* There are also converters (`toTyped`, `toUntyped`) between untyped
* [[akka.actor.ActorRef]] and typed [[akka.typed.ActorRef]], and between untyped
* [[akka.actor.ActorSystem]] and typed [[akka.typed.ActorSystem]].
*/
object Adapter {
def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(sys, behavior, EmptyDeploymentConfig)
def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] =
sys.spawnAnonymous(behavior, deployment)
def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(sys, behavior, name, EmptyDeploymentConfig)
def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] =
sys.spawn(behavior, name, deployment)
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(ctx, behavior, EmptyDeploymentConfig)
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] =
ctx.spawnAnonymous(behavior, deployment)
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(ctx, behavior, name, EmptyDeploymentConfig)
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] =
ctx.spawn(behavior, name, deployment)
def toTyped(sys: akka.actor.ActorSystem): ActorSystem[Void] =
sys.toTyped.asInstanceOf[ActorSystem[Void]]
def toUntyped(sys: ActorSystem[_]): akka.actor.ActorSystem =
sys.toUntyped
def watch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit =
ctx.watch(other)
def unwatch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit =
ctx.unwatch(other)
def stop(ctx: akka.actor.ActorContext, child: ActorRef[_]): Unit =
ctx.stop(child)
def watch[U](ctx: ActorContext[_], other: akka.actor.ActorRef): Unit =
ctx.watch(other)
def unwatch[U](ctx: ActorContext[_], other: akka.actor.ActorRef): Unit =
ctx.unwatch(other)
def stop(ctx: ActorContext[_], child: akka.actor.ActorRef): Boolean =
ctx.stop(child)
def actorOf(ctx: ActorContext[_], props: akka.actor.Props): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props)
def actorOf(ctx: ActorContext[_], props: akka.actor.Props, name: String): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props, name)
def toUntyped(ref: ActorRef[_]): akka.actor.ActorRef =
ref.toUntyped
def toTyped[T](ref: akka.actor.ActorRef): ActorRef[T] =
ref
/**
* Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when
* spawning a typed child actor from an untyped parent actor.
* This is normally not needed because you can use the extension methods
* `spawn` and `spawnAnonymous` with an untyped `ActorContext`, but it's needed
* when using typed actors with an existing library/tool that provides an API that
* takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an
* example of that.
*/
def props[T](behavior: Creator[Behavior[T]], deploy: DeploymentConfig): akka.actor.Props =
akka.typed.internal.adapter.PropsAdapter(() behavior.create(), deploy)
/**
* Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when
* spawning a typed child actor from an untyped parent actor.
* This is normally not needed because you can use the extension methods
* `spawn` and `spawnAnonymous` with an untyped `ActorContext`, but it's needed
* when using typed actors with an existing library/tool that provides an API that
* takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an
* example of that.
*/
def props[T](behavior: Creator[Behavior[T]]): akka.actor.Props =
props(behavior, EmptyDeploymentConfig)
}

View file

@ -266,12 +266,12 @@ object Actor {
* results in a new behavior that can potentially be different from this one.
*/
final case class Stateful[T](
behavior: (ActorContext[T], T) Behavior[T],
signal: (ActorContext[T], Signal) Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) Behavior[T]])
onMessage: (ActorContext[T], T) Behavior[T],
onSignal: (ActorContext[T], Signal) Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) Behavior[T]])
extends ExtensibleBehavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T) = behavior(ctx, msg)
override def toString = s"Stateful(${LineNumbers(behavior)})"
override def management(ctx: AC[T], msg: Signal): Behavior[T] = onSignal(ctx, msg)
override def message(ctx: AC[T], msg: T) = onMessage(ctx, msg)
override def toString = s"Stateful(${LineNumbers(onMessage)})"
}
/**
@ -285,13 +285,13 @@ object Actor {
* another one after it has been installed. It is most useful for leaf actors
* that do not create child actors themselves.
*/
final case class Stateless[T](behavior: (ActorContext[T], T) Any) extends ExtensibleBehavior[T] {
final case class Stateless[T](onMessage: (ActorContext[T], T) Any) extends ExtensibleBehavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: AC[T], msg: T): Behavior[T] = {
behavior(ctx, msg)
onMessage(ctx, msg)
this
}
override def toString = s"Static(${LineNumbers(behavior)})"
override def toString = s"Static(${LineNumbers(onMessage)})"
}
/**

View file

@ -13,7 +13,7 @@ import akka.typed.internal.FunctionRef
import akka.actor.RootActorPath
import akka.actor.Address
import akka.typed.ActorRef
import akka.typed.adapter
import akka.typed.internal.{ adapter adapt }
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
@ -38,9 +38,9 @@ object AskPattern {
implicit class Askable[T](val ref: ActorRef[T]) extends AnyVal {
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
ref match {
case a: adapter.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
case a: adapter.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f)
case _ ask(ref, timeout, scheduler, f)
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f)
case _ ask(ref, timeout, scheduler, f)
}
}
@ -50,15 +50,15 @@ object AskPattern {
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =
if (untyped.isTerminated)
(
adapter.ActorRefAdapter[U](untyped.provider.deadLetters),
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new AskTimeoutException(s"Recipient[$target] had already been terminated.")), null)
else if (timeout.duration.length <= 0)
(
adapter.ActorRefAdapter[U](untyped.provider.deadLetters),
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null)
else {
val a = PromiseActorRef(untyped.provider, timeout, target, "unknown")
val b = adapter.ActorRefAdapter[U](a)
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl.adapter
import akka.typed.Behavior
import akka.typed.EmptyDeploymentConfig
import akka.typed.DeploymentConfig
import akka.typed.internal.adapter.ActorAdapter
/**
* Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when
* spawning a typed child actor from an untyped parent actor.
* This is normally not needed because you can use the extension methods
* `spawn` and `spawnAnonymous` on an untyped `ActorContext`, but it's needed
* when using typed actors with an existing library/tool that provides an API that
* takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an
* example of that.
*/
object PropsAdapter {
def apply[T](behavior: Behavior[T], deploy: DeploymentConfig = EmptyDeploymentConfig): akka.actor.Props =
akka.typed.internal.adapter.PropsAdapter(() behavior, deploy)
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package scaladsl
import akka.annotation.InternalApi
import akka.typed.internal.adapter._
/**
* Scala API: Adapters between typed and untyped actors and actor systems.
* The underlying `ActorSystem` is the untyped [[akka.actor.ActorSystem]]
* which runs Akka Typed [[akka.typed.Behavior]] on an emulation layer. In this
* system typed and untyped actors can coexist.
*
* Use these adapters with `import akka.typed.scaladsl.adapter._`.
*
* Implicit extension methods are added to untyped and typed `ActorSystem`,
* `ActorContext`. Such methods make it possible to create typed child actor
* from untyped parent actor, and the opposite untyped child from typed parent.
* `watch` is also supported in both directions.
*
* There is an implicit conversion from untyped [[akka.actor.ActorRef]] to
* typed [[akka.typed.ActorRef]].
*
* There are also converters (`toTyped`, `toUntyped`) from typed
* [[akka.typed.ActorRef]] to untyped [[akka.actor.ActorRef]], and between untyped
* [[akka.actor.ActorSystem]] and typed [[akka.typed.ActorSystem]].
*/
package object adapter {
import language.implicitConversions
/**
* Extension methods added to [[akka.actor.ActorSystem]].
*/
implicit class UntypedActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal {
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment)))
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name))
def toTyped: ActorSystem[Nothing] = ActorSystemAdapter(sys)
}
/**
* Extension methods added to [[akka.typed.ActorSystem]].
*/
implicit class TypedActorSystemOps(val sys: ActorSystem[_]) extends AnyVal {
def toUntyped: akka.actor.ActorSystem = ActorSystemAdapter.toUntyped(sys)
}
/**
* Extension methods added to [[akka.actor.ActorContext]].
*/
implicit class UntypedActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal {
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorContextAdapter.spawnAnonymous(ctx, behavior, deployment)
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorContextAdapter.spawn(ctx, behavior, name, deployment)
def watch[U](other: ActorRef[U]): Unit = ctx.watch(ActorRefAdapter.toUntyped(other))
def unwatch[U](other: ActorRef[U]): Unit = ctx.unwatch(ActorRefAdapter.toUntyped(other))
def stop(child: ActorRef[_]): Unit =
ctx.stop(ActorRefAdapter.toUntyped(child))
}
/**
* Extension methods added to [[akka.typed.scaladsl.ActorContext]].
*/
implicit class TypedActorContextOps(val ctx: scaladsl.ActorContext[_]) extends AnyVal {
def actorOf(props: akka.actor.Props): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props)
def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props, name)
// watch, unwatch and stop not needed here because of the implicit ActorRef conversion
}
/**
* Extension methods added to [[akka.typed.ActorRef]].
*/
implicit class TypedActorRefOps(val ref: ActorRef[_]) extends AnyVal {
def toUntyped: akka.actor.ActorRef = ActorRefAdapter.toUntyped(ref)
}
/**
* Implicit conversion from untyped [[akka.actor.ActorRef]] to typed [[akka.typed.ActorRef]].
*/
implicit def actorRefAdapter[T](ref: akka.actor.ActorRef): ActorRef[T] = ActorRefAdapter(ref)
}