add narrowing from ActorRef to ChannelRef

This commit is contained in:
Roland 2013-01-03 23:36:44 +01:00
parent 4b2a28887d
commit da10291cdd
4 changed files with 128 additions and 40 deletions

View file

@ -4,11 +4,14 @@
package akka.channels
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit._
import akka.actor.ActorRef
import akka.makkros.Test._
import scala.tools.reflect.ToolBoxError
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.Failure
object ChannelSpec {
@ -64,8 +67,8 @@ object ChannelSpec {
case (C, _) client ! C
}
createChild(new Channels[(C, Nothing) :+: TNil, TNil])
createChild(new Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil])
createChild(new Channels[(C, Nothing) :+: TNil, TNil] {})
createChild(new Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil] {})
}
}
@ -183,7 +186,7 @@ class ChannelSpec extends AkkaSpec with ImplicitSender {
|import akka.channels._
|import ChannelSpec._
|new Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
| createChild(new Channels)
| createChild(new Channels[Nothing, Nothing] {})
|}
""".stripMargin)
}.message must include("Parent argument must not be Nothing")
@ -195,7 +198,7 @@ class ChannelSpec extends AkkaSpec with ImplicitSender {
|import akka.channels._
|import ChannelSpec._
|new Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
| createChild(new Channels[(B, Nothing) :+: TNil, TNil])
| createChild(new Channels[(B, Nothing) :+: TNil, TNil] {})
|}
""".stripMargin)
}.message must include("This actor cannot support a child requiring channels akka.channels.ChannelSpec.B")
@ -276,6 +279,28 @@ class ChannelSpec extends AkkaSpec with ImplicitSender {
}.message must include("reply types Nothing are superfluous for channel akka.channels.ChannelSpec.A")
}
"support narrowing ActorRefs" in {
import Channels._
val channel = ChannelExt(system).actorOf(new RecvC(testActor))
val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated)
import system.dispatcher
val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration)
r ! C
expectMsg(C)
}
"deny wrong narrowing of ActorRefs" in {
import Channels._
val channel = ChannelExt(system).actorOf(new RecvC(testActor))
val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated)
import system.dispatcher
val f = ref.narrow[(D, Nothing) :+: TNil]
Await.ready(f, t.duration)
f.value.get must be(Failure(Channels.NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D")))
}
}
}

View file

@ -15,6 +15,10 @@ import scala.reflect.runtime.universe
object ChannelExt extends ExtensionKey[ChannelExtension]
class ChannelExtension(system: ExtendedActorSystem) extends Extension {
// kick-start the universe (needed due to thread safety issues in runtime mirror)
private val t = implicitly[TypeTag[(Int, Int) :+: TNil]]
def actorOf[Ch <: ChannelList: TypeTag](factory: Channels[_, Ch]): ChannelRef[Ch] =
new ChannelRef[Ch](system.actorOf(Props(factory)))
}

View file

@ -59,16 +59,10 @@ object ChannelRef {
type PrefixType = ChannelRef[T]
}): c.Expr[ChannelRef[C]] = {
import c.{ universe u }
for (in inputChannels(u)(u.weakTypeOf[C])) {
val replies = replyChannels(u)(u.weakTypeOf[T], in)
if (replies.isEmpty) c.error(c.enclosingPosition, s"original ChannelRef does not support input type $in")
else {
val targetReplies = replyChannels(u)(u.weakTypeOf[C], in)
val unsatisfied = replies filterNot (r targetReplies exists (r <:< _))
if (unsatisfied.nonEmpty) c.error(c.enclosingPosition, s"reply types ${unsatisfied mkString ", "} not covered for channel $in")
val leftovers = targetReplies filterNot (t replies exists (_ <:< t))
if (leftovers.nonEmpty) c.error(c.enclosingPosition, s"reply types ${leftovers mkString ", "} are superfluous for channel $in")
}
narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match {
case Nil // okay
case err :: Nil c.error(c.enclosingPosition, err)
case list c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", ""))
}
u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]])
}

View file

@ -5,15 +5,21 @@
package akka.channels
import language.experimental.macros
import akka.actor.Actor
import akka.actor.{ Actor, ActorRef }
import scala.reflect.macros.Context
import scala.reflect.runtime.{ universe ru }
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.macros.Universe
import scala.reflect.api.Universe
import scala.runtime.AbstractPartialFunction
import akka.actor.Props
import scala.collection.immutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ classTag, ClassTag }
import scala.concurrent.{ ExecutionContext, Future }
import akka.util.Timeout
import akka.pattern.ask
import scala.util.control.NoStackTrace
import akka.AkkaException
/**
* Typed channels atop untyped actors.
@ -32,7 +38,7 @@ import scala.reflect.{ classTag, ClassTag }
* erased type, which may be less precise than the actual channel type; this
* can lead to ClassCastExceptions if sending through the untyped ActorRef
*/
class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor {
trait Channels[P <: ChannelList, C <: ChannelList] extends Actor {
import Channels._
@ -75,6 +81,14 @@ class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor {
behavior += cls -> recv.asInstanceOf[Recv[Any, ChannelList]]
}
/*
* HORRIBLE HACK AHEAD
*
* Id like to keep this a trait, but traits cannot have constructor
* arguments, not even TypeTags.
*/
protected var channelListTypeTag: TypeTag[C] = _
/**
* Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort).
@ -92,20 +106,29 @@ class Channels[P <: ChannelList, C <: ChannelList: TypeTag] extends Actor {
val index = sortClasses(behavior.keys)
override def applyOrElse[A, B >: Unit](x: A, default: A B): B = {
val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match {
case None default(x)
case Some(cls) behavior(cls).apply(x, new ChannelRef(sender))
}
override def applyOrElse[A, B >: Unit](x: A, default: A B): B = x match {
case CheckType(tt)
narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match {
case Nil sender ! CheckTypeACK
case err :: Nil sender ! CheckTypeNAK(err)
case list sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", ""))
}
case _
val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match {
case None default(x)
case Some(cls) behavior(cls).apply(x, new ChannelRef(sender))
}
}
def isDefinedAt(x: Any): Boolean = {
val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match {
case None false
case Some(cls) true
}
def isDefinedAt(x: Any): Boolean = x match {
case c: CheckType[_] true
case _
val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match {
case None false
case Some(cls) true
}
}
}
}
@ -114,6 +137,11 @@ object Channels {
type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit]
case class CheckType[T](tt: TypeTag[T])
case object CheckTypeACK
case class CheckTypeNAK(errors: String)
case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace
/**
* This macro transforms a channel[] call which returns some Behaviorist
* into a _channel[] call with precise reply channel descriptors, so that the
@ -131,16 +159,26 @@ object Channels {
reify(null)
} else {
val channels = toChannels(c.universe)(out)
c.Expr(Apply(
TypeApply(
Select(c.prefix.tree, "_channel"), List(
TypeTree().setType(c.weakTypeOf[T]),
TypeTree().setType(channels))),
List(Select(
c.Expr(
Apply(
TypeApply(
Select(Select(Ident("scala"), "reflect"), "classTag"),
List(TypeTree().setType(c.weakTypeOf[T]))),
"runtimeClass"))))
Select(c.prefix.tree, "_channel"), List(
TypeTree().setType(c.weakTypeOf[T]),
TypeTree().setType(channels))),
List(
Block(List(
If(reify(c.prefix.splice.channelListTypeTag == null).tree,
Apply(
Select(c.prefix.tree, "channelListTypeTag_$eq"),
List(TypeApply(
Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"),
List(TypeTree().setType(c.weakTypeOf[C]))))),
c.literalUnit.tree)),
Select(
TypeApply(
Select(Select(Ident("scala"), "reflect"), "classTag"),
List(TypeTree().setType(c.weakTypeOf[T]))),
"runtimeClass")))))
}
}
@ -166,6 +204,25 @@ object Channels {
}
}
/**
* check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings
*/
def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = {
var errors = List.empty[String]
for (in inputChannels(u)(target)) {
val replies = replyChannels(u)(orig, in)
if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in"
else {
val targetReplies = replyChannels(u)(target, in)
val unsatisfied = replies filterNot (r targetReplies exists (r <:< _))
if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in"
val leftovers = targetReplies filterNot (t replies exists (_ <:< t))
if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in"
}
}
errors.reverse
}
/**
* get all required channels from a Parent[_]
*/
@ -230,4 +287,12 @@ object Channels {
rec(list.reverse, weakTypeOf[TNil])
}
implicit class ActorRefOps(val ref: ActorRef) extends AnyVal {
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
ref ? CheckType(tt) map {
case CheckTypeACK new ChannelRef[C](ref)
case CheckTypeNAK(error) throw NarrowingException(error)
}
}
}
}