From ebb5748d6ac499def08beba7aced3e084635f32f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Apr 2017 08:08:13 +0200 Subject: [PATCH] Complete adapter API for coexistence of typed and untyped actors, #22174 * Trying out what is working and what is missing with a test * Add missing API and cleanup (public vs internal) * Note that PropsAdapter will make it possible to use Cluster Sharding with typed entity actors * add javadsl for the adapters, and full java tests for that * Add narrow to ActorRef --- .../main/scala/akka/actor/ActorSystem.scala | 3 +- .../code/docs/akka/typed/IntroSpec.scala | 5 +- .../java/akka/typed/javadsl/AdapterTest.java | 339 ++++++++++++++++++ .../src/test/scala/akka/typed/AskSpec.scala | 5 +- .../src/test/scala/akka/typed/TypedSpec.scala | 25 +- .../typed/scaladsl/adapter/AdapterSpec.scala | 255 +++++++++++++ .../main/scala/akka/typed/ActorContext.scala | 3 + .../src/main/scala/akka/typed/ActorRef.scala | 8 +- .../main/scala/akka/typed/ActorSystem.scala | 11 +- .../typed/adapter/ActorContextAdapter.scala | 61 ---- .../akka/typed/adapter/ActorRefAdapter.scala | 19 - .../akka/typed/adapter/PropsAdapter.scala | 24 -- .../scala/akka/typed/adapter/package.scala | 43 --- .../akka/typed/internal/ActorSystemImpl.scala | 5 +- .../{ => internal}/adapter/ActorAdapter.scala | 8 +- .../adapter/ActorContextAdapter.scala | 105 ++++++ .../internal/adapter/ActorRefAdapter.scala | 47 +++ .../adapter/ActorSystemAdapter.scala | 40 ++- .../adapter/EventStreamAdapter.scala | 16 +- .../typed/internal/adapter/PropsAdapter.scala | 22 ++ .../scala/akka/typed/javadsl/Adapter.scala | 115 ++++++ .../scala/akka/typed/scaladsl/Actor.scala | 16 +- .../main/scala/akka/typed/scaladsl/Ask.scala | 14 +- .../typed/scaladsl/adapter/PropsAdapter.scala | 23 ++ .../akka/typed/scaladsl/adapter/package.scala | 93 +++++ 25 files changed, 1110 insertions(+), 195 deletions(-) create mode 100644 akka-typed-tests/src/test/java/akka/typed/javadsl/AdapterTest.java create mode 100644 akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/AdapterSpec.scala delete mode 100644 akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala delete mode 100644 akka-typed/src/main/scala/akka/typed/adapter/ActorRefAdapter.scala delete mode 100644 akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala delete mode 100644 akka-typed/src/main/scala/akka/typed/adapter/package.scala rename akka-typed/src/main/scala/akka/typed/{ => internal}/adapter/ActorAdapter.scala (90%) create mode 100644 akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala create mode 100644 akka-typed/src/main/scala/akka/typed/internal/adapter/ActorRefAdapter.scala rename akka-typed/src/main/scala/akka/typed/{ => internal}/adapter/ActorSystemAdapter.scala (65%) rename akka-typed/src/main/scala/akka/typed/{ => internal}/adapter/EventStreamAdapter.scala (64%) create mode 100644 akka-typed/src/main/scala/akka/typed/internal/adapter/PropsAdapter.scala create mode 100644 akka-typed/src/main/scala/akka/typed/javadsl/Adapter.scala create mode 100644 akka-typed/src/main/scala/akka/typed/scaladsl/adapter/PropsAdapter.scala create mode 100644 akka-typed/src/main/scala/akka/typed/scaladsl/adapter/package.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 6eae58a480..722dbd15f5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -702,7 +702,8 @@ private[akka] class ActorSystemImpl( def actorOf(props: Props, name: String): ActorRef = if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false) - else throw new UnsupportedOperationException("cannot create top-level actor from the outside on ActorSystem with custom user guardian") + else throw new UnsupportedOperationException( + s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian") def actorOf(props: Props): ActorRef = if (guardianProps.isEmpty) guardian.underlying.attachChild(props, systemService = false) diff --git a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala index 4f0b9bc83a..afdba6b73b 100644 --- a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala +++ b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala @@ -121,15 +121,14 @@ class IntroSpec extends TypedSpec { chatRoom ! GetSession("ol’ Gabbler", gabblerRef) Stateful( - behavior = (_, _) ⇒ Unhandled, - signal = { (ctx, sig) ⇒ + onMessage = (_, _) ⇒ Unhandled, + onSignal = (ctx, sig) ⇒ sig match { case Terminated(ref) ⇒ Stopped case _ ⇒ Unhandled } - } ) } diff --git a/akka-typed-tests/src/test/java/akka/typed/javadsl/AdapterTest.java b/akka-typed-tests/src/test/java/akka/typed/javadsl/AdapterTest.java new file mode 100644 index 0000000000..cd22cb31ee --- /dev/null +++ b/akka-typed-tests/src/test/java/akka/typed/javadsl/AdapterTest.java @@ -0,0 +1,339 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.javadsl; + +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.TimeUnit; +import akka.actor.ActorSystem; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import akka.typed.ActorRef; +import akka.typed.Behavior; +import akka.typed.Signal; +import akka.typed.Terminated; +import akka.testkit.javadsl.TestKit; +import akka.actor.SupervisorStrategy; +import static akka.typed.javadsl.Actor.*; + +public class AdapterTest extends JUnitSuite { + + static akka.actor.Props untyped1() { + return akka.actor.Props.create(Untyped1.class, () -> new Untyped1()); + } + + static class Untyped1 extends akka.actor.AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("ping", s -> getSender().tell("pong", getSelf())) + .match(ThrowIt.class, t -> { + throw t; + }) + .build(); + } + } + + static class Typed1 { + private final akka.actor.ActorRef ref; + private final akka.actor.ActorRef probe; + + private Typed1(akka.actor.ActorRef ref, akka.actor.ActorRef probe) { + this.ref = ref; + this.probe = probe; + } + + static Behavior create(akka.actor.ActorRef ref, akka.actor.ActorRef probe) { + Typed1 logic = new Typed1(ref, probe); + return stateful( + (ctx, msg) -> logic.onMessage(ctx, msg), + (ctx, sig) -> logic.onSignal(ctx, sig)); + } + + Behavior onMessage(ActorContext ctx, String msg) { + if (msg.equals("send")) { + akka.actor.ActorRef replyTo = Adapter.toUntyped(ctx.getSelf()); + ref.tell("ping", replyTo); + return same(); + } else if (msg.equals("pong")) { + probe.tell("ok", akka.actor.ActorRef.noSender()); + return same(); + } else if (msg.equals("actorOf")) { + akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1()); + child.tell("ping", Adapter.toUntyped(ctx.getSelf())); + return same(); + } else if (msg.equals("watch")) { + Adapter.watch(ctx, ref); + return same(); + } else if (msg.equals("supervise-stop")) { + akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1()); + Adapter.watch(ctx, child); + child.tell(new ThrowIt3(), Adapter.toUntyped(ctx.getSelf())); + child.tell("ping", Adapter.toUntyped(ctx.getSelf())); + return same(); + } else if (msg.equals("stop-child")) { + akka.actor.ActorRef child = Adapter.actorOf(ctx, untyped1()); + Adapter.watch(ctx, child); + Adapter.stop(ctx, child); + return same(); + } else { + return unhandled(); + } + } + + Behavior onSignal(ActorContext ctx, Signal sig) { + if (sig instanceof Terminated) { + probe.tell("terminated", akka.actor.ActorRef.noSender()); + return same(); + } else { + return unhandled(); + } + } + } + + static interface Typed2Msg {}; + static final class Ping implements Typed2Msg { + public final ActorRef replyTo; + + public Ping(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + static final class StopIt implements Typed2Msg {} + static abstract class ThrowIt extends RuntimeException implements Typed2Msg {} + static class ThrowIt1 extends ThrowIt {} + static class ThrowIt2 extends ThrowIt {} + static class ThrowIt3 extends ThrowIt {} + + static akka.actor.Props untyped2(ActorRef ref, akka.actor.ActorRef probe) { + return akka.actor.Props.create(Untyped2.class, () -> new Untyped2(ref, probe)); + } + + static class Untyped2 extends akka.actor.AbstractActor { + private final ActorRef ref; + private final akka.actor.ActorRef probe; + private final SupervisorStrategy strategy; + + Untyped2(ActorRef ref, akka.actor.ActorRef probe) { + this.ref = ref; + this.probe = probe; + this.strategy = strategy(); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("send", s -> { + ActorRef replyTo = Adapter.toTyped(getSelf()); + ref.tell(new Ping(replyTo)); + }) + .matchEquals("pong", s -> probe.tell("ok", getSelf())) + .matchEquals("spawn", s -> { + ActorRef child = Adapter.spawnAnonymous(getContext(), typed2()); + child.tell(new Ping(Adapter.toTyped(getSelf()))); + }) + .matchEquals("actorOf-props", s -> { + // this is how Cluster Sharding can be used + akka.actor.ActorRef child = getContext().actorOf(typed2Props()); + child.tell(new Ping(Adapter.toTyped(getSelf())), akka.actor.ActorRef.noSender()); + }) + .matchEquals("watch", s -> Adapter.watch(getContext(), ref)) + .match(akka.actor.Terminated.class, t -> probe.tell("terminated", getSelf())) + .matchEquals("supervise-stop", s -> testSupervice(new ThrowIt1())) + .matchEquals("supervise-resume", s -> testSupervice(new ThrowIt2())) + .matchEquals("supervise-restart", s -> testSupervice(new ThrowIt3())) + .matchEquals("stop-child", s -> { + ActorRef child = Adapter.spawnAnonymous(getContext(), typed2()); + Adapter.watch(getContext(), child); + Adapter.stop(getContext(), child); + }) + .build(); + } + + private void testSupervice(ThrowIt t) { + ActorRef child = Adapter.spawnAnonymous(getContext(), typed2()); + Adapter.watch(getContext(), child); + child.tell(t); + child.tell(new Ping(Adapter.toTyped(getSelf()))); + } + + private SupervisorStrategy strategy() { + return new akka.actor.OneForOneStrategy(false, akka.japi.pf.DeciderBuilder + .match(ThrowIt1.class, e -> { + probe.tell("thrown-stop", getSelf()); + return SupervisorStrategy.stop(); + }) + .match(ThrowIt2.class, e -> { + probe.tell("thrown-resume", getSelf()); + return SupervisorStrategy.resume(); + }) + .match(ThrowIt3.class, e -> { + probe.tell("thrown-restart", getSelf()); + // TODO Restart will not really restart the behavior + return SupervisorStrategy.restart(); + }) + .build()); + } + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + } + + static Behavior typed2() { + return Actor.stateful((ctx, msg) -> { + if (msg instanceof Ping) { + ActorRef replyTo = ((Ping) msg).replyTo; + replyTo.tell("pong"); + return same(); + } else if (msg instanceof StopIt) { + return stopped(); + } else if (msg instanceof ThrowIt) { + throw (ThrowIt) msg; + } else { + return unhandled(); + } + }); + } + + static akka.actor.Props typed2Props() { + return Adapter.props(() -> typed2()); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorSelectionTest", + AkkaSpec.testConf()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + + + @Test + public void shouldSendMessageFromTypedToUntyped() { + TestKit probe = new TestKit(system); + akka.actor.ActorRef untypedRef = system.actorOf(untyped1()); + ActorRef typedRef = Adapter.spawnAnonymous(system, Typed1.create(untypedRef, probe.getRef())); + typedRef.tell("send"); + probe.expectMsg("ok"); + } + + @Test + public void shouldSendMessageFromUntypedToTyped() { + TestKit probe = new TestKit(system); + ActorRef typedRef = Adapter.spawnAnonymous(system, typed2()).narrow(); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(typedRef, probe.getRef())); + untypedRef.tell("send", akka.actor.ActorRef.noSender()); + probe.expectMsg("ok"); + } + + @Test + public void shouldSpawnTypedChildFromUntypedParent() { + TestKit probe = new TestKit(system); + ActorRef ignore = Adapter.spawnAnonymous(system, ignore()); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef())); + untypedRef.tell("spawn", akka.actor.ActorRef.noSender()); + probe.expectMsg("ok"); + } + + @Test + public void shouldActorOfTypedChildViaPropsFromUntypedParent() { + TestKit probe = new TestKit(system); + ActorRef ignore = Adapter.spawnAnonymous(system, ignore()); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef())); + untypedRef.tell("actorOf-props", akka.actor.ActorRef.noSender()); + probe.expectMsg("ok"); + } + + @Test + public void shouldActorOfUntypedChildFromTypedParent() { + TestKit probe = new TestKit(system); + akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty()); + ActorRef typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef())); + typedRef.tell("actorOf"); + probe.expectMsg("ok"); + } + + @Test + public void shouldWatchTypedFromUntyped() { + TestKit probe = new TestKit(system); + ActorRef typedRef = Adapter.spawnAnonymous(system, typed2()); + ActorRef typedRef2 = typedRef.narrow(); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(typedRef2, probe.getRef())); + untypedRef.tell("watch", akka.actor.ActorRef.noSender()); + typedRef.tell(new StopIt()); + probe.expectMsg("terminated"); + } + + @Test + public void shouldWatchUntypedFromTyped() { + TestKit probe = new TestKit(system); + akka.actor.ActorRef untypedRef = system.actorOf(untyped1()); + ActorRef typedRef = Adapter.spawnAnonymous(system, Typed1.create(untypedRef, probe.getRef())); + typedRef.tell("watch"); + untypedRef.tell(akka.actor.PoisonPill.getInstance() , akka.actor.ActorRef.noSender()); + probe.expectMsg("terminated"); + } + + @Test + public void shouldSuperviseTypedChildFromUntypedParent() { + TestKit probe = new TestKit(system); + ActorRef ignore = Adapter.spawnAnonymous(system, ignore()); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef())); + untypedRef.tell("supervise-stop", akka.actor.ActorRef.noSender()); + probe.expectMsg("thrown-stop"); + // ping => ok should not get through here + probe.expectMsg("terminated"); + + untypedRef.tell("supervise-resume", akka.actor.ActorRef.noSender()); + probe.expectMsg("thrown-resume"); + probe.expectMsg("ok"); + + untypedRef.tell("supervise-restart", akka.actor.ActorRef.noSender()); + probe.expectMsg("thrown-restart"); + probe.expectMsg("ok"); + } + + @Test + public void shouldSuperviseUntypedChildFromTypedParent() { + TestKit probe = new TestKit(system); + akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty()); + ActorRef typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef())); + + int originalLogLevel = system.eventStream().logLevel(); + try { + // supress the logging with stack trace + system.eventStream().setLogLevel(Integer.MIN_VALUE); // OFF + + // only stop supervisorStrategy + typedRef.tell("supervise-stop"); + probe.expectMsg("terminated"); + } finally { + system.eventStream().setLogLevel(originalLogLevel); + } + probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); // no pong + } + + @Test + public void shouldStopTypedChildFromUntypedParent() { + TestKit probe = new TestKit(system); + ActorRef ignore = Adapter.spawnAnonymous(system, ignore()); + akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef())); + untypedRef.tell("stop-child", akka.actor.ActorRef.noSender()); + probe.expectMsg("terminated"); + } + + @Test + public void shouldStopUntypedChildFromTypedParent() { + TestKit probe = new TestKit(system); + akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty()); + ActorRef typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef())); + typedRef.tell("stop-child"); + probe.expectMsg("terminated"); + } +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/AskSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/AskSpec.scala index 2e70cbed74..3f48b9a929 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/AskSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/AskSpec.scala @@ -65,8 +65,9 @@ class AskSpec extends TypedSpec with ScalaFutures { /** See issue #19947 (MatchError with adapted ActorRef) */ def `must fail the future if the actor doesn't exist`(): Unit = { val noSuchActor: ActorRef[Msg] = system match { - case adaptedSys: adapter.ActorSystemAdapter[_] ⇒ - adapter.actorRefAdapter(adaptedSys.untyped.provider.resolveActorRef("/foo/bar")) + case adaptedSys: akka.typed.internal.adapter.ActorSystemAdapter[_] ⇒ + import akka.typed.scaladsl.adapter._ + adaptedSys.untyped.provider.resolveActorRef("/foo/bar") case _ ⇒ fail("this test must only run in an adapted actor system") } diff --git a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala index b7f04c6439..fd5b4fbb17 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala @@ -39,7 +39,7 @@ class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with S /** * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. */ -class TypedSpec(val config: Config) extends TypedSpecSetup { +abstract class TypedSpec(val config: Config) extends TypedSpecSetup { import TypedSpec._ import AskPattern._ @@ -48,8 +48,18 @@ class TypedSpec(val config: Config) extends TypedSpecSetup { // extension point def setTimeout: Timeout = Timeout(1.minute) - lazy val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) - lazy val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + private var nativeSystemUsed = false + lazy val nativeSystem: ActorSystem[TypedSpec.Command] = { + val sys = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + nativeSystemUsed = true + sys + } + private var adaptedSystemUsed = false + lazy val adaptedSystem: ActorSystem[TypedSpec.Command] = { + val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + adaptedSystemUsed = false + sys + } trait NativeSystem { def system = nativeSystem @@ -63,8 +73,10 @@ class TypedSpec(val config: Config) extends TypedSpecSetup { implicit def scheduler = nativeSystem.scheduler override def afterAll(): Unit = { - Await.result(nativeSystem ? (Terminate(_)), timeout.duration): Status - Await.result(adaptedSystem ? (Terminate(_)), timeout.duration): Status + if (nativeSystemUsed) + Await.result(nativeSystem ? (Terminate(_)), timeout.duration): Status + if (adaptedSystemUsed) + Await.result(adaptedSystem ? (Terminate(_)), timeout.duration): Status } // TODO remove after basing on ScalaTest 3 with async support @@ -197,8 +209,7 @@ class TypedSpecSpec extends TypedSpec { sync(runTest("failure")(StepWise[String]((ctx, startWith) ⇒ startWith { fail("expected") - } - ))) + }))) } } } diff --git a/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/AdapterSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/AdapterSpec.scala new file mode 100644 index 0000000000..ba5cc9411f --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/AdapterSpec.scala @@ -0,0 +1,255 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.scaladsl.adapter + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.{ actor ⇒ untyped } +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.Terminated +import akka.typed.scaladsl.Actor._ +import akka.{ actor ⇒ untyped } +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.testkit._ + +object AdapterSpec { + val untyped1: untyped.Props = untyped.Props(new Untyped1) + + class Untyped1 extends untyped.Actor { + def receive = { + case "ping" ⇒ sender() ! "pong" + case t: ThrowIt ⇒ throw t + } + } + + def typed1(ref: untyped.ActorRef, probe: ActorRef[String]): Behavior[String] = + Stateful( + onMessage = (ctx, msg) ⇒ + msg match { + case "send" ⇒ + val replyTo = ctx.self.toUntyped + ref.tell("ping", replyTo) + Same + case "pong" ⇒ + probe ! "ok" + Same + case "actorOf" ⇒ + val child = ctx.actorOf(untyped1) + child.tell("ping", ctx.self.toUntyped) + Same + case "watch" ⇒ + ctx.watch(ref) + Same + case "supervise-stop" ⇒ + val child = ctx.actorOf(untyped1) + ctx.watch(child) + child ! ThrowIt3 + child.tell("ping", ctx.self.toUntyped) + Same + case "stop-child" ⇒ + val child = ctx.actorOf(untyped1) + ctx.watch(child) + ctx.stop(child) + Same + }, + onSignal = (ctx, sig) ⇒ sig match { + case Terminated(ref) ⇒ + probe ! "terminated" + Same + case _ ⇒ Unhandled + }) + + sealed trait Typed2Msg + final case class Ping(replyTo: ActorRef[String]) extends Typed2Msg + case object StopIt extends Typed2Msg + sealed trait ThrowIt extends RuntimeException with Typed2Msg with NoStackTrace + case object ThrowIt1 extends ThrowIt + case object ThrowIt2 extends ThrowIt + case object ThrowIt3 extends ThrowIt + + def untyped2(ref: ActorRef[Ping], probe: ActorRef[String]): untyped.Props = + untyped.Props(new Untyped2(ref, probe)) + + class Untyped2(ref: ActorRef[Ping], probe: ActorRef[String]) extends untyped.Actor { + + override val supervisorStrategy = untyped.OneForOneStrategy() { + ({ + case ThrowIt1 ⇒ + probe ! "thrown-stop" + untyped.SupervisorStrategy.Stop + case ThrowIt2 ⇒ + probe ! "thrown-resume" + untyped.SupervisorStrategy.Resume + case ThrowIt3 ⇒ + probe ! "thrown-restart" + // TODO Restart will not really restart the behavior + untyped.SupervisorStrategy.Restart + }: untyped.SupervisorStrategy.Decider).orElse(untyped.SupervisorStrategy.defaultDecider) + } + + def receive = { + case "send" ⇒ ref ! Ping(self) // implicit conversion + case "pong" ⇒ probe ! "ok" + case "spawn" ⇒ + val child = context.spawnAnonymous(typed2) + child ! Ping(self) + case "actorOf-props" ⇒ + // this is how Cluster Sharding can be used + val child = context.actorOf(typed2Props) + child ! Ping(self) + case "watch" ⇒ + context.watch(ref) + case untyped.Terminated(_) ⇒ + probe ! "terminated" + case "supervise-stop" ⇒ + testSupervice(ThrowIt1) + case "supervise-resume" ⇒ + testSupervice(ThrowIt2) + case "supervise-restart" ⇒ + testSupervice(ThrowIt3) + case "stop-child" ⇒ + val child = context.spawnAnonymous(typed2) + context.watch(child) + context.stop(child) + } + + private def testSupervice(t: ThrowIt): Unit = { + val child = context.spawnAnonymous(typed2) + context.watch(child) + child ! t + child ! Ping(self) + } + } + + def typed2: Behavior[Typed2Msg] = + Stateful { (ctx, msg) ⇒ + msg match { + case Ping(replyTo) ⇒ + replyTo ! "pong" + Same + case StopIt ⇒ + Stopped + case t: ThrowIt ⇒ + throw t + } + } + + def typed2Props: untyped.Props = PropsAdapter(typed2) + +} + +class AdapterSpec extends AkkaSpec { + import AdapterSpec._ + + "Adapted actors" must { + + "send message from typed to untyped" in { + val probe = TestProbe() + val untypedRef = system.actorOf(untyped1) + val typedRef = system.spawnAnonymous(typed1(untypedRef, probe.ref)) + typedRef ! "send" + probe.expectMsg("ok") + } + + "send message from untyped to typed" in { + val probe = TestProbe() + val typedRef = system.spawnAnonymous(typed2) + val untypedRef = system.actorOf(untyped2(typedRef, probe.ref)) + untypedRef ! "send" + probe.expectMsg("ok") + } + + "spawn typed child from untyped parent" in { + val probe = TestProbe() + val ignore = system.spawnAnonymous(Ignore[Ping]) + val untypedRef = system.actorOf(untyped2(ignore, probe.ref)) + untypedRef ! "spawn" + probe.expectMsg("ok") + } + + "actorOf typed child via Props from untyped parent" in { + val probe = TestProbe() + val ignore = system.spawnAnonymous(Ignore[Ping]) + val untypedRef = system.actorOf(untyped2(ignore, probe.ref)) + untypedRef ! "actorOf-props" + probe.expectMsg("ok") + } + + "actorOf untyped child from typed parent" in { + val probe = TestProbe() + val ignore = system.actorOf(untyped.Props.empty) + val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref)) + typedRef ! "actorOf" + probe.expectMsg("ok") + } + + "watch typed from untyped" in { + val probe = TestProbe() + val typedRef = system.spawnAnonymous(typed2) + val untypedRef = system.actorOf(untyped2(typedRef, probe.ref)) + untypedRef ! "watch" + typedRef ! StopIt + probe.expectMsg("terminated") + } + + "watch untyped from typed" in { + val probe = TestProbe() + val untypedRef = system.actorOf(untyped1) + val typedRef = system.spawnAnonymous(typed1(untypedRef, probe.ref)) + typedRef ! "watch" + untypedRef ! untyped.PoisonPill + probe.expectMsg("terminated") + } + + "supervise typed child from untyped parent" in { + val probe = TestProbe() + val ignore = system.spawnAnonymous(Ignore[Ping]) + val untypedRef = system.actorOf(untyped2(ignore, probe.ref)) + + untypedRef ! "supervise-stop" + probe.expectMsg("thrown-stop") + // ping => ok should not get through here + probe.expectMsg("terminated") + + untypedRef ! "supervise-resume" + probe.expectMsg("thrown-resume") + probe.expectMsg("ok") + + untypedRef ! "supervise-restart" + probe.expectMsg("thrown-restart") + probe.expectMsg("ok") + } + + "supervise untyped child from typed parent" in { + val probe = TestProbe() + val ignore = system.actorOf(untyped.Props.empty) + val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref)) + + // only stop supervisorStrategy + typedRef ! "supervise-stop" + probe.expectMsg("terminated") + probe.expectNoMsg(100.millis) // no pong + } + + "stop typed child from untyped parent" in { + val probe = TestProbe() + val ignore = system.spawnAnonymous(Ignore[Ping]) + val untypedRef = system.actorOf(untyped2(ignore, probe.ref)) + untypedRef ! "stop-child" + probe.expectMsg("terminated") + } + + "stop untyped child from typed parent" in { + val probe = TestProbe() + val ignore = system.actorOf(untyped.Props.empty) + val typedRef = system.spawnAnonymous(typed1(ignore, probe.ref)) + typedRef ! "stop-child" + probe.expectMsg("terminated") + } + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/ActorContext.scala index 9f0a46986a..f900cddc02 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorContext.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorContext.scala @@ -17,6 +17,9 @@ import akka.annotation.ApiMayChange @DoNotInherit @ApiMayChange trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext[T] { + + // FIXME can we simplify this weird hierarchy of contexts, e.g. problem with createAdapter + override def getChild(name: String): Optional[ActorRef[Void]] = child(name) match { case Some(c) ⇒ Optional.of(c.upcast[Void]) diff --git a/akka-typed/src/main/scala/akka/typed/ActorRef.scala b/akka-typed/src/main/scala/akka/typed/ActorRef.scala index 0fbc0b28bf..4a7873c93f 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorRef.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorRef.scala @@ -31,6 +31,11 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act */ def !(msg: T): Unit = tell(msg) + /** + * Narrow the type of this `ActorRef, which is always a safe operation. + */ + final def narrow[U <: T]: ActorRef[U] = this.asInstanceOf[ActorRef[U]] + /** * Unsafe utility method for widening the type accepted by this ActorRef; * provided to avoid having to use `asInstanceOf` on the full reference type, @@ -73,7 +78,8 @@ object ActorRef { * Create an ActorRef from a Future, buffering up to the given number of * messages in while the Future is not fulfilled. */ - def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = new internal.FutureRef(FuturePath, bufferSize, f) + def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = + new internal.FutureRef(FuturePath, bufferSize, f) /** * Create an ActorRef by providing a function that is invoked for sending diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index 9557befad6..914cc2ec1a 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -11,7 +11,7 @@ import akka.actor.setup.ActorSystemSetup import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.{ ExecutionContextExecutor, Future } -import akka.typed.adapter.{ ActorSystemAdapter, PropsAdapter } +import akka.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter } import akka.util.Timeout import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange @@ -180,10 +180,17 @@ object ActorSystem { classLoader: Option[ClassLoader] = None, executionContext: Option[ExecutionContext] = None, actorSystemSettings: ActorSystemSetup = ActorSystemSetup.empty): ActorSystem[T] = { + + // TODO I'm not sure how useful this mode is for end-users. It has the limitation that untyped top level + // actors can't be created, because we have a custom user guardian. I would imagine that if you have + // a system of both untyped and typed actors (e.g. adding some typed actors to an existing application) + // you would start an untyped.ActorSystem and spawn typed actors from that system or from untyped actors. + Behavior.validateAsInitial(guardianBehavior) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) - val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianBehavior, guardianDeployment)), actorSystemSettings) + val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, + Some(PropsAdapter(() ⇒ guardianBehavior, guardianDeployment)), actorSystemSettings) untyped.start() new ActorSystemAdapter(untyped) } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala deleted file mode 100644 index b3c8049508..0000000000 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.typed -package adapter - -import akka.{ actor ⇒ a } -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContextExecutor - -/** - * INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]]. - */ -private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorContext[T] { - - override def self = ActorRefAdapter(ctx.self) - override val system = ActorSystemAdapter(ctx.system) - override def mailboxCapacity = 1 << 29 // FIXME - override def children = ctx.children.map(ActorRefAdapter(_)) - override def child(name: String) = ctx.child(name).map(ActorRefAdapter(_)) - override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) = - ctx.spawnAnonymous(behavior, deployment) - override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) = - ctx.spawn(behavior, name, deployment) - override def stop(child: ActorRef[_]) = - toUntyped(child) match { - case f: akka.actor.FunctionRef ⇒ - val cell = ctx.asInstanceOf[akka.actor.ActorCell] - cell.removeFunctionRef(f) - case untyped ⇒ - ctx.child(child.path.name) match { - case Some(`untyped`) ⇒ - ctx.stop(untyped) - true - case _ ⇒ - false // none of our business - } - } - override def watch(other: ActorRef[_]) = ctx.watch(toUntyped(other)) - override def unwatch(other: ActorRef[_]) = ctx.unwatch(toUntyped(other)) - var receiveTimeoutMsg: T = null.asInstanceOf[T] - override def setReceiveTimeout(d: FiniteDuration, msg: T) = { - receiveTimeoutMsg = msg - ctx.setReceiveTimeout(d) - } - override def cancelReceiveTimeout(): Unit = { - receiveTimeoutMsg = null.asInstanceOf[T] - ctx.setReceiveTimeout(Duration.Undefined) - } - override def executionContext: ExecutionContextExecutor = ctx.dispatcher - override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): a.Cancellable = { - import ctx.dispatcher - ctx.system.scheduler.scheduleOnce(delay, toUntyped(target), msg) - } - override def spawnAdapter[U](f: U ⇒ T, name: String = ""): ActorRef[U] = { - val cell = ctx.asInstanceOf[akka.actor.ActorCell] - val ref = cell.addFunctionRef((_, msg) ⇒ ctx.self ! f(msg.asInstanceOf[U]), name) - ActorRefAdapter[U](ref) - } - -} diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorRefAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorRefAdapter.scala deleted file mode 100644 index 3477beca3f..0000000000 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorRefAdapter.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.typed -package adapter - -import akka.{ actor ⇒ a } - -private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef) - extends ActorRef[T](untyped.path) with internal.ActorRefImpl[T] { - - override def tell(msg: T): Unit = untyped ! msg - override def isLocal: Boolean = true - override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped, signal) -} - -private[typed] object ActorRefAdapter { - def apply[T](untyped: a.ActorRef): ActorRef[T] = new ActorRefAdapter(untyped.asInstanceOf[a.InternalActorRef]) -} diff --git a/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala deleted file mode 100644 index 3ab936ad61..0000000000 --- a/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.typed -package adapter - -import akka.{ actor ⇒ a } - -private[typed] object PropsAdapter { - - // FIXME dispatcher and queue size - def apply(b: Behavior[_], deploy: DeploymentConfig): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (b: AnyRef) :: Nil) - - def apply[T](p: a.Props): Behavior[T] = { - assert(p.clazz == classOf[ActorAdapter[_]], "typed.Actor must have typed.Props") - p.args match { - case (initial: Behavior[_]) :: Nil ⇒ - // FIXME queue size - initial.asInstanceOf[Behavior[T]] - case _ ⇒ throw new AssertionError("typed.Actor args must be right") - } - } - -} diff --git a/akka-typed/src/main/scala/akka/typed/adapter/package.scala b/akka-typed/src/main/scala/akka/typed/adapter/package.scala deleted file mode 100644 index 30222a221d..0000000000 --- a/akka-typed/src/main/scala/akka/typed/adapter/package.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.typed - -package object adapter { - - import language.implicitConversions - import akka.dispatch.sysmsg - - implicit class ActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal { - def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = - ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment))) - def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = - ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name)) - } - - implicit class ActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal { - def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = - ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment))) - def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = - ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name)) - } - - implicit def actorRefAdapter(ref: akka.actor.ActorRef): ActorRef[Any] = ActorRefAdapter(ref) - - private[adapter] def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef = - ref match { - case adapter: ActorRefAdapter[_] ⇒ adapter.untyped - case _ ⇒ throw new UnsupportedOperationException(s"only adapted untyped ActorRefs permissible ($ref of class ${ref.getClass})") - } - - private[adapter] def sendSystemMessage(untyped: akka.actor.InternalActorRef, signal: internal.SystemMessage): Unit = - signal match { - case internal.Create() ⇒ throw new IllegalStateException("WAT? No, seriously.") - case internal.Terminate() ⇒ untyped.stop() - case internal.Watch(watchee, watcher) ⇒ untyped.sendSystemMessage(sysmsg.Watch(toUntyped(watchee), toUntyped(watcher))) - case internal.Unwatch(watchee, watcher) ⇒ untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher))) - case internal.DeathWatchNotification(ref, cause) ⇒ untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false)) - case internal.NoMessage ⇒ // just to suppress the warning - } - -} diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala index 688c34d4b0..0a8398449c 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala @@ -162,7 +162,7 @@ private[typed] class ActorSystemImpl[-T]( */ private object eventStreamStub extends e.EventStream(null, false) { override def subscribe(ref: a.ActorRef, ch: Class[_]): Boolean = - throw new UnsupportedOperationException("cannot use this eventstream for subscribing") + throw new UnsupportedOperationException("Cannot use this eventstream for subscribing") override def publish(event: AnyRef): Unit = eventStream.publish(event) } /** @@ -190,7 +190,8 @@ private[typed] class ActorSystemImpl[-T]( private val terminateTriggered = new AtomicBoolean private val theOneWhoWalksTheBubblesOfSpaceTime: ActorRefImpl[Nothing] = new ActorRef[Nothing](rootPath) with ActorRefImpl[Nothing] { - override def tell(msg: Nothing): Unit = throw new UnsupportedOperationException("cannot send to theOneWhoWalksTheBubblesOfSpaceTime") + override def tell(msg: Nothing): Unit = + throw new UnsupportedOperationException("Cannot send to theOneWhoWalksTheBubblesOfSpaceTime") override def sendSystem(signal: SystemMessage): Unit = signal match { case Terminate() ⇒ if (terminateTriggered.compareAndSet(false, true)) diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala similarity index 90% rename from akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala rename to akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala index 0695fdb041..3964c91bb7 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala @@ -2,12 +2,18 @@ * Copyright (C) 2016-2017 Lightbend Inc. */ package akka.typed +package internal package adapter import akka.{ actor ⇒ a } +import akka.annotation.InternalApi -private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor { +/** + * INTERNAL API + */ +@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor { import Behavior._ + import ActorRefAdapter.toUntyped var behavior: Behavior[T] = _initialBehavior diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala new file mode 100644 index 0000000000..68396fbe19 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.typed +package internal +package adapter + +import akka.{ actor ⇒ a } +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContextExecutor +import akka.annotation.InternalApi + +/** + * INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]]. + */ +@InternalApi private[typed] class ActorContextAdapter[T](val untyped: a.ActorContext) extends ActorContext[T] { + + import ActorRefAdapter.sendSystemMessage + import ActorRefAdapter.toUntyped + + override def self = ActorRefAdapter(untyped.self) + override val system = ActorSystemAdapter(untyped.system) + override def mailboxCapacity = 1 << 29 // FIXME + override def children = untyped.children.map(ActorRefAdapter(_)) + override def child(name: String) = untyped.child(name).map(ActorRefAdapter(_)) + override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) = + ActorContextAdapter.spawnAnonymous(untyped, behavior, deployment) + override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) = + ActorContextAdapter.spawn(untyped, behavior, name, deployment) + override def stop(child: ActorRef[_]) = + toUntyped(child) match { + case f: akka.actor.FunctionRef ⇒ + val cell = untyped.asInstanceOf[akka.actor.ActorCell] + cell.removeFunctionRef(f) + case c ⇒ + untyped.child(child.path.name) match { + case Some(`c`) ⇒ + untyped.stop(c) + true + case _ ⇒ + false // none of our business + } + } + override def watch(other: ActorRef[_]) = { untyped.watch(toUntyped(other)) } + override def unwatch(other: ActorRef[_]) = { untyped.unwatch(toUntyped(other)) } + var receiveTimeoutMsg: T = null.asInstanceOf[T] + override def setReceiveTimeout(d: FiniteDuration, msg: T) = { + receiveTimeoutMsg = msg + untyped.setReceiveTimeout(d) + } + override def cancelReceiveTimeout(): Unit = { + receiveTimeoutMsg = null.asInstanceOf[T] + untyped.setReceiveTimeout(Duration.Undefined) + } + override def executionContext: ExecutionContextExecutor = untyped.dispatcher + override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): a.Cancellable = { + import untyped.dispatcher + untyped.system.scheduler.scheduleOnce(delay, toUntyped(target), msg) + } + override def spawnAdapter[U](f: U ⇒ T, name: String = ""): ActorRef[U] = { + val cell = untyped.asInstanceOf[akka.actor.ActorCell] + val ref = cell.addFunctionRef((_, msg) ⇒ untyped.self ! f(msg.asInstanceOf[U]), name) + ActorRefAdapter[U](ref) + } + +} + +/** + * INTERNAL API + */ +@InternalApi private[typed] object ActorContextAdapter { + def toUntyped[U](ctx: ActorContext[_]): a.ActorContext = + ctx match { + case adapter: ActorContextAdapter[_] ⇒ adapter.untyped + case _ ⇒ + throw new UnsupportedOperationException("only adapted untyped ActorContext permissible " + + s"($ctx of class ${ctx.getClass.getName})") + } + + def toUntyped[U](ctx: scaladsl.ActorContext[_]): a.ActorContext = + ctx match { + case c: ActorContext[_] ⇒ toUntyped(c) + case _ ⇒ + throw new UnsupportedOperationException("unknown ActorContext type " + + s"($ctx of class ${ctx.getClass.getName})") + } + + def toUntyped[U](ctx: javadsl.ActorContext[_]): a.ActorContext = + ctx match { + case c: ActorContext[_] ⇒ toUntyped(c) + case _ ⇒ + throw new UnsupportedOperationException("unknown ActorContext type " + + s"($ctx of class ${ctx.getClass.getName})") + } + + def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] = { + Behavior.validateAsInitial(behavior) + ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, deployment))) + } + + def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] = { + Behavior.validateAsInitial(behavior) + ActorRefAdapter(ctx.actorOf(PropsAdapter(() ⇒ behavior, deployment), name)) + } +} diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorRefAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorRefAdapter.scala new file mode 100644 index 0000000000..14199e8616 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorRefAdapter.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.typed +package internal +package adapter + +import akka.{ actor ⇒ a } +import akka.annotation.InternalApi +import akka.dispatch.sysmsg + +/** + * INTERNAL API + */ +@InternalApi private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef) + extends ActorRef[T](untyped.path) with internal.ActorRefImpl[T] { + + override def tell(msg: T): Unit = untyped ! msg + override def isLocal: Boolean = untyped.isLocal + override def sendSystem(signal: internal.SystemMessage): Unit = + ActorRefAdapter.sendSystemMessage(untyped, signal) +} + +private[typed] object ActorRefAdapter { + def apply[T](untyped: a.ActorRef): ActorRef[T] = new ActorRefAdapter(untyped.asInstanceOf[a.InternalActorRef]) + + def toUntyped[U](ref: ActorRef[U]): akka.actor.InternalActorRef = + ref match { + case adapter: ActorRefAdapter[_] ⇒ adapter.untyped + case _ ⇒ + throw new UnsupportedOperationException("only adapted untyped ActorRefs permissible " + + s"($ref of class ${ref.getClass.getName})") + } + + def sendSystemMessage(untyped: akka.actor.InternalActorRef, signal: internal.SystemMessage): Unit = + signal match { + case internal.Create() ⇒ throw new IllegalStateException("WAT? No, seriously.") + case internal.Terminate() ⇒ untyped.stop() + case internal.Watch(watchee, watcher) ⇒ untyped.sendSystemMessage( + sysmsg.Watch( + toUntyped(watchee), + toUntyped(watcher))) + case internal.Unwatch(watchee, watcher) ⇒ untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher))) + case internal.DeathWatchNotification(ref, cause) ⇒ untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false)) + case internal.NoMessage ⇒ // just to suppress the warning + } +} diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala similarity index 65% rename from akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala rename to akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala index 1fd87b0a03..b86ab06d34 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2017 Lightbend Inc. */ package akka.typed +package internal package adapter import akka.{ actor ⇒ a, dispatch ⇒ d } @@ -9,19 +10,21 @@ import akka.dispatch.sysmsg import scala.concurrent.ExecutionContextExecutor import akka.util.Timeout import scala.concurrent.Future +import akka.annotation.InternalApi /** - * Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context). + * INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context). * Therefore it does not have a lot of vals, only the whenTerminated Future is cached after * its transformation because redoing that every time will add extra objects that persist for * a longer time; in all other cases the wrapper will just be spawned for a single call in * most circumstances. */ -private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) +@InternalApi private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) extends ActorRef[T](a.RootActorPath(a.Address("akka", untyped.name)) / "user") with ActorSystem[T] with internal.ActorRefImpl[T] { import ActorSystemAdapter._ + import ActorRefAdapter.sendSystemMessage // Members declared in akka.typed.ActorRef override def tell(msg: T): Unit = untyped.guardian ! msg @@ -33,10 +36,12 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) override def dispatchers: Dispatchers = new Dispatchers { override def lookup(selector: DispatcherSelector): ExecutionContextExecutor = selector match { - case DispatcherDefault(_) ⇒ untyped.dispatcher - case DispatcherFromConfig(str, _) ⇒ untyped.dispatchers.lookup(str) - case DispatcherFromExecutionContext(_, _) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter") - case DispatcherFromExecutor(_, _) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter") + case DispatcherDefault(_) ⇒ untyped.dispatcher + case DispatcherFromConfig(str, _) ⇒ untyped.dispatchers.lookup(str) + case DispatcherFromExecutionContext(_, _) ⇒ + throw new UnsupportedOperationException("Cannot use DispatcherFromExecutionContext with ActorSystemAdapter") + case DispatcherFromExecutor(_, _) ⇒ + throw new UnsupportedOperationException("Cannot use DispatcherFromExecutor with ActorSystemAdapter") } override def shutdown(): Unit = () // there was no shutdown in untyped Akka } @@ -65,8 +70,8 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) untyped.whenTerminated.map(t ⇒ Terminated(ActorRefAdapter(t.actor))(null))(sameThreadExecutionContext) def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] = { - val ref = untyped.systemActorOf(PropsAdapter(behavior, deployment), name) - Future.successful(ref) + val ref = untyped.systemActorOf(PropsAdapter(() ⇒ behavior, deployment), name) + Future.successful(ActorRefAdapter(ref)) } } @@ -74,9 +79,24 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) private[typed] object ActorSystemAdapter { def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl]) - object ReceptionistExtension extends a.ExtensionKey[ReceptionistExtension] + def toUntyped[U](sys: ActorSystem[_]): a.ActorSystem = + sys match { + case adapter: ActorSystemAdapter[_] ⇒ adapter.untyped + case _ ⇒ throw new UnsupportedOperationException("only adapted untyped ActorSystem permissible " + + s"($sys of class ${sys.getClass.getName})") + } + + object ReceptionistExtension extends a.ExtensionId[ReceptionistExtension] with a.ExtensionIdProvider { + override def get(system: a.ActorSystem): ReceptionistExtension = super.get(system) + override def lookup = ReceptionistExtension + override def createExtension(system: a.ExtendedActorSystem): ReceptionistExtension = + new ReceptionistExtension(system) + } + class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension { val receptionist: ActorRef[patterns.Receptionist.Command] = - ActorRefAdapter(system.systemActorOf(PropsAdapter(patterns.Receptionist.behavior, EmptyDeploymentConfig), "receptionist")) + ActorRefAdapter(system.systemActorOf( + PropsAdapter(() ⇒ patterns.Receptionist.behavior, EmptyDeploymentConfig), + "receptionist")) } } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/EventStreamAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/EventStreamAdapter.scala similarity index 64% rename from akka-typed/src/main/scala/akka/typed/adapter/EventStreamAdapter.scala rename to akka-typed/src/main/scala/akka/typed/internal/adapter/EventStreamAdapter.scala index f121f85119..91fb378b06 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/EventStreamAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/EventStreamAdapter.scala @@ -2,11 +2,16 @@ * Copyright (C) 2016-2017 Lightbend Inc. */ package akka.typed +package internal package adapter import akka.{ event ⇒ e } +import akka.annotation.InternalApi -class EventStreamAdapter(untyped: e.EventStream) extends EventStream { +/** + * INTERNAL API + */ +@InternalApi private[typed] class EventStreamAdapter(untyped: e.EventStream) extends EventStream { def logLevel: e.Logging.LogLevel = untyped.logLevel def publish[T](event: T): Unit = untyped.publish(event.asInstanceOf[AnyRef]) @@ -16,19 +21,22 @@ class EventStreamAdapter(untyped: e.EventStream) extends EventStream { def subscribe[T](subscriber: ActorRef[T], to: Class[T]): Boolean = subscriber match { case adapter: ActorRefAdapter[_] ⇒ untyped.subscribe(adapter.untyped, to) - case _ ⇒ throw new UnsupportedOperationException("cannot subscribe native typed ActorRef") + case _ ⇒ + throw new UnsupportedOperationException("Cannot subscribe native typed ActorRef") } def unsubscribe[T](subscriber: ActorRef[T]): Unit = subscriber match { case adapter: ActorRefAdapter[_] ⇒ untyped.unsubscribe(adapter.untyped) - case _ ⇒ throw new UnsupportedOperationException("cannot unsubscribe native typed ActorRef") + case _ ⇒ + throw new UnsupportedOperationException("Cannot unsubscribe native typed ActorRef") } def unsubscribe[T](subscriber: ActorRef[T], from: Class[T]): Boolean = subscriber match { case adapter: ActorRefAdapter[_] ⇒ untyped.unsubscribe(adapter.untyped, from) - case _ ⇒ throw new UnsupportedOperationException("cannot unsubscribe native typed ActorRef") + case _ ⇒ + throw new UnsupportedOperationException("Cannot unsubscribe native typed ActorRef") } } diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/PropsAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/PropsAdapter.scala new file mode 100644 index 0000000000..efc275d091 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/PropsAdapter.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed +package internal +package adapter + +import akka.typed.Behavior +import akka.typed.EmptyDeploymentConfig +import akka.typed.DeploymentConfig +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] object PropsAdapter { + def apply[T](behavior: () ⇒ Behavior[T], deploy: DeploymentConfig = EmptyDeploymentConfig): akka.actor.Props = { + // FIXME use DeploymentConfig, e.g. dispatcher + akka.actor.Props(new ActorAdapter(behavior())) + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/Adapter.scala b/akka-typed/src/main/scala/akka/typed/javadsl/Adapter.scala new file mode 100644 index 0000000000..86759b2546 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/javadsl/Adapter.scala @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl + +import akka.typed.Behavior +import akka.typed.DeploymentConfig +import akka.typed.EmptyDeploymentConfig +import akka.typed.ActorRef +import akka.typed.internal.adapter.ActorRefAdapter +import akka.typed.scaladsl.adapter._ +import akka.typed.ActorSystem +import akka.typed.internal.adapter.ActorContextAdapter +import akka.japi.Creator + +/** + * Java API: Adapters between typed and untyped actors and actor systems. + * The underlying `ActorSystem` is the untyped [[akka.actor.ActorSystem]] + * which runs Akka Typed [[akka.typed.Behavior]] on an emulation layer. In this + * system typed and untyped actors can coexist. + * + * These methods make it possible to create typed child actor from untyped + * parent actor, and the opposite untyped child from typed parent. + * `watch` is also supported in both directions. + * + * There are also converters (`toTyped`, `toUntyped`) between untyped + * [[akka.actor.ActorRef]] and typed [[akka.typed.ActorRef]], and between untyped + * [[akka.actor.ActorSystem]] and typed [[akka.typed.ActorSystem]]. + */ +object Adapter { + + def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T]): ActorRef[T] = + spawnAnonymous(sys, behavior, EmptyDeploymentConfig) + + def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] = + sys.spawnAnonymous(behavior, deployment) + + def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String): ActorRef[T] = + spawn(sys, behavior, name, EmptyDeploymentConfig) + + def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] = + sys.spawn(behavior, name, deployment) + + def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T]): ActorRef[T] = + spawnAnonymous(ctx, behavior, EmptyDeploymentConfig) + + def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], deployment: DeploymentConfig): ActorRef[T] = + ctx.spawnAnonymous(behavior, deployment) + + def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String): ActorRef[T] = + spawn(ctx, behavior, name, EmptyDeploymentConfig) + + def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, deployment: DeploymentConfig): ActorRef[T] = + ctx.spawn(behavior, name, deployment) + + def toTyped(sys: akka.actor.ActorSystem): ActorSystem[Void] = + sys.toTyped.asInstanceOf[ActorSystem[Void]] + + def toUntyped(sys: ActorSystem[_]): akka.actor.ActorSystem = + sys.toUntyped + + def watch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit = + ctx.watch(other) + + def unwatch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit = + ctx.unwatch(other) + + def stop(ctx: akka.actor.ActorContext, child: ActorRef[_]): Unit = + ctx.stop(child) + + def watch[U](ctx: ActorContext[_], other: akka.actor.ActorRef): Unit = + ctx.watch(other) + + def unwatch[U](ctx: ActorContext[_], other: akka.actor.ActorRef): Unit = + ctx.unwatch(other) + + def stop(ctx: ActorContext[_], child: akka.actor.ActorRef): Boolean = + ctx.stop(child) + + def actorOf(ctx: ActorContext[_], props: akka.actor.Props): akka.actor.ActorRef = + ActorContextAdapter.toUntyped(ctx).actorOf(props) + + def actorOf(ctx: ActorContext[_], props: akka.actor.Props, name: String): akka.actor.ActorRef = + ActorContextAdapter.toUntyped(ctx).actorOf(props, name) + + def toUntyped(ref: ActorRef[_]): akka.actor.ActorRef = + ref.toUntyped + + def toTyped[T](ref: akka.actor.ActorRef): ActorRef[T] = + ref + + /** + * Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when + * spawning a typed child actor from an untyped parent actor. + * This is normally not needed because you can use the extension methods + * `spawn` and `spawnAnonymous` with an untyped `ActorContext`, but it's needed + * when using typed actors with an existing library/tool that provides an API that + * takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an + * example of that. + */ + def props[T](behavior: Creator[Behavior[T]], deploy: DeploymentConfig): akka.actor.Props = + akka.typed.internal.adapter.PropsAdapter(() ⇒ behavior.create(), deploy) + + /** + * Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when + * spawning a typed child actor from an untyped parent actor. + * This is normally not needed because you can use the extension methods + * `spawn` and `spawnAnonymous` with an untyped `ActorContext`, but it's needed + * when using typed actors with an existing library/tool that provides an API that + * takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an + * example of that. + */ + def props[T](behavior: Creator[Behavior[T]]): akka.actor.Props = + props(behavior, EmptyDeploymentConfig) +} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala index 308afcc2c5..dcc50782d0 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala @@ -266,12 +266,12 @@ object Actor { * results in a new behavior that can potentially be different from this one. */ final case class Stateful[T]( - behavior: (ActorContext[T], T) ⇒ Behavior[T], - signal: (ActorContext[T], Signal) ⇒ Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) ⇒ Behavior[T]]) + onMessage: (ActorContext[T], T) ⇒ Behavior[T], + onSignal: (ActorContext[T], Signal) ⇒ Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) ⇒ Behavior[T]]) extends ExtensibleBehavior[T] { - override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg) - override def message(ctx: AC[T], msg: T) = behavior(ctx, msg) - override def toString = s"Stateful(${LineNumbers(behavior)})" + override def management(ctx: AC[T], msg: Signal): Behavior[T] = onSignal(ctx, msg) + override def message(ctx: AC[T], msg: T) = onMessage(ctx, msg) + override def toString = s"Stateful(${LineNumbers(onMessage)})" } /** @@ -285,13 +285,13 @@ object Actor { * another one after it has been installed. It is most useful for leaf actors * that do not create child actors themselves. */ - final case class Stateless[T](behavior: (ActorContext[T], T) ⇒ Any) extends ExtensibleBehavior[T] { + final case class Stateless[T](onMessage: (ActorContext[T], T) ⇒ Any) extends ExtensibleBehavior[T] { override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled override def message(ctx: AC[T], msg: T): Behavior[T] = { - behavior(ctx, msg) + onMessage(ctx, msg) this } - override def toString = s"Static(${LineNumbers(behavior)})" + override def toString = s"Static(${LineNumbers(onMessage)})" } /** diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala index e45b31ce8d..df49e223b2 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala @@ -13,7 +13,7 @@ import akka.typed.internal.FunctionRef import akka.actor.RootActorPath import akka.actor.Address import akka.typed.ActorRef -import akka.typed.adapter +import akka.typed.internal.{ adapter ⇒ adapt } /** * The ask-pattern implements the initiator side of a request–reply protocol. @@ -38,9 +38,9 @@ object AskPattern { implicit class Askable[T](val ref: ActorRef[T]) extends AnyVal { def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = ref match { - case a: adapter.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) - case a: adapter.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f) - case _ ⇒ ask(ref, timeout, scheduler, f) + case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) + case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f) + case _ ⇒ ask(ref, timeout, scheduler, f) } } @@ -50,15 +50,15 @@ object AskPattern { private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = if (untyped.isTerminated) ( - adapter.ActorRefAdapter[U](untyped.provider.deadLetters), + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), Future.failed[U](new AskTimeoutException(s"Recipient[$target] had already been terminated.")), null) else if (timeout.duration.length <= 0) ( - adapter.ActorRefAdapter[U](untyped.provider.deadLetters), + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null) else { val a = PromiseActorRef(untyped.provider, timeout, target, "unknown") - val b = adapter.ActorRefAdapter[U](a) + val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/PropsAdapter.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/PropsAdapter.scala new file mode 100644 index 0000000000..b500c207bd --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/PropsAdapter.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.scaladsl.adapter + +import akka.typed.Behavior +import akka.typed.EmptyDeploymentConfig +import akka.typed.DeploymentConfig +import akka.typed.internal.adapter.ActorAdapter + +/** + * Wrap [[akka.typed.Behavior]] in an untyped [[akka.actor.Props]], i.e. when + * spawning a typed child actor from an untyped parent actor. + * This is normally not needed because you can use the extension methods + * `spawn` and `spawnAnonymous` on an untyped `ActorContext`, but it's needed + * when using typed actors with an existing library/tool that provides an API that + * takes an untyped [[akka.actor.Props]] parameter. Cluster Sharding is an + * example of that. + */ +object PropsAdapter { + def apply[T](behavior: ⇒ Behavior[T], deploy: DeploymentConfig = EmptyDeploymentConfig): akka.actor.Props = + akka.typed.internal.adapter.PropsAdapter(() ⇒ behavior, deploy) +} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/package.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/package.scala new file mode 100644 index 0000000000..6097195e47 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/adapter/package.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.typed +package scaladsl + +import akka.annotation.InternalApi +import akka.typed.internal.adapter._ + +/** + * Scala API: Adapters between typed and untyped actors and actor systems. + * The underlying `ActorSystem` is the untyped [[akka.actor.ActorSystem]] + * which runs Akka Typed [[akka.typed.Behavior]] on an emulation layer. In this + * system typed and untyped actors can coexist. + * + * Use these adapters with `import akka.typed.scaladsl.adapter._`. + * + * Implicit extension methods are added to untyped and typed `ActorSystem`, + * `ActorContext`. Such methods make it possible to create typed child actor + * from untyped parent actor, and the opposite untyped child from typed parent. + * `watch` is also supported in both directions. + * + * There is an implicit conversion from untyped [[akka.actor.ActorRef]] to + * typed [[akka.typed.ActorRef]]. + * + * There are also converters (`toTyped`, `toUntyped`) from typed + * [[akka.typed.ActorRef]] to untyped [[akka.actor.ActorRef]], and between untyped + * [[akka.actor.ActorSystem]] and typed [[akka.typed.ActorSystem]]. + */ +package object adapter { + + import language.implicitConversions + + /** + * Extension methods added to [[akka.actor.ActorSystem]]. + */ + implicit class UntypedActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal { + def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment))) + def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name)) + + def toTyped: ActorSystem[Nothing] = ActorSystemAdapter(sys) + } + + /** + * Extension methods added to [[akka.typed.ActorSystem]]. + */ + implicit class TypedActorSystemOps(val sys: ActorSystem[_]) extends AnyVal { + def toUntyped: akka.actor.ActorSystem = ActorSystemAdapter.toUntyped(sys) + } + + /** + * Extension methods added to [[akka.actor.ActorContext]]. + */ + implicit class UntypedActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal { + def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorContextAdapter.spawnAnonymous(ctx, behavior, deployment) + def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorContextAdapter.spawn(ctx, behavior, name, deployment) + + def watch[U](other: ActorRef[U]): Unit = ctx.watch(ActorRefAdapter.toUntyped(other)) + def unwatch[U](other: ActorRef[U]): Unit = ctx.unwatch(ActorRefAdapter.toUntyped(other)) + + def stop(child: ActorRef[_]): Unit = + ctx.stop(ActorRefAdapter.toUntyped(child)) + } + + /** + * Extension methods added to [[akka.typed.scaladsl.ActorContext]]. + */ + implicit class TypedActorContextOps(val ctx: scaladsl.ActorContext[_]) extends AnyVal { + def actorOf(props: akka.actor.Props): akka.actor.ActorRef = + ActorContextAdapter.toUntyped(ctx).actorOf(props) + def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef = + ActorContextAdapter.toUntyped(ctx).actorOf(props, name) + + // watch, unwatch and stop not needed here because of the implicit ActorRef conversion + } + + /** + * Extension methods added to [[akka.typed.ActorRef]]. + */ + implicit class TypedActorRefOps(val ref: ActorRef[_]) extends AnyVal { + def toUntyped: akka.actor.ActorRef = ActorRefAdapter.toUntyped(ref) + } + + /** + * Implicit conversion from untyped [[akka.actor.ActorRef]] to typed [[akka.typed.ActorRef]]. + */ + implicit def actorRefAdapter[T](ref: akka.actor.ActorRef): ActorRef[T] = ActorRefAdapter(ref) + +}