diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index dbd8fbe0f0..dbe6326e69 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -70,8 +70,8 @@ object ExecutionContexts { * It is very useful for actions which are known to be non-blocking and * non-throwing in order to save a round-trip to the thread pool. */ - object sameThreadExecutionContext extends ExecutionContext { - override def execute(runnable: Runnable): Unit = runnable.run() + private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor { + override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run() override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("exception in sameThreadExecutionContext", t) } diff --git a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala index a771af2197..b92e695364 100644 --- a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala +++ b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala @@ -126,7 +126,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "Actor with Channels" must { "construct refs" in { - val ref = ChannelExt(system).actorOf(new Tester) + val ref = ChannelExt(system).actorOf(new Tester, "t1") ref <-!- A expectMsg(C) lastSender must be(ref.actorRef) @@ -136,16 +136,16 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, } "select return channels" in { - val ref = ChannelExt(system).actorOf(new Tester) - implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) + val ref = ChannelExt(system).actorOf(new Tester, "t2") + implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t3") ref <-!- A expectMsg(C) lastSender must be(selfChannel.actorRef) } "correctly dispatch to subchannels" in { - val ref = ChannelExt(system).actorOf(new Tester) - implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) + val ref = ChannelExt(system).actorOf(new Tester, "t4") + implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t5") ref <-!- A2 expectMsg(C1) lastSender must be(selfChannel.actorRef) @@ -281,7 +281,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, } "have a working selfChannel" in { - val ref = ChannelExt(system).actorOf(new Children) + val ref = ChannelExt(system).actorOf(new Children, "t10") ref <-!- A expectMsg(C) } @@ -292,7 +292,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, parentChannel <-!- A }) channel[A] { (msg, snd) ⇒ testActor ! msg } - }) + }, "t11") expectMsg(A) } @@ -301,7 +301,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, eval(""" |import akka.channels._ |import ChannelSpec._ - |null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}) + |null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}, "") """.stripMargin) }.message must include("type mismatch") } @@ -367,7 +367,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "support narrowing ActorRefs" in { import Channels._ - val channel = ChannelExt(system).actorOf(new RecvC(testActor)) + val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t15") val ref = channel.actorRef implicit val t = Timeout(1.second.dilated) import system.dispatcher @@ -377,7 +377,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, } "deny wrong narrowing of ActorRefs" in { - val channel = ChannelExt(system).actorOf(new RecvC(testActor)) + val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t16") val ref = channel.actorRef implicit val t = Timeout(1.second.dilated) import system.dispatcher @@ -392,8 +392,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, } "allow wrapping of ChannelRefs with pass-through" in { - val target = ChannelExt(system).actorOf(new RecvC(testActor)) - val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target)) + val target = ChannelExt(system).actorOf(new RecvC(testActor), "t17") + val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target), "t18") wrap <-!- C expectMsg(C) lastSender must be(target.actorRef) @@ -404,8 +404,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "allow wrapping of Actor with ChannelsRefs with replies" in { val probe = TestProbe() - val target = ChannelExt(system).actorOf(new EchoTee(probe.ref)) - val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target)) + val target = ChannelExt(system).actorOf(new EchoTee(probe.ref), "t19") + val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target), "t20") C -!-> wrap expectMsg(C1) expectMsg(C1) @@ -413,14 +413,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, } "support typed ask" in { - val t = ChannelExt(system).actorOf(new Tester) + val t = ChannelExt(system).actorOf(new Tester, "t21") implicit val timeout = Timeout(1.second) val r: Future[C] = (t <-?- A).lub Await.result(r, 1.second) must be(C) } "support typed ask with multiple reply channels" in { - val t = ChannelExt(system).actorOf(new SubChannels) + val t = ChannelExt(system).actorOf(new SubChannels, "t22") implicit val timeout = Timeout(1.second) val r: Future[Msg] = (t <-?- A1).lub Await.result(r, 1.second) must be(B) @@ -462,8 +462,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "be able to forward fully generic channels" in { val cd = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, D) :+: TNil] { channel[C] { (x, snd) ⇒ snd <-!- D } - }) - val t = ChannelExt(system).actorOf(new Poly(cd)) + }, "t25") + val t = ChannelExt(system).actorOf(new Poly(cd), "t26") t <-!- A expectMsg(A) t <-!- C @@ -479,8 +479,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, implicit val selfChannel = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, Nothing) :+:(D, Nothing) :+: TNil] { channel[C] { (c, snd) ⇒ testActor ! c } channel[D] { (d, snd) ⇒ testActor ! d } - }) - val t = ChannelExt(system).actorOf(new Tester) + }, "t27") + val t = ChannelExt(system).actorOf(new Tester, "t28") val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) val fa = Future successful a val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) @@ -548,7 +548,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "be askable to a ChannelRef" in { implicit val timeout = Timeout(1.second) - val t = ChannelExt(system).actorOf(new Tester) + val t = ChannelExt(system).actorOf(new Tester, "t30") val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) val fa = Future successful a val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) @@ -608,7 +608,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "be LUBbable within a Future" in { implicit val timeout = Timeout(1.second) - val t = ChannelExt(system).actorOf(new Tester) + val t = ChannelExt(system).actorOf(new Tester, "t31") val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) (Await.result((a -?-> t).lub, timeout.duration): Msg) must be(C) } diff --git a/akka-channels/src/main/scala/akka/channels/ChannelExtension.scala b/akka-channels/src/main/scala/akka/channels/ChannelExtension.scala index 461d0de291..7a1ffffa90 100644 --- a/akka-channels/src/main/scala/akka/channels/ChannelExtension.scala +++ b/akka-channels/src/main/scala/akka/channels/ChannelExtension.scala @@ -17,9 +17,10 @@ object ChannelExt extends ExtensionKey[ChannelExtension] class ChannelExtension(system: ExtendedActorSystem) extends Extension { - // kick-start the universe (needed due to thread safety issues in runtime mirror) + // FIXME: kick-start the universe (needed due to thread safety issues in runtime mirror) + // see https://issues.scala-lang.org/browse/SI-6240 private val t = typeTag[(Int, Int) :+: TNil] - def actorOf[Ch <: ChannelList](factory: ⇒ Actor with Channels[TNil, Ch]): ChannelRef[Ch] = - new ChannelRef[Ch](system.actorOf(Props(factory))) + def actorOf[Ch <: ChannelList](factory: ⇒ Actor with Channels[TNil, Ch], name: String): ChannelRef[Ch] = + new ChannelRef[Ch](system.actorOf(Props(factory), name)) } diff --git a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala index c102d8fd65..7fe2ee2591 100644 --- a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala +++ b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala @@ -6,27 +6,38 @@ package akka.channels import language.experimental.{ macros ⇒ makkros } import akka.actor.ActorRef -import scala.reflect.runtime.universe.TypeTag -import scala.reflect.macros.Context -import scala.annotation.tailrec -import scala.reflect.macros.Universe -import akka.actor.Actor -import akka.actor.ActorContext import scala.concurrent.Future -import akka.util.Timeout -import akka.AkkaException -import scala.util.control.NoStackTrace - -case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace +/** + * A channel reference, holding a type list of all channels supported by the + * underlying actor. This actor’s reference can be obtained as the `actorRef` + * member. + */ class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { + /** + * Send a message over this channel, “tell” semantics, returning the message. + */ def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M] + /** + * Eventually send the value contained in the future over this channel, + * “tell” semantics, returning a Future which is completed after sending + * with the value which was sent (“Future.andThen” semantics). + */ def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M] + /** + * Send a message over this channel, “ask” semantics, returning a Future + * which will be completed with the reply message or a TimeoutException. + * If the message is a Future itself, eventually send the Future’s value. + */ def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M] + /** + * Narrow this ChannelRef by removing channels or narrowing input types or + * widening output types. + */ def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T] } diff --git a/akka-channels/src/main/scala/akka/channels/Channels.scala b/akka-channels/src/main/scala/akka/channels/Channels.scala index 766d3b308f..beae4b9cb5 100644 --- a/akka-channels/src/main/scala/akka/channels/Channels.scala +++ b/akka-channels/src/main/scala/akka/channels/Channels.scala @@ -51,6 +51,13 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒ */ def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch] + /** + * Create a child actor with properly typed ChannelRef and the given name, + * verifying that this actor can handle everything which the child tries to + * send via its `parent` ChannelRef. + */ + def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch], name: String): ChannelRef[Ch] = macro macros.CreateChild.implName[C, Pa, Ch] + /** * Properly typed ChannelRef for the context.parent. */ @@ -131,6 +138,8 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒ private def verifyCompleteness() { val channels = inputChannels(ru)(channelListTypeTag.tpe) + if (channels.isEmpty) + throw ActorInitializationException("Actor with Channels cannot have no channels") val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen)) val missing = classes.keySet -- behavior.keySet if (missing.nonEmpty) { @@ -162,10 +171,7 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒ case c: CheckType[_] ⇒ true case _ ⇒ val msgClass = x.getClass - index find (_ isAssignableFrom msgClass) match { - case None ⇒ false - case Some(cls) ⇒ true - } + index exists (_ isAssignableFrom msgClass) } } } diff --git a/akka-channels/src/main/scala/akka/channels/Ops.scala b/akka-channels/src/main/scala/akka/channels/Ops.scala index b213777812..59c4549c8e 100644 --- a/akka-channels/src/main/scala/akka/channels/Ops.scala +++ b/akka-channels/src/main/scala/akka/channels/Ops.scala @@ -12,6 +12,7 @@ import scala.concurrent.{ ExecutionContext, Future } import scala.reflect.runtime.{ universe ⇒ ru } import scala.util.Success import akka.dispatch.ExecutionContexts +import scala.util.control.NoStackTrace sealed trait ChannelList sealed trait TNil extends ChannelList @@ -26,8 +27,23 @@ sealed trait ReplyChannels[T <: ChannelList] extends ChannelList */ sealed trait UnknownDoNotWriteMeDown +/** + * This exception is used to signal errors when trying to `.narrow` an + * ActorRef into a ChannelRef: if the actor finds the requested channel types + * incompatible with its selfChannel, it will return errors in the same format + * as would occur during compilation of a `ChannelRef.narrow` operation. + */ +case class NarrowingException(message: String) extends akka.AkkaException(message) with NoStackTrace + class ActorRefOps(val ref: ActorRef) extends AnyVal { import macros.Helpers._ + + /** + * Send a query to the actor and check whether it supports the requested + * channel types; the normal timeout semantics of the `ask` pattern apply. + * The Future will be completed either with the desired ChannelRef or with + * an exception (TimeoutException or NarrowingException). + */ def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = { import Channels._ ref ? CheckType(tt) map { diff --git a/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala b/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala index 90a6be4251..de7d7d278b 100644 --- a/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala +++ b/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala @@ -22,20 +22,35 @@ object CreateChild { import c.universe._ - if (weakTypeOf[ParentChannels] =:= weakTypeOf[Nothing]) { - c.abort(c.enclosingPosition, "Parent argument must not be Nothing") - } - if (weakTypeOf[ChildChannels] =:= weakTypeOf[Nothing]) { - c.abort(c.enclosingPosition, "channel list must not be Nothing") - } + verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels]) - val missing = missingChannels(c.universe)(weakTypeOf[MyChannels], inputChannels(c.universe)(weakTypeOf[ParentChannels])) - if (missing.isEmpty) { - implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels]) - reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice)))) - } else { - c.abort(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}") - } + implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels]) + reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice)))) + } + + def implName[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag]( + c: Context { + type PrefixType = Actor with Channels[_, MyChannels] + })(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]], name: c.Expr[String]): c.Expr[ChannelRef[ChildChannels]] = { + + import c.universe._ + + verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels]) + + implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels]) + reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice), name.splice))) + } + + def verify(c: Context)(parent: c.Type, child: c.Type, mine: c.Type): Unit = { + import c.universe._ + + val nothing = weakTypeOf[Nothing] + if (parent =:= nothing) abort(c, "Parent argument must not be Nothing") + if (child =:= nothing) abort(c, "channel list must not be Nothing") + + val missing = missingChannels(c.universe)(mine, inputChannels(c.universe)(parent)) + if (missing.nonEmpty) + abort(c, s"This actor cannot support a child requiring channels ${missing mkString ", "}") } } \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala index ef74019446..31645655ff 100644 --- a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala @@ -40,7 +40,7 @@ object ChannelDocSpec { class Parent extends Actor with Channels[TNil, (Stats, Nothing) :+: (GetChild.type, ChildRef) :+: TNil] { - + val child = createChild(new Child) channel[GetChild.type] { (_, snd) ⇒ ChildRef(child) -!-> snd } @@ -176,7 +176,7 @@ class ChannelDocSpec extends AkkaSpec { // type given just for demonstration purposes val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] = - ChannelExt(system).actorOf(new Latch(target)) + ChannelExt(system).actorOf(new Latch(target), "latch") "hello" -!-> latch //#processing @@ -202,8 +202,8 @@ class ChannelDocSpec extends AkkaSpec { // // then it is used somewhat like this: // - - val parent = ChannelExt(system).actorOf(new Parent) + + val parent = ChannelExt(system).actorOf(new Parent, "parent") parent <-!- GetChild val child = expectMsgType[ChildRef].child // this assumes TestKit context diff --git a/akka-docs/rst/scala/typed-channels.rst b/akka-docs/rst/scala/typed-channels.rst index a0b2574443..94348bf9d3 100644 --- a/akka-docs/rst/scala/typed-channels.rst +++ b/akka-docs/rst/scala/typed-channels.rst @@ -40,7 +40,7 @@ an error, since the companion object ``Command`` is not an instance of type While this example looks pretty simple, the implications are profound. In order to be useful, the system must be as reliable as you would expect a type system -to be. This means that unless you step outside of the it (i.e. doing the +to be. This means that unless you step outside of it (i.e. doing the equivalent of ``.asInstanceOf[_]``) you shall be protected, failures shall be recognized and flagged. There are a number of challenges included in this requirement, which are discussed in the following sections. If you are reading