From e862890deda853593dde438ef56e83e35626b288 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 22 Jan 2013 22:50:09 +0100 Subject: [PATCH] major facelift: -!-> and -?-> appear MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - rename projects to akka-channels and akka-channels-tests - move implementation into akka.channels.macros package - remove picking up ActorRef as sender (or none at all) - factor out logic to make different façades acting upon Future[] or Any so that -!-> and -?-> can complement the traditional <-!- and <-?- - the new operators are easily distinguishable from !/? and the rightwards-pointing go with the flow and compose better, let’s try them out --- .../src/main/scala/akka/dispatch/Future.scala | 13 + .../src/main/scala/akka/makkros/Test.scala | 2 +- .../scala/akka/channels/ChannelSpec.scala | 67 ++-- .../akka/channels/ChannelExtension.scala | 0 .../main/scala/akka/channels/ChannelRef.scala | 30 ++ .../main/scala/akka/channels/Channels.scala | 172 +++++++++ .../src/main/scala/akka/channels/Ops.scala | 36 ++ .../main/scala/akka/channels/macros/Ask.scala | 63 ++++ .../scala/akka/channels/macros/Channel.scala | 77 ++++ .../akka/channels/macros/CreateChild.scala | 35 ++ .../scala/akka/channels/macros/Helpers.scala | 119 ++++++ .../scala/akka/channels/macros/Narrow.scala | 24 ++ .../scala/akka/channels/macros/Tell.scala | 90 +++++ .../main/scala/akka/channels/package.scala | 15 + .../main/scala/akka/channels/ChannelRef.scala | 137 ------- .../main/scala/akka/channels/Channels.scala | 357 ------------------ .../main/scala/akka/channels/package.scala | 37 -- project/AkkaBuild.scala | 16 +- 18 files changed, 718 insertions(+), 572 deletions(-) rename {akka-macro-tests => akka-channels-tests}/src/main/scala/akka/makkros/Test.scala (90%) rename {akka-macro-tests => akka-channels-tests}/src/test/scala/akka/channels/ChannelSpec.scala (88%) rename {akka-macros => akka-channels}/src/main/scala/akka/channels/ChannelExtension.scala (100%) create mode 100644 akka-channels/src/main/scala/akka/channels/ChannelRef.scala create mode 100644 akka-channels/src/main/scala/akka/channels/Channels.scala create mode 100644 akka-channels/src/main/scala/akka/channels/Ops.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/Ask.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/Channel.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/Helpers.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/Narrow.scala create mode 100644 akka-channels/src/main/scala/akka/channels/macros/Tell.scala create mode 100644 akka-channels/src/main/scala/akka/channels/package.scala delete mode 100644 akka-macros/src/main/scala/akka/channels/ChannelRef.scala delete mode 100644 akka-macros/src/main/scala/akka/channels/Channels.scala delete mode 100644 akka-macros/src/main/scala/akka/channels/package.scala diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index a7c964b750..dbd8fbe0f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -62,6 +62,19 @@ object ExecutionContexts { * @return a reference to the global ExecutionContext */ def global(): ExecutionContext = ExecutionContext.global + + /** + * WARNING: Not A General Purpose ExecutionContext! + * + * This is an execution context which runs everything on the calling thread. + * It is very useful for actions which are known to be non-blocking and + * non-throwing in order to save a round-trip to the thread pool. + */ + object sameThreadExecutionContext extends ExecutionContext { + override def execute(runnable: Runnable): Unit = runnable.run() + override def reportFailure(t: Throwable): Unit = + throw new IllegalStateException("exception in sameThreadExecutionContext", t) + } } /** diff --git a/akka-macro-tests/src/main/scala/akka/makkros/Test.scala b/akka-channels-tests/src/main/scala/akka/makkros/Test.scala similarity index 90% rename from akka-macro-tests/src/main/scala/akka/makkros/Test.scala rename to akka-channels-tests/src/main/scala/akka/makkros/Test.scala index efb016175f..db241301c8 100644 --- a/akka-macro-tests/src/main/scala/akka/makkros/Test.scala +++ b/akka-channels-tests/src/main/scala/akka/makkros/Test.scala @@ -8,7 +8,7 @@ import scala.tools.reflect.ToolBoxError object Test { - def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-macros/target/classes"): Any = { + def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-channels/target/classes"): Any = { val tb = mkToolbox(compileOptions) tb.eval(tb.parse(code)) } diff --git a/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala similarity index 88% rename from akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala rename to akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala index b334b7d6a8..783f59fb6e 100644 --- a/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala +++ b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala @@ -44,10 +44,10 @@ object ChannelSpec { // used for sender verification in the first two test cases class Tester extends Channels[TNil, (A, C) :+: (B, D) :+: TNil] { - channel[A.type] { (msg, snd) ⇒ snd ! C } - channel[A] { (msg, snd) ⇒ snd ! C1 } + channel[A.type] { (msg, snd) ⇒ snd <-!- C } + channel[A] { (msg, snd) ⇒ snd <-!- C1 } channel[B] { - case (B, s) ⇒ s ! D + case (B, s) ⇒ s <-!- D } } class RecvC(ref: ActorRef) extends Channels[TNil, (C, Nothing) :+: TNil] { @@ -57,20 +57,20 @@ object ChannelSpec { // pos compile test for multiple reply channels class SubChannels extends Channels[TNil, (A, B) :+: (A, C) :+: TNil] { channel[A] { - case (A1, x) ⇒ x ! B - case (_, x) ⇒ x ! C + case (A1, x) ⇒ B -!-> x + case (_, x) ⇒ x <-!- C } } // pos compile test for children class Children extends Channels[TNil, (A, B) :+: (C, Nothing) :+: TNil] { val c = createChild(new Channels[(A, Nothing) :+: TNil, (B, C) :+: TNil] { - channel[B] { case (B, s) ⇒ s ! C } + channel[B] { case (B, s) ⇒ s <-!- C } }) var client: ActorRef = _ channel[A] { - case (A, s) ⇒ c ! B; client = sender + case (A, s) ⇒ c <-!- B; client = sender } channel[C] { case (C, _) ⇒ client ! C @@ -81,9 +81,10 @@ object ChannelSpec { } // compile test for polymorphic actors - class WriteOnly[T <: ChannelList: ru.TypeTag](target: ChannelRef[T]) extends Channels[TNil, (D, D) :+: T] { - channel[D] { (d, snd) ⇒ snd ! d } - channel[T] { x ⇒ target forward x } + class WriteOnly[T1: ru.TypeTag, T2: ru.TypeTag](target: ChannelRef[(T1, T2) :+: TNil]) extends Channels[TNil, (D, D) :+: (T1, T2) :+: TNil] { + channel[D] { (d, snd) ⇒ snd <-!- d } + implicit val t = Timeout(1.second) + channel[T1] { (x, snd) ⇒ x -?-> target } } class MissingChannel extends Channels[TNil, (A, A) :+: (B, B) :+: TNil] { @@ -102,10 +103,10 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "construct refs" in { val ref = ChannelExt(system).actorOf(new Tester) - ref ! A + ref <-!- A expectMsg(C) lastSender must be(ref.actorRef) - ref ! B + ref <-!- B expectMsg(D) lastSender must be(ref.actorRef) } @@ -113,7 +114,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "select return channels" in { val ref = ChannelExt(system).actorOf(new Tester) implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) - ref ! A + ref <-!- A expectMsg(C) lastSender must be(selfChannel.actorRef) } @@ -121,7 +122,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "correctly dispatch to subchannels" in { val ref = ChannelExt(system).actorOf(new Tester) implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) - ref ! A2 + ref <-!- A2 expectMsg(C1) lastSender must be(selfChannel.actorRef) } @@ -131,7 +132,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, eval(""" |import akka.channels._ |import ChannelSpec._ - |new ChannelRef[(A, C) :+: TNil](null) ! B + |implicit val c = new ChannelRef[TNil](null) + |new ChannelRef[(A, C) :+: TNil](null) <-!- B """.stripMargin) }.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type") } @@ -141,7 +143,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, eval(""" |import akka.channels._ |import ChannelSpec._ - |new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) ! C + |implicit val c = new ChannelRef[TNil](null) + |new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) <-!- C """.stripMargin) }.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type") } @@ -152,14 +155,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import akka.channels._ |import ChannelSpec._ |implicit val s = new ChannelRef[(C, D) :+: TNil](null) - |new ChannelRef[(A, B) :+: TNil](null) ! A + |new ChannelRef[(A, B) :+: TNil](null) <-!- A """.stripMargin) }.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B") } "permit any sender for Nothing replies" in { implicit val selfChannel = new ChannelRef[TNil](testActor) - new ChannelRef[(A, Nothing) :+: TNil](testActor) ! A + new ChannelRef[(A, Nothing) :+: TNil](testActor) <-!- A expectMsg(A) } @@ -169,7 +172,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import akka.channels._ |import ChannelSpec._ |implicit val s = new ChannelRef[TNil](null) - |new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) ! A + |new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) <-!- A """.stripMargin) }.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B, akka.channels.ChannelSpec.C") } @@ -180,7 +183,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import akka.channels._ |import ChannelSpec._ |implicit val s = new ChannelRef[(B, B) :+: TNil](null) - |new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) ! A + |new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A """.stripMargin) }.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.C") } @@ -191,7 +194,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import akka.channels._ |import ChannelSpec._ |implicit val s = new ChannelRef[(B, B) :+: (C, B) :+: TNil](null) - |new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) ! A + |new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A """.stripMargin) } def cause(ex: Throwable): Throwable = @@ -220,7 +223,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import ChannelSpec._ |new Channels[TNil, (A, B) :+: (A1.type, C) :+: TNil] { | channel[A] { - | case (A1, x) => x ! C + | case (A1, x) => x <-!- C | } |} """.stripMargin) @@ -253,14 +256,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "have a working selfChannel" in { val ref = ChannelExt(system).actorOf(new Children) - ref ! A + ref <-!- A expectMsg(C) } "have a working parentChannel" in { val parent = ChannelExt(system).actorOf(new Channels[TNil, (A, Nothing) :+: TNil] { createChild(new Channels[(A, Nothing) :+: TNil, TNil] { - parentChannel ! A + parentChannel <-!- A }) channel[A] { (msg, snd) ⇒ testActor ! msg } }) @@ -284,7 +287,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, |import ChannelSpec._ |new Channels[TNil, (A, Nothing) :+: TNil] { | createChild(new Channels[(A, Nothing) :+: TNil, TNil] { - | parentChannel ! B + | parentChannel <-!- B | }) |} """.stripMargin) @@ -343,7 +346,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, implicit val t = Timeout(1.second.dilated) import system.dispatcher val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration) - r ! C + r <-!- C expectMsg(C) } @@ -354,7 +357,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, import system.dispatcher val f = ref.narrow[(D, Nothing) :+: TNil] Await.ready(f, t.duration) - f.value.get must be(Failure(Channels.NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D"))) + f.value.get must be(Failure(NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D"))) } "be equal according to its actor" in { @@ -364,11 +367,11 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "allow wrapping of ChannelRefs with pass-through" in { val target = ChannelExt(system).actorOf(new RecvC(testActor)) - val wrap = ChannelExt(system).actorOf(new WriteOnly(target)) - wrap ! C + val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target)) + wrap <-!- C expectMsg(C) lastSender must be(target.actorRef) - wrap ! D + wrap <-!- D expectMsg(D) lastSender must be(wrap.actorRef) } @@ -376,14 +379,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "support typed ask" in { val t = ChannelExt(system).actorOf(new Tester) implicit val timeout = Timeout(1.second) - val r: Future[C] = t ? A + val r: Future[C] = t <-?- A Await.result(r, 1.second) must be(C) } "support typed ask with multiple reply channels" in { val t = ChannelExt(system).actorOf(new SubChannels) implicit val timeout = Timeout(1.second) - val r: Future[Msg] = t ? A1 + val r: Future[Msg] = t <-?- A1 Await.result(r, 1.second) must be(B) } diff --git a/akka-macros/src/main/scala/akka/channels/ChannelExtension.scala b/akka-channels/src/main/scala/akka/channels/ChannelExtension.scala similarity index 100% rename from akka-macros/src/main/scala/akka/channels/ChannelExtension.scala rename to akka-channels/src/main/scala/akka/channels/ChannelExtension.scala diff --git a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala new file mode 100644 index 0000000000..276371ece7 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.channels + +import language.experimental.{ macros ⇒ makkros } +import akka.actor.ActorRef +import scala.reflect.runtime.universe.TypeTag +import scala.reflect.macros.Context +import scala.annotation.tailrec +import scala.reflect.macros.Universe +import akka.actor.Actor +import akka.actor.ActorContext +import scala.concurrent.Future +import akka.util.Timeout +import akka.AkkaException +import scala.util.control.NoStackTrace + +case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace + +class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { + + def <-!-[M](msg: M): Unit = macro macros.Tell.impl[T, M] + + def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[T, M] + + def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T] + +} diff --git a/akka-channels/src/main/scala/akka/channels/Channels.scala b/akka-channels/src/main/scala/akka/channels/Channels.scala new file mode 100644 index 0000000000..65c9c9040c --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/Channels.scala @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.channels + +import language.experimental.{ macros ⇒ makkros } +import akka.actor.{ Actor, ActorRef } +import scala.reflect.macros.Context +import scala.reflect.runtime.{ universe ⇒ ru } +import scala.reflect.runtime.universe.TypeTag +import scala.reflect.api.Universe +import scala.runtime.AbstractPartialFunction +import akka.actor.Props +import scala.collection.immutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ classTag, ClassTag } +import scala.concurrent.{ ExecutionContext, Future } +import akka.util.Timeout +import akka.pattern.ask +import scala.util.control.NoStackTrace +import akka.AkkaException +import akka.actor.ExtendedActorSystem +import akka.actor.ActorInitializationException + +/** + * Typed channels atop untyped actors. + * + * The idea is that the actor declares all its input types up front, including + * what it expects the sender to handle wrt.replies, and then ChannelRef + * carries this information for statically verifying that messages sent to an + * actor have an actual chance of being processed. + * + * There are several implementation-imposed restrictions: + * + * - not two channels with different input types may have the same erased + * type; this is currently not enforced at compile time (and leads to + * channels being “ignored” at runtime) + * - messages received by the actor are dispatched to channels based on the + * erased type, which may be less precise than the actual channel type; this + * can lead to ClassCastExceptions if sending through the untyped ActorRef + */ +trait Channels[P <: ChannelList, C <: ChannelList] extends Actor { + + import macros.Helpers._ + + /** + * Create a child actor with properly typed ChannelRef, verifying that this + * actor can handle everything which the child tries to send via its + * `parent` ChannelRef. + */ + def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch] + + /** + * Properly typed ChannelRef for the context.parent. + */ + final def parentChannel: ChannelRef[P] = new ChannelRef(context.parent) + + /** + * The properly typed self-channel is used implicitly when sending to other + * typed channels for verifying that replies can be handled. + */ + implicit final def selfChannel = new ChannelRef[C](self) + + /* + * This map holds the current behavior for each erasure-tagged channel; the + * basic receive impl will dispatch incoming messages according to the most + * specific erased type in this map. + */ + private var behavior = Map.empty[Class[_], FF] + + /** + * Functions for storage in the behavior, to get around erasure + */ + private trait FF + private case class F1(f: (WrappedMessage[ChannelList], ChannelRef[ChannelList]) ⇒ Unit) extends FF + private case class F2(f: (Any, ChannelRef[ChannelList]) ⇒ Unit) extends FF + + /** + * Declare an input channel of the given type; the returned object takes a partial function: + * + * {{{ + * channel[A] { + * case (a, s) => + * // a is of type A and + * // s is a ChannelRef for the sender, capable of sending the declared reply type for A + * } + * }}} + */ + def channel[T]: Channels[P, C]#Behaviorist[Nothing, T] = macro macros.Channel.impl[T, C, P] + + class Behaviorist[-R, Ch](tt: ru.TypeTag[Ch], wrapped: Boolean) { + private def ff(recv: R): FF = + if (wrapped) + F1(recv.asInstanceOf[(WrappedMessage[ChannelList], ChannelRef[ChannelList]) ⇒ Unit]) + else + F2(recv.asInstanceOf[(Any, ChannelRef[ChannelList]) ⇒ Unit]) + def apply(recv: R): Unit = + behavior ++= (for (t ← inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> ff(recv)) + } + + /* + * HORRIBLE HACK AHEAD + * + * I’d like to keep this a trait, but traits cannot have constructor + * arguments, not even TypeTags. + */ + protected var channelListTypeTag: TypeTag[C] = _ + + /** + * Sort so that subtypes always precede their supertypes, but without + * obeying any order between unrelated subtypes (insert sort). + */ + private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] = + (new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls) ⇒ + buf.indexWhere(_ isAssignableFrom cls) match { + case -1 ⇒ buf append cls + case x ⇒ buf insert (x, cls) + } + buf + }.to[immutable.IndexedSeq] + + final lazy val receive = new AbstractPartialFunction[Any, Unit] { + + val index = sortClasses(behavior.keys) + + if (channelListTypeTag != null) verifyCompleteness() + + private def verifyCompleteness() { + val channels = inputChannels(ru)(channelListTypeTag.tpe) + val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen)) + val missing = classes.keySet -- behavior.keySet + if (missing.nonEmpty) { + val m = missing.map(classes).flatten + throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}") + } + } + + override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match { + case CheckType(tt) ⇒ + narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match { + case Nil ⇒ sender ! CheckTypeACK + case err :: Nil ⇒ sender ! CheckTypeNAK(err) + case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", "")) + } + case _ ⇒ + val msgClass = x.getClass + index find (_ isAssignableFrom msgClass) match { + case None ⇒ default(x) + case Some(cls) ⇒ + behavior(cls) match { + case F1(f) ⇒ f(new WrappedMessage[ChannelList](x), new ChannelRef(sender)) + case F2(f) ⇒ f(x, new ChannelRef(sender)) + } + } + } + + def isDefinedAt(x: Any): Boolean = x match { + case c: CheckType[_] ⇒ true + case _ ⇒ + val msgClass = x.getClass + index find (_ isAssignableFrom msgClass) match { + case None ⇒ false + case Some(cls) ⇒ true + } + } + } +} + +object Channels { + +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/Ops.scala b/akka-channels/src/main/scala/akka/channels/Ops.scala new file mode 100644 index 0000000000..21ad45eec9 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/Ops.scala @@ -0,0 +1,36 @@ +package akka.channels + +import language.experimental.{ macros ⇒ makkros } +import akka.actor.ActorRef +import akka.util.Timeout +import akka.pattern.ask +import scala.concurrent.{ ExecutionContext, Future } +import scala.reflect.runtime.{ universe ⇒ ru } +import scala.util.Success + +sealed trait ChannelList +sealed trait TNil extends ChannelList +sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList + +class ActorRefOps(val ref: ActorRef) extends AnyVal { + import macros.Helpers._ + def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = { + import Channels._ + ref ? CheckType(tt) map { + case CheckTypeACK ⇒ new ChannelRef[C](ref) + case CheckTypeNAK(error) ⇒ throw NarrowingException(error) + } + } +} + +class FutureOps[T](val future: Future[T]) extends AnyVal { + def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureImpl[C, T] + def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[C, T] +} + +class AnyOps[T](val value: T) extends AnyVal { + def -!->[C <: ChannelList](channel: ChannelRef[C]): Unit = macro macros.Tell.opsImpl[C, T] + def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[C, T] +} + +class WrappedMessage[T <: ChannelList](val value: Any) extends AnyVal diff --git a/akka-channels/src/main/scala/akka/channels/macros/Ask.scala b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala new file mode 100644 index 0000000000..2c31f65ae4 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala @@ -0,0 +1,63 @@ +package akka.channels.macros + +import akka.channels._ +import scala.concurrent.Future +import akka.util.Timeout +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe +import akka.actor.ActorRef +import akka.dispatch.ExecutionContexts + +object Ask { + import Helpers._ + + def impl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = ChannelRef[T] + })(msg: c.Expr[M]): c.Expr[Future[_]] = { + import c.universe._ + askTree(c)(weakTypeOf[M], weakTypeOf[T])(reify(c.prefix.splice.actorRef), msg) + } + + def opsImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = AnyOps[M] + })(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[_]] = { + import c.universe._ + askTree(c)(weakTypeOf[M], weakTypeOf[T])(reify(channel.splice.actorRef), reify(c.prefix.splice.value)) + } + + def futureImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = FutureOps[M] + })(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[_]] = { + import c.universe._ + val tree = askTree(c)(weakTypeOf[M], weakTypeOf[T])(c.Expr(Ident("c$1")), c.Expr(Ident("x$1"))) + reify({ + val c$1 = channel.splice.actorRef + c.prefix.splice.future.flatMap(x$1 ⇒ tree.splice)(ExecutionContexts.sameThreadExecutionContext) + }) + } + + def askTree[M](c: Context with Singleton)(msgT: c.universe.Type, chT: c.universe.Type)(target: c.Expr[ActorRef], msg: c.Expr[M]): c.Expr[Future[_]] = { + import c.universe._ + val out = replyChannels(c.universe)(chT, msgT) + if (out.isEmpty) { + c.error(c.enclosingPosition, s"This ChannelRef does not support messages of type $msgT") + reify(null) + } else { + val timeout = c.inferImplicitValue(typeOf[Timeout]) + if (timeout.isEmpty) + c.error(c.enclosingPosition, s"no implicit akka.util.Timeout found") + val result = appliedType(weakTypeOf[Future[_]].typeConstructor, List(lub(out))) + c.Expr( + TypeApply( + Select( + reify(akka.pattern.ask( + target.splice, msg.splice)( + c.Expr(timeout)(weakTypeTag[Timeout]).splice)).tree, + "asInstanceOf"), + List(TypeTree().setType(result))))(weakTypeTag[Future[_]]) + } + } + +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/macros/Channel.scala b/akka-channels/src/main/scala/akka/channels/macros/Channel.scala new file mode 100644 index 0000000000..f5234778de --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/Channel.scala @@ -0,0 +1,77 @@ +package akka.channels.macros + +import akka.channels._ +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe + +object Channel { + import Helpers._ + + /** + * This macro transforms a channel[] call which returns “some” Behaviorist + * into a _channel[] call with precise reply channel descriptors, so that the + * partial function it is applied to can enjoy proper type checking. + * + * T is the message type + * C is the channel list of the enclosing Channels + * P is the parent channel list + */ + def impl[T: c.WeakTypeTag, C <: ChannelList: c.WeakTypeTag, P <: ChannelList: c.WeakTypeTag]( + c: Context { + type PrefixType = Channels[P, C] + }): c.Expr[Channels[P, C]#Behaviorist[Nothing, T]] = { + + val tT = c.weakTypeOf[T] + val tC = c.weakTypeOf[C] + + import c.universe._ + + val undefined = missingChannels(c.universe)(tC, inputChannels(c.universe)(tT)) + if (undefined.nonEmpty) { + c.error(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}") + reify(null) + } else { + checkUnique(c.universe)(tT, tC) foreach (c.error(c.enclosingPosition, _)) + val channels = toChannels(c.universe)(replyChannels(c.universe)(tC, tT)) + val (receive, wrapped) = + if (tT <:< typeOf[ChannelList]) { + appliedType(typeOf[Function2[_, _, _]].typeConstructor, List( + appliedType(typeOf[WrappedMessage[_]].typeConstructor, List(tT)), + appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)), + typeOf[Unit])) -> true + } else { + appliedType(typeOf[Function2[_, _, _]].typeConstructor, List( + tT, + appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)), + typeOf[Unit])) -> false + } + c.Expr( + Block(List( + If( + { + val cltt = c.Expr(Select(c.prefix.tree, "channelListTypeTag")) + reify(cltt.splice == null).tree + }, + Apply( + Select(c.prefix.tree, "channelListTypeTag_$eq"), + List(TypeApply( + Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"), + List(TypeTree().setType(c.weakTypeOf[C]))))), + c.literalUnit.tree)), + Apply( + Select( + New(AppliedTypeTree(Select(c.prefix.tree, newTypeName("Behaviorist")), List( + TypeTree().setType(receive), + TypeTree().setType(tT)))), + nme.CONSTRUCTOR), + List( + TypeApply( + Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"), + List(TypeTree().setType(tT))), + Literal(Constant(wrapped)))))) + } + } + +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala b/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala new file mode 100644 index 0000000000..94b3ba8abc --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/CreateChild.scala @@ -0,0 +1,35 @@ +package akka.channels.macros + +import akka.channels._ +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe +import akka.actor.Props + +object CreateChild { + import Helpers._ + + def impl[C <: ChannelList: c.WeakTypeTag, Pa <: ChannelList: c.WeakTypeTag, Ch <: ChannelList: c.WeakTypeTag]( + c: Context { + type PrefixType = Channels[_, C] + })(factory: c.Expr[Channels[Pa, Ch]]): c.Expr[ChannelRef[Ch]] = { + + import c.universe._ + if (weakTypeOf[Pa] =:= weakTypeOf[Nothing]) { + c.abort(c.enclosingPosition, "Parent argument must not be Nothing") + } + if (weakTypeOf[Ch] =:= weakTypeOf[Nothing]) { + c.abort(c.enclosingPosition, "channel list must not be Nothing") + } + val missing = missingChannels(c.universe)(weakTypeOf[C], inputChannels(c.universe)(weakTypeOf[Pa])) + if (missing.isEmpty) { + implicit val t = c.TypeTag[Ch](c.weakTypeOf[Ch]) + reify(new ChannelRef[Ch](c.prefix.splice.context.actorOf(Props(factory.splice)))) + } else { + c.error(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}") + reify(???) + } + } + +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala b/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala new file mode 100644 index 0000000000..c72bd53392 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala @@ -0,0 +1,119 @@ +package akka.channels.macros + +import akka.AkkaException +import scala.util.control.NoStackTrace +import akka.channels._ +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe + +object Helpers { + + type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit] + + case class CheckType[T](tt: TypeTag[T]) + case object CheckTypeACK + case class CheckTypeNAK(errors: String) + + def error(c: Context, msg: String) = c.error(c.enclosingPosition, msg) + def abort(c: Context, msg: String) = c.abort(c.enclosingPosition, msg) + + def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = { + val channels = inputChannels(u)(list) groupBy (_.erasure) + val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel) + if (dupes.isEmpty) None + else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}") + } + + /** + * check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings + */ + def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = { + var errors = List.empty[String] + for (in ← inputChannels(u)(target)) { + val replies = replyChannels(u)(orig, in) + if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in" + else { + val targetReplies = replyChannels(u)(target, in) + val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _)) + if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in" + val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t)) + if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in" + } + } + errors.reverse + } + + /** + * get all input channels from a ChannelList or return the given type + */ + final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = { + import u._ + val imp = u.mkImporter(ru) + val cl = imp.importType(ru.typeOf[ChannelList]) + val tnil = imp.importType(ru.typeOf[TNil]) + def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match { + case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc) + case last ⇒ if (last =:= tnil) acc.reverse else (last :: acc).reverse + } + if (list <:< cl) rec(list, Nil) + else List(list) + } + + /** + * find all input channels matching the given message type and return a + * list of their respective reply channels + */ + final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = { + import u._ + def rec(l: Type, acc: List[Type]): List[Type] = { + l match { + case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msg <:< in ⇒ + rec(tail, if (acc contains out) acc else out :: acc) + case TypeRef(_, _, _ :: tail :: Nil) ⇒ + rec(tail, acc) + case _ ⇒ acc.reverse + } + } + val n = typeOf[Nothing] + if (msg =:= n) List(n) else rec(list, Nil) + } + + /** + * filter from the `required` list of types all which are subtypes of inputs of the ChannelList + */ + final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = { + import u._ + // making the top-level method recursive blows up the compiler (when compiling the macro itself) + def rec(ch: Type, req: List[Type]): List[Type] = { + ch match { + case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, req filterNot (_ <:< in)) + case last ⇒ req filterNot (_ <:< last) + } + } + rec(channels, required) + } + + /** + * convert a list of types List(, , ...) into a ChannelList + * ( Channel[, Nothing] :=: Channel[, Nothing] :=: ... :=: TNil ) + */ + final def toChannels(u: Universe)(list: List[u.Type]): u.Type = { + import u._ + def rec(l: List[Type], acc: Type): Type = l match { + case head :: (tail: List[Type]) ⇒ + if (head =:= weakTypeOf[Nothing]) rec(tail, acc) + else + rec(tail, + appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List( + appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List( + head, + weakTypeOf[Nothing])), + acc))) + case _ ⇒ acc + } + rec(list.reverse, weakTypeOf[TNil]) + } + +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/macros/Narrow.scala b/akka-channels/src/main/scala/akka/channels/macros/Narrow.scala new file mode 100644 index 0000000000..2f9f6d3db5 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/Narrow.scala @@ -0,0 +1,24 @@ +package akka.channels.macros + +import akka.channels._ +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe + +object Narrow { + import Helpers._ + + def impl[C <: ChannelList: c.WeakTypeTag, T <: ChannelList: c.WeakTypeTag]( + c: Context { + type PrefixType = ChannelRef[T] + }): c.Expr[ChannelRef[C]] = { + import c.{ universe ⇒ u } + narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match { + case Nil ⇒ // okay + case err :: Nil ⇒ c.error(c.enclosingPosition, err) + case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", "")) + } + u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]]) + } +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/macros/Tell.scala b/akka-channels/src/main/scala/akka/channels/macros/Tell.scala new file mode 100644 index 0000000000..ce0d56b134 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/macros/Tell.scala @@ -0,0 +1,90 @@ +package akka.channels.macros + +import akka.channels._ +import akka.actor._ +import scala.reflect.runtime.{ universe ⇒ ru } +import ru.TypeTag +import scala.reflect.macros.Context +import scala.reflect.api.Universe +import scala.annotation.tailrec +import scala.concurrent.Future +import scala.util.{ Failure, Success } +import akka.dispatch.ExecutionContexts + +object Tell { + import Helpers._ + + def impl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = ChannelRef[T] + })(msg: c.Expr[M]): c.Expr[Unit] = { + val tT = c.universe.weakTypeOf[T] + val (tS, senderTree, sender) = getSenderChannel(c) + + verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT) + + c.universe.reify(c.prefix.splice.actorRef.tell(msg.splice, sender.splice)) + } + + def opsImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = AnyOps[M] + })(channel: c.Expr[ChannelRef[T]]): c.Expr[Unit] = { + val tT = c.universe.weakTypeOf[T] + val (tS, senderTree, sender) = getSenderChannel(c) + + verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT) + + c.universe.reify(channel.splice.actorRef.tell(c.prefix.splice.value, sender.splice)) + } + + def futureImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { + type PrefixType = FutureOps[M] + })(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[M]] = { + val tT = c.universe.weakTypeOf[T] + val (tS, senderTree, sender) = getSenderChannel(c) + + verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT) + + c.universe.reify( + { + val s$1 = sender.splice + c.prefix.splice.future.andThen { + case Success(s) ⇒ channel.splice.actorRef.tell(s, s$1) + case _ ⇒ + }(ExecutionContexts.sameThreadExecutionContext) + }) + } + + def getSenderChannel(c: Context): (c.universe.Type, c.Tree, c.Expr[ActorRef]) = { + val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]]) + if (!replyChannel.isEmpty) { + import c.universe._ + replyChannel.tpe match { + case TypeRef(_, _, param :: Nil) ⇒ + (param, replyChannel, c.Expr(Select(replyChannel, "actorRef"))(c.universe.weakTypeTag[ActorRef])) + } + } else abort(c, "no implicit sender ChannelRef found") + } + + def verify(c: Context)(sender: c.universe.Tree, msgT: c.universe.Type, sndT: c.universe.Type, chT: c.universe.Type)(): Unit = { + def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit = + if (msg.nonEmpty) { + val u: c.universe.type = c.universe + val replies = msg map (m ⇒ m -> replyChannels(u)(chT, m)) + val missing = replies collect { case (k, v) if v.size == 0 ⇒ k } + if (missing.nonEmpty) + error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)") + else { + val nextSend = replies.map(_._2).flatten map (m ⇒ m -> replyChannels(u)(sndT, m)) + val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k } + if (nextMissing.nonEmpty) + error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)") + else { + val nextChecked = checked ++ msg + val nextMsg = nextSend.map(_._2).flatten -- nextChecked + rec(nextMsg, nextChecked, depth + 1) + } + } + } + rec(Set(msgT), Set(c.universe.typeOf[Nothing]), 1) + } +} \ No newline at end of file diff --git a/akka-channels/src/main/scala/akka/channels/package.scala b/akka-channels/src/main/scala/akka/channels/package.scala new file mode 100644 index 0000000000..0b0a53de24 --- /dev/null +++ b/akka-channels/src/main/scala/akka/channels/package.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka + +import language.implicitConversions +import akka.actor.ActorRef +import scala.concurrent.Future + +package object channels { + implicit def actorRefOps(ref: ActorRef) = new ActorRefOps(ref) + implicit def futureOps[T](f: Future[T]) = new FutureOps(f) + implicit def anyOps[T](x: T) = new AnyOps(x) +} diff --git a/akka-macros/src/main/scala/akka/channels/ChannelRef.scala b/akka-macros/src/main/scala/akka/channels/ChannelRef.scala deleted file mode 100644 index aa78dc0c04..0000000000 --- a/akka-macros/src/main/scala/akka/channels/ChannelRef.scala +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.channels - -import language.experimental.macros -import akka.actor.ActorRef -import scala.reflect.runtime.universe.TypeTag -import scala.reflect.macros.Context -import scala.annotation.tailrec -import scala.reflect.macros.Universe -import akka.actor.Actor -import akka.actor.ActorContext -import scala.concurrent.Future -import akka.util.Timeout - -class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { - - def ![M](msg: M): Unit = macro ChannelRef.tell[T, M] - - def ?[M](msg: M): Future[_] = macro ChannelRef.ask[T, M] - - def forward[M](msg: M): Unit = macro ChannelRef.forward[T, M] - - def narrow[C <: ChannelList]: ChannelRef[C] = macro ChannelRef.narrowImpl[C, T] - -} - -object ChannelRef { - import Channels._ - - def tell[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { - type PrefixType = ChannelRef[T] - })(msg: c.Expr[M]): c.Expr[Unit] = { - import c.{ universe ⇒ u } - - def getSenderChannel = { - val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]]) - if (!replyChannel.isEmpty) { - import u._ - replyChannel.tpe match { - case TypeRef(_, _, param :: Nil) ⇒ - Some((param, replyChannel, c.Expr(Select(replyChannel, "actorRef"))(u.weakTypeTag[ActorRef]))) - } - } else None - } - - def getSenderRef = { - val senderTree = c.inferImplicitValue(c.typeOf[ActorRef]) - if (senderTree.isEmpty) { - val noSender = c.universe.reify(Actor.noSender) - ((u.typeOf[(Any, Nothing) :+: TNil], noSender.tree, noSender)) - } else ((u.typeOf[(Any, Any) :+: TNil], senderTree, c.Expr(senderTree)(c.WeakTypeTag(senderTree.tpe)))) - } - - val tT = u.weakTypeOf[T] - val (tS, senderTree, sender) = getSenderChannel getOrElse getSenderRef - - def err(msg: String) = c.error(c.enclosingPosition, msg) - - def verify(msg: Set[u.Type], checked: Set[u.Type], depth: Int): Unit = if (msg.nonEmpty) { - val replies = msg map (m ⇒ m -> replyChannels(u)(tT, m)) - val missing = replies collect { case (k, v) if v.size == 0 ⇒ k } - if (missing.nonEmpty) - err(s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)") - else { - val nextSend = replies.map(_._2).flatten map (m ⇒ m -> replyChannels(u)(tS, m)) - val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k } - if (nextMissing.nonEmpty) - err(s"implicit sender `$senderTree` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)") - else { - val nextChecked = checked ++ msg - val nextMsg = nextSend.map(_._2).flatten -- nextChecked - verify(nextMsg, nextChecked, depth + 1) - } - } - } - - verify(Set(u.weakTypeOf[M]), Set(u.typeOf[Nothing]), 1) - u.reify(c.prefix.splice.actorRef.tell(msg.splice, sender.splice)) - } - - def ask[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { - type PrefixType = ChannelRef[T] - })(msg: c.Expr[M]): c.Expr[Future[_]] = { - import c.universe._ - - val out = replyChannels(c.universe)(weakTypeOf[T], weakTypeOf[M]) - if (out.isEmpty) { - c.error(c.enclosingPosition, s"This ChannelRef does not support messages of type ${weakTypeOf[M]}") - reify(null) - } else { - val timeout = c.inferImplicitValue(typeOf[Timeout]) - if (timeout.isEmpty) - c.error(c.enclosingPosition, s"no implicit akka.util.Timeout found") - val result = appliedType(weakTypeOf[Future[_]].typeConstructor, List(lub(out))) - c.Expr( - TypeApply( - Select( - reify(akka.pattern.ask( - c.prefix.splice.actorRef, msg.splice)( - c.Expr(timeout)(weakTypeTag[Timeout]).splice)).tree, - "asInstanceOf"), - List(TypeTree().setType(result)))) - } - } - - def forward[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context { - type PrefixType = ChannelRef[T] - })(msg: c.Expr[M]): c.Expr[Unit] = { - import c.universe._ - if (weakTypeOf[M] =:= weakTypeOf[Values.WrappedMessage[T]]) { - reify( - c.prefix.splice.actorRef.forward( - msg.splice.asInstanceOf[Values.WrappedMessage[_]].value)( - c.Expr(Ident("implicitly"))(weakTypeTag[ActorContext]).splice)) - } else { - c.error(c.enclosingPosition, s"cannot forward message unless types match exactly") - reify(()) - } - } - - def narrowImpl[C <: ChannelList: c.WeakTypeTag, T <: ChannelList: c.WeakTypeTag]( - c: Context { - type PrefixType = ChannelRef[T] - }): c.Expr[ChannelRef[C]] = { - import c.{ universe ⇒ u } - narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match { - case Nil ⇒ // okay - case err :: Nil ⇒ c.error(c.enclosingPosition, err) - case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", "")) - } - u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]]) - } - -} \ No newline at end of file diff --git a/akka-macros/src/main/scala/akka/channels/Channels.scala b/akka-macros/src/main/scala/akka/channels/Channels.scala deleted file mode 100644 index 5c3b979ae3..0000000000 --- a/akka-macros/src/main/scala/akka/channels/Channels.scala +++ /dev/null @@ -1,357 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.channels - -import language.experimental.macros -import akka.actor.{ Actor, ActorRef } -import scala.reflect.macros.Context -import scala.reflect.runtime.{ universe ⇒ ru } -import scala.reflect.runtime.universe.TypeTag -import scala.reflect.api.Universe -import scala.runtime.AbstractPartialFunction -import akka.actor.Props -import scala.collection.immutable -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{ classTag, ClassTag } -import scala.concurrent.{ ExecutionContext, Future } -import akka.util.Timeout -import akka.pattern.ask -import scala.util.control.NoStackTrace -import akka.AkkaException -import akka.actor.ExtendedActorSystem -import akka.actor.ActorInitializationException - -/** - * Typed channels atop untyped actors. - * - * The idea is that the actor declares all its input types up front, including - * what it expects the sender to handle wrt.replies, and then ChannelRef - * carries this information for statically verifying that messages sent to an - * actor have an actual chance of being processed. - * - * There are several implementation-imposed restrictions: - * - * - not two channels with different input types may have the same erased - * type; this is currently not enforced at compile time (and leads to - * channels being “ignored” at runtime) - * - messages received by the actor are dispatched to channels based on the - * erased type, which may be less precise than the actual channel type; this - * can lead to ClassCastExceptions if sending through the untyped ActorRef - */ -trait Channels[P <: ChannelList, C <: ChannelList] extends Actor { - - import Channels._ - - /** - * Create a child actor with properly typed ChannelRef, verifying that this - * actor can handle everything which the child tries to send via its - * `parent` ChannelRef. - */ - def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Channels[Pa, Ch]): ChannelRef[Ch] = macro createChildImpl[C, Pa, Ch] - - /** - * Properly typed ChannelRef for the context.parent. - */ - def parentChannel: ChannelRef[P] = new ChannelRef(context.parent) - - /** - * The properly typed self-channel is used implicitly when sending to other - * typed channels for verifying that replies can be handled. - */ - implicit def selfChannel = new ChannelRef[C](self) - - /* - * This map holds the current behavior for each erasure-tagged channel; the - * basic receive impl will dispatch incoming messages according to the most - * specific erased type in this map. - */ - private var behavior = Map.empty[Class[_], FF] - - private trait FF - private object FF { - def apply(x: Any): FF = x match { - case f: Function1[_, _] ⇒ F1(f.asInstanceOf[Values.WrappedMessage[ChannelList] ⇒ Unit]) - case f: Function2[_, _, _] ⇒ F2(f.asInstanceOf[(Any, ChannelRef[ChannelList]) ⇒ Unit]) - } - } - private case class F1(f: Values.WrappedMessage[ChannelList] ⇒ Unit) extends FF - private case class F2(f: (Any, ChannelRef[ChannelList]) ⇒ Unit) extends FF - - /** - * Declare an input channel of the given type; the returned object takes a partial function: - * - * {{{ - * channel[A] { - * case (a, s) => - * // a is of type A and - * // s is a ChannelRef for the sender, capable of sending the declared reply type for A - * } - * }}} - */ - def channel[T]: Channels[P, C]#Behaviorist[Nothing, T] = macro channelImpl[T, C, P] - - new Behaviorist(null) - - protected class Behaviorist[-R, Ch](tt: ru.TypeTag[Ch]) { - def apply(recv: R): Unit = - behavior ++= (for (t ← inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> FF(recv)) - } - - /* - * HORRIBLE HACK AHEAD - * - * I’d like to keep this a trait, but traits cannot have constructor - * arguments, not even TypeTags. - */ - protected var channelListTypeTag: TypeTag[C] = _ - - /** - * Sort so that subtypes always precede their supertypes, but without - * obeying any order between unrelated subtypes (insert sort). - */ - private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] = - (new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls) ⇒ - buf.indexWhere(_ isAssignableFrom cls) match { - case -1 ⇒ buf append cls - case x ⇒ buf insert (x, cls) - } - buf - }.to[immutable.IndexedSeq] - - final lazy val receive = new AbstractPartialFunction[Any, Unit] { - - val index = sortClasses(behavior.keys) - - if (channelListTypeTag != null) verifyCompleteness() - - private def verifyCompleteness() { - val channels = inputChannels(ru)(channelListTypeTag.tpe) - val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen)) - val missing = classes.keySet -- behavior.keySet - if (missing.nonEmpty) { - val m = missing.map(classes).flatten - throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}") - } - } - - override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match { - case CheckType(tt) ⇒ - narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match { - case Nil ⇒ sender ! CheckTypeACK - case err :: Nil ⇒ sender ! CheckTypeNAK(err) - case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", "")) - } - case _ ⇒ - val msgClass = x.getClass - index find (_ isAssignableFrom msgClass) match { - case None ⇒ default(x) - case Some(cls) ⇒ - behavior(cls) match { - case F1(f) ⇒ f(new Values.WrappedMessage[ChannelList](x)) - case F2(f) ⇒ f(x, new ChannelRef(sender)) - } - } - } - - def isDefinedAt(x: Any): Boolean = x match { - case c: CheckType[_] ⇒ true - case _ ⇒ - val msgClass = x.getClass - index find (_ isAssignableFrom msgClass) match { - case None ⇒ false - case Some(cls) ⇒ true - } - } - } -} - -object Channels { - - type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit] - - case class CheckType[T](tt: TypeTag[T]) - case object CheckTypeACK - case class CheckTypeNAK(errors: String) - case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace - - /** - * This macro transforms a channel[] call which returns “some” Behaviorist - * into a _channel[] call with precise reply channel descriptors, so that the - * partial function it is applied to can enjoy proper type checking. - * - * T is the message type - * C is the channel list of the enclosing Channels - * P is the parent channel list - */ - def channelImpl[T: c.WeakTypeTag, C <: ChannelList: c.WeakTypeTag, P <: ChannelList: c.WeakTypeTag]( - c: Context { - type PrefixType = Channels[P, C] - }): c.Expr[Channels[P, C]#Behaviorist[Nothing, T]] = { - - val tT = c.weakTypeOf[T] - val tC = c.weakTypeOf[C] - - import c.universe._ - - val undefined = missingChannels(c.universe)(tC, inputChannels(c.universe)(tT)) - if (undefined.nonEmpty) { - c.error(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}") - reify(null) - } else { - checkUnique(c.universe)(tT, tC) foreach (c.error(c.enclosingPosition, _)) - val receive = - if (tT <:< typeOf[ChannelList]) { - appliedType(typeOf[Function1[_, _]].typeConstructor, List( - appliedType(typeOf[Values.WrappedMessage[_]].typeConstructor, List(tT)), - typeOf[Unit])) - } else { - val channels = toChannels(c.universe)(replyChannels(c.universe)(tC, tT)) - appliedType(typeOf[Function2[_, _, _]].typeConstructor, List( - tT, - appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)), - typeOf[Unit])) - } - c.Expr( - Apply( - Select( - New(AppliedTypeTree(Select(c.prefix.tree, newTypeName("Behaviorist")), List( - TypeTree().setType(receive), - TypeTree().setType(tT)))), - nme.CONSTRUCTOR), - List( - Block(List( - If(reify(c.prefix.splice.channelListTypeTag == null).tree, - Apply( - Select(c.prefix.tree, "channelListTypeTag_$eq"), - List(TypeApply( - Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"), - List(TypeTree().setType(c.weakTypeOf[C]))))), - c.literalUnit.tree)), - TypeApply( - Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"), - List(TypeTree().setType(tT))))))) - } - } - - def createChildImpl[C <: ChannelList: c.WeakTypeTag, Pa <: ChannelList: c.WeakTypeTag, Ch <: ChannelList: c.WeakTypeTag]( - c: Context { - type PrefixType = Channels[_, C] - })(factory: c.Expr[Channels[Pa, Ch]]): c.Expr[ChannelRef[Ch]] = { - - import c.universe._ - if (weakTypeOf[Pa] =:= weakTypeOf[Nothing]) { - c.abort(c.enclosingPosition, "Parent argument must not be Nothing") - } - if (weakTypeOf[Ch] =:= weakTypeOf[Nothing]) { - c.abort(c.enclosingPosition, "channel list must not be Nothing") - } - val missing = missingChannels(c.universe)(weakTypeOf[C], inputChannels(c.universe)(weakTypeOf[Pa])) - if (missing.isEmpty) { - implicit val t = c.TypeTag[Ch](c.weakTypeOf[Ch]) - reify(new ChannelRef[Ch](c.prefix.splice.context.actorOf(Props(factory.splice)))) - } else { - c.error(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}") - reify(???) - } - } - - def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = { - val channels = inputChannels(u)(list) groupBy (_.erasure) - val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel) - if (dupes.isEmpty) None - else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}") - } - - /** - * check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings - */ - def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = { - var errors = List.empty[String] - for (in ← inputChannels(u)(target)) { - val replies = replyChannels(u)(orig, in) - if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in" - else { - val targetReplies = replyChannels(u)(target, in) - val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _)) - if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in" - val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t)) - if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in" - } - } - errors.reverse - } - - /** - * get all input channels from a ChannelList or return the given type - */ - final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = { - import u._ - val imp = u.mkImporter(ru) - val cl = imp.importType(ru.typeOf[ChannelList]) - val tnil = imp.importType(ru.typeOf[TNil]) - def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match { - case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc) - case last ⇒ if (last =:= tnil) acc.reverse else (last :: acc).reverse - } - if (list <:< cl) rec(list, Nil) - else List(list) - } - - /** - * find all input channels matching the given message type and return a - * list of their respective reply channels - */ - final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = { - import u._ - def rec(l: Type, acc: List[Type]): List[Type] = { - l match { - case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msg <:< in ⇒ - rec(tail, if (acc contains out) acc else out :: acc) - case TypeRef(_, _, _ :: tail :: Nil) ⇒ - rec(tail, acc) - case _ ⇒ acc.reverse - } - } - val n = typeOf[Nothing] - if (msg =:= n) List(n) else rec(list, Nil) - } - - /** - * filter from the `required` list of types all which are subtypes of inputs of the ChannelList - */ - final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = { - import u._ - // making the top-level method recursive blows up the compiler (when compiling the macro itself) - def rec(ch: Type, req: List[Type]): List[Type] = { - ch match { - case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, req filterNot (_ <:< in)) - case last ⇒ req filterNot (_ <:< last) - } - } - rec(channels, required) - } - - /** - * convert a list of types List(, , ...) into a ChannelList - * ( Channel[, Nothing] :=: Channel[, Nothing] :=: ... :=: TNil ) - */ - final def toChannels(u: Universe)(list: List[u.Type]): u.Type = { - import u._ - def rec(l: List[Type], acc: Type): Type = l match { - case head :: (tail: List[Type]) ⇒ - if (head =:= weakTypeOf[Nothing]) rec(tail, acc) - else - rec(tail, - appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List( - appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List( - head, - weakTypeOf[Nothing])), - acc))) - case _ ⇒ acc - } - rec(list.reverse, weakTypeOf[TNil]) - } - -} \ No newline at end of file diff --git a/akka-macros/src/main/scala/akka/channels/package.scala b/akka-macros/src/main/scala/akka/channels/package.scala deleted file mode 100644 index bcd3930d8d..0000000000 --- a/akka-macros/src/main/scala/akka/channels/package.scala +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka - -import language.implicitConversions -import akka.actor.ActorRef - -package object channels { - implicit def actorRef2Ops(ref: ActorRef) = new Values.ActorRefOps(ref) -} - -package channels { - import akka.util.Timeout - import akka.pattern.ask - import scala.concurrent.{ ExecutionContext, Future } - import scala.reflect.runtime.{ universe ⇒ ru } - - sealed trait ChannelList - sealed trait TNil extends ChannelList - sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList - - object Values { - class ActorRefOps(val ref: ActorRef) extends AnyVal { - def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = { - import Channels._ - ref ? CheckType(tt) map { - case CheckTypeACK ⇒ new ChannelRef[C](ref) - case CheckTypeNAK(error) ⇒ throw NarrowingException(error) - } - } - } - - class WrappedMessage[T <: ChannelList](val value: Any) extends AnyVal - } -} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index b5d81b16e3..c3e5c3fc38 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -73,7 +73,7 @@ object AkkaBuild extends Build { generatedPdf in Sphinx <<= generatedPdf in Sphinx in LocalProject(docs.id) map identity ), - aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, macros, macroTests) + aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests) ) lazy val actor = Project( @@ -409,19 +409,19 @@ object AkkaBuild extends Build { ) ) configs (MultiJvm) - lazy val macros = Project( - id = "akka-macros", - base = file("akka-macros"), + lazy val channels = Project( + id = "akka-channels", + base = file("akka-channels"), dependencies = Seq(actor), settings = defaultSettings ++ Seq( libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _) ) ) - lazy val macroTests = Project( - id = "akka-macro-tests", - base = file("akka-macro-tests"), - dependencies = Seq(macros, testkit % "compile;test->test"), + lazy val channelsTests = Project( + id = "akka-channels-tests", + base = file("akka-channels-tests"), + dependencies = Seq(channels, testkit % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-compiler" % _) )