From da10291cdd7a4b3cb145faec6d8aafb6d15b0ed9 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 3 Jan 2013 23:36:44 +0100 Subject: [PATCH] add narrowing from ActorRef to ChannelRef --- .../scala/akka/channels/ChannelSpec.scala | 37 +++++- .../akka/channels/ChannelExtension.scala | 4 + .../main/scala/akka/channels/ChannelRef.scala | 14 +-- .../main/scala/akka/channels/Channels.scala | 113 ++++++++++++++---- 4 files changed, 128 insertions(+), 40 deletions(-) diff --git a/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala b/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala index 61901bbe4f..0049d612d2 100644 --- a/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala +++ b/akka-macro-tests/src/test/scala/akka/channels/ChannelSpec.scala @@ -4,11 +4,14 @@ package akka.channels -import akka.testkit.AkkaSpec -import akka.testkit.ImplicitSender +import akka.testkit._ import akka.actor.ActorRef import akka.makkros.Test._ import scala.tools.reflect.ToolBoxError +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.util.Failure object ChannelSpec { @@ -64,8 +67,8 @@ object ChannelSpec { case (C, _) ⇒ client ! C } - createChild(new Channels[(C, Nothing) :+: TNil, TNil]) - createChild(new Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil]) + createChild(new Channels[(C, Nothing) :+: TNil, TNil] {}) + createChild(new Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil] {}) } } @@ -183,7 +186,7 @@ class ChannelSpec extends AkkaSpec with ImplicitSender { |import akka.channels._ |import ChannelSpec._ |new Channels[TNil, (A, B) :+: (C, D) :+: TNil] { - | createChild(new Channels) + | createChild(new Channels[Nothing, Nothing] {}) |} """.stripMargin) }.message must include("Parent argument must not be Nothing") @@ -195,7 +198,7 @@ class ChannelSpec extends AkkaSpec with ImplicitSender { |import akka.channels._ |import ChannelSpec._ |new Channels[TNil, (A, B) :+: (C, D) :+: TNil] { - | createChild(new Channels[(B, Nothing) :+: TNil, TNil]) + | createChild(new Channels[(B, Nothing) :+: TNil, TNil] {}) |} """.stripMargin) }.message must include("This actor cannot support a child requiring channels akka.channels.ChannelSpec.B") @@ -276,6 +279,28 @@ class ChannelSpec extends AkkaSpec with ImplicitSender { }.message must include("reply types Nothing are superfluous for channel akka.channels.ChannelSpec.A") } + "support narrowing ActorRefs" in { + import Channels._ + val channel = ChannelExt(system).actorOf(new RecvC(testActor)) + val ref = channel.actorRef + implicit val t = Timeout(1.second.dilated) + import system.dispatcher + val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration) + r ! C + expectMsg(C) + } + + "deny wrong narrowing of ActorRefs" in { + import Channels._ + val channel = ChannelExt(system).actorOf(new RecvC(testActor)) + val ref = channel.actorRef + implicit val t = Timeout(1.second.dilated) + import system.dispatcher + val f = ref.narrow[(D, Nothing) :+: TNil] + Await.ready(f, t.duration) + f.value.get must be(Failure(Channels.NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D"))) + } + } } \ No newline at end of file diff --git a/akka-macros/src/main/scala/akka/channels/ChannelExtension.scala b/akka-macros/src/main/scala/akka/channels/ChannelExtension.scala index bd18f43e0c..6d3d60226d 100644 --- a/akka-macros/src/main/scala/akka/channels/ChannelExtension.scala +++ b/akka-macros/src/main/scala/akka/channels/ChannelExtension.scala @@ -15,6 +15,10 @@ import scala.reflect.runtime.universe object ChannelExt extends ExtensionKey[ChannelExtension] class ChannelExtension(system: ExtendedActorSystem) extends Extension { + + // kick-start the universe (needed due to thread safety issues in runtime mirror) + private val t = implicitly[TypeTag[(Int, Int) :+: TNil]] + def actorOf[Ch <: ChannelList: TypeTag](factory: ⇒ Channels[_, Ch]): ChannelRef[Ch] = new ChannelRef[Ch](system.actorOf(Props(factory))) } diff --git a/akka-macros/src/main/scala/akka/channels/ChannelRef.scala b/akka-macros/src/main/scala/akka/channels/ChannelRef.scala index ed264f97ba..13e91ada2f 100644 --- a/akka-macros/src/main/scala/akka/channels/ChannelRef.scala +++ b/akka-macros/src/main/scala/akka/channels/ChannelRef.scala @@ -59,16 +59,10 @@ object ChannelRef { type PrefixType = ChannelRef[T] }): c.Expr[ChannelRef[C]] = { import c.{ universe ⇒ u } - for (in ← inputChannels(u)(u.weakTypeOf[C])) { - val replies = replyChannels(u)(u.weakTypeOf[T], in) - if (replies.isEmpty) c.error(c.enclosingPosition, s"original ChannelRef does not support input type $in") - else { - val targetReplies = replyChannels(u)(u.weakTypeOf[C], in) - val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _)) - if (unsatisfied.nonEmpty) c.error(c.enclosingPosition, s"reply types ${unsatisfied mkString ", "} not covered for channel $in") - val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t)) - if (leftovers.nonEmpty) c.error(c.enclosingPosition, s"reply types ${leftovers mkString ", "} are superfluous for channel $in") - } + narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match { + case Nil ⇒ // okay + case err :: Nil ⇒ c.error(c.enclosingPosition, err) + case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", "")) } u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]]) } diff --git a/akka-macros/src/main/scala/akka/channels/Channels.scala b/akka-macros/src/main/scala/akka/channels/Channels.scala index dc149211f0..84f05f91cc 100644 --- a/akka-macros/src/main/scala/akka/channels/Channels.scala +++ b/akka-macros/src/main/scala/akka/channels/Channels.scala @@ -5,15 +5,21 @@ package akka.channels import language.experimental.macros -import akka.actor.Actor +import akka.actor.{ Actor, ActorRef } import scala.reflect.macros.Context +import scala.reflect.runtime.{ universe ⇒ ru } import scala.reflect.runtime.universe.TypeTag -import scala.reflect.macros.Universe +import scala.reflect.api.Universe import scala.runtime.AbstractPartialFunction import akka.actor.Props import scala.collection.immutable import scala.collection.mutable.ArrayBuffer import scala.reflect.{ classTag, ClassTag } +import scala.concurrent.{ ExecutionContext, Future } +import akka.util.Timeout +import akka.pattern.ask +import scala.util.control.NoStackTrace +import akka.AkkaException /** * Typed channels atop untyped actors. @@ -32,7 +38,7 @@ import scala.reflect.{ classTag, ClassTag } * erased type, which may be less precise than the actual channel type; this * can lead to ClassCastExceptions if sending through the untyped ActorRef */ -class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor { +trait Channels[P <: ChannelList, C <: ChannelList] extends Actor { import Channels._ @@ -75,6 +81,14 @@ class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor { behavior += cls -> recv.asInstanceOf[Recv[Any, ChannelList]] } + /* + * HORRIBLE HACK AHEAD + * + * I’d like to keep this a trait, but traits cannot have constructor + * arguments, not even TypeTags. + */ + protected var channelListTypeTag: TypeTag[C] = _ + /** * Sort so that subtypes always precede their supertypes, but without * obeying any order between unrelated subtypes (insert sort). @@ -92,20 +106,29 @@ class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor { val index = sortClasses(behavior.keys) - override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = { - val msgClass = x.getClass - index find (_ isAssignableFrom msgClass) match { - case None ⇒ default(x) - case Some(cls) ⇒ behavior(cls).apply(x, new ChannelRef(sender)) - } + override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match { + case CheckType(tt) ⇒ + narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match { + case Nil ⇒ sender ! CheckTypeACK + case err :: Nil ⇒ sender ! CheckTypeNAK(err) + case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", "")) + } + case _ ⇒ + val msgClass = x.getClass + index find (_ isAssignableFrom msgClass) match { + case None ⇒ default(x) + case Some(cls) ⇒ behavior(cls).apply(x, new ChannelRef(sender)) + } } - def isDefinedAt(x: Any): Boolean = { - val msgClass = x.getClass - index find (_ isAssignableFrom msgClass) match { - case None ⇒ false - case Some(cls) ⇒ true - } + def isDefinedAt(x: Any): Boolean = x match { + case c: CheckType[_] ⇒ true + case _ ⇒ + val msgClass = x.getClass + index find (_ isAssignableFrom msgClass) match { + case None ⇒ false + case Some(cls) ⇒ true + } } } } @@ -114,6 +137,11 @@ object Channels { type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit] + case class CheckType[T](tt: TypeTag[T]) + case object CheckTypeACK + case class CheckTypeNAK(errors: String) + case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace + /** * This macro transforms a channel[] call which returns “some” Behaviorist * into a _channel[] call with precise reply channel descriptors, so that the @@ -131,16 +159,26 @@ object Channels { reify(null) } else { val channels = toChannels(c.universe)(out) - c.Expr(Apply( - TypeApply( - Select(c.prefix.tree, "_channel"), List( - TypeTree().setType(c.weakTypeOf[T]), - TypeTree().setType(channels))), - List(Select( + c.Expr( + Apply( TypeApply( - Select(Select(Ident("scala"), "reflect"), "classTag"), - List(TypeTree().setType(c.weakTypeOf[T]))), - "runtimeClass")))) + Select(c.prefix.tree, "_channel"), List( + TypeTree().setType(c.weakTypeOf[T]), + TypeTree().setType(channels))), + List( + Block(List( + If(reify(c.prefix.splice.channelListTypeTag == null).tree, + Apply( + Select(c.prefix.tree, "channelListTypeTag_$eq"), + List(TypeApply( + Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"), + List(TypeTree().setType(c.weakTypeOf[C]))))), + c.literalUnit.tree)), + Select( + TypeApply( + Select(Select(Ident("scala"), "reflect"), "classTag"), + List(TypeTree().setType(c.weakTypeOf[T]))), + "runtimeClass"))))) } } @@ -166,6 +204,25 @@ object Channels { } } + /** + * check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings + */ + def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = { + var errors = List.empty[String] + for (in ← inputChannels(u)(target)) { + val replies = replyChannels(u)(orig, in) + if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in" + else { + val targetReplies = replyChannels(u)(target, in) + val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _)) + if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in" + val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t)) + if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in" + } + } + errors.reverse + } + /** * get all required channels from a Parent[_] */ @@ -230,4 +287,12 @@ object Channels { rec(list.reverse, weakTypeOf[TNil]) } + implicit class ActorRefOps(val ref: ActorRef) extends AnyVal { + def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = { + ref ? CheckType(tt) map { + case CheckTypeACK ⇒ new ChannelRef[C](ref) + case CheckTypeNAK(error) ⇒ throw NarrowingException(error) + } + } + } } \ No newline at end of file