major facelift: -!-> and -?-> appear
- rename projects to akka-channels and akka-channels-tests - move implementation into akka.channels.macros package - remove picking up ActorRef as sender (or none at all) - factor out logic to make different façades acting upon Future[] or Any so that -!-> and -?-> can complement the traditional <-!- and <-?- - the new operators are easily distinguishable from !/? and the rightwards-pointing go with the flow and compose better, let’s try them out
This commit is contained in:
parent
cfcc9da9bc
commit
e862890ded
18 changed files with 718 additions and 572 deletions
|
|
@ -62,6 +62,19 @@ object ExecutionContexts {
|
|||
* @return a reference to the global ExecutionContext
|
||||
*/
|
||||
def global(): ExecutionContext = ExecutionContext.global
|
||||
|
||||
/**
|
||||
* WARNING: Not A General Purpose ExecutionContext!
|
||||
*
|
||||
* This is an execution context which runs everything on the calling thread.
|
||||
* It is very useful for actions which are known to be non-blocking and
|
||||
* non-throwing in order to save a round-trip to the thread pool.
|
||||
*/
|
||||
object sameThreadExecutionContext extends ExecutionContext {
|
||||
override def execute(runnable: Runnable): Unit = runnable.run()
|
||||
override def reportFailure(t: Throwable): Unit =
|
||||
throw new IllegalStateException("exception in sameThreadExecutionContext", t)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import scala.tools.reflect.ToolBoxError
|
|||
|
||||
object Test {
|
||||
|
||||
def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-macros/target/classes"): Any = {
|
||||
def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-channels/target/classes"): Any = {
|
||||
val tb = mkToolbox(compileOptions)
|
||||
tb.eval(tb.parse(code))
|
||||
}
|
||||
|
|
@ -44,10 +44,10 @@ object ChannelSpec {
|
|||
|
||||
// used for sender verification in the first two test cases
|
||||
class Tester extends Channels[TNil, (A, C) :+: (B, D) :+: TNil] {
|
||||
channel[A.type] { (msg, snd) ⇒ snd ! C }
|
||||
channel[A] { (msg, snd) ⇒ snd ! C1 }
|
||||
channel[A.type] { (msg, snd) ⇒ snd <-!- C }
|
||||
channel[A] { (msg, snd) ⇒ snd <-!- C1 }
|
||||
channel[B] {
|
||||
case (B, s) ⇒ s ! D
|
||||
case (B, s) ⇒ s <-!- D
|
||||
}
|
||||
}
|
||||
class RecvC(ref: ActorRef) extends Channels[TNil, (C, Nothing) :+: TNil] {
|
||||
|
|
@ -57,20 +57,20 @@ object ChannelSpec {
|
|||
// pos compile test for multiple reply channels
|
||||
class SubChannels extends Channels[TNil, (A, B) :+: (A, C) :+: TNil] {
|
||||
channel[A] {
|
||||
case (A1, x) ⇒ x ! B
|
||||
case (_, x) ⇒ x ! C
|
||||
case (A1, x) ⇒ B -!-> x
|
||||
case (_, x) ⇒ x <-!- C
|
||||
}
|
||||
}
|
||||
|
||||
// pos compile test for children
|
||||
class Children extends Channels[TNil, (A, B) :+: (C, Nothing) :+: TNil] {
|
||||
val c = createChild(new Channels[(A, Nothing) :+: TNil, (B, C) :+: TNil] {
|
||||
channel[B] { case (B, s) ⇒ s ! C }
|
||||
channel[B] { case (B, s) ⇒ s <-!- C }
|
||||
})
|
||||
|
||||
var client: ActorRef = _
|
||||
channel[A] {
|
||||
case (A, s) ⇒ c ! B; client = sender
|
||||
case (A, s) ⇒ c <-!- B; client = sender
|
||||
}
|
||||
channel[C] {
|
||||
case (C, _) ⇒ client ! C
|
||||
|
|
@ -81,9 +81,10 @@ object ChannelSpec {
|
|||
}
|
||||
|
||||
// compile test for polymorphic actors
|
||||
class WriteOnly[T <: ChannelList: ru.TypeTag](target: ChannelRef[T]) extends Channels[TNil, (D, D) :+: T] {
|
||||
channel[D] { (d, snd) ⇒ snd ! d }
|
||||
channel[T] { x ⇒ target forward x }
|
||||
class WriteOnly[T1: ru.TypeTag, T2: ru.TypeTag](target: ChannelRef[(T1, T2) :+: TNil]) extends Channels[TNil, (D, D) :+: (T1, T2) :+: TNil] {
|
||||
channel[D] { (d, snd) ⇒ snd <-!- d }
|
||||
implicit val t = Timeout(1.second)
|
||||
channel[T1] { (x, snd) ⇒ x -?-> target }
|
||||
}
|
||||
|
||||
class MissingChannel extends Channels[TNil, (A, A) :+: (B, B) :+: TNil] {
|
||||
|
|
@ -102,10 +103,10 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|
||||
"construct refs" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester)
|
||||
ref ! A
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
lastSender must be(ref.actorRef)
|
||||
ref ! B
|
||||
ref <-!- B
|
||||
expectMsg(D)
|
||||
lastSender must be(ref.actorRef)
|
||||
}
|
||||
|
|
@ -113,7 +114,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
"select return channels" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester)
|
||||
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor))
|
||||
ref ! A
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
lastSender must be(selfChannel.actorRef)
|
||||
}
|
||||
|
|
@ -121,7 +122,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
"correctly dispatch to subchannels" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester)
|
||||
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor))
|
||||
ref ! A2
|
||||
ref <-!- A2
|
||||
expectMsg(C1)
|
||||
lastSender must be(selfChannel.actorRef)
|
||||
}
|
||||
|
|
@ -131,7 +132,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A, C) :+: TNil](null) ! B
|
||||
|implicit val c = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, C) :+: TNil](null) <-!- B
|
||||
""".stripMargin)
|
||||
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type")
|
||||
}
|
||||
|
|
@ -141,7 +143,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) ! C
|
||||
|implicit val c = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) <-!- C
|
||||
""".stripMargin)
|
||||
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type")
|
||||
}
|
||||
|
|
@ -152,14 +155,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(C, D) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: TNil](null) ! A
|
||||
|new ChannelRef[(A, B) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"permit any sender for Nothing replies" in {
|
||||
implicit val selfChannel = new ChannelRef[TNil](testActor)
|
||||
new ChannelRef[(A, Nothing) :+: TNil](testActor) ! A
|
||||
new ChannelRef[(A, Nothing) :+: TNil](testActor) <-!- A
|
||||
expectMsg(A)
|
||||
}
|
||||
|
||||
|
|
@ -169,7 +172,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) ! A
|
||||
|new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B, akka.channels.ChannelSpec.C")
|
||||
}
|
||||
|
|
@ -180,7 +183,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(B, B) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) ! A
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.C")
|
||||
}
|
||||
|
|
@ -191,7 +194,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(B, B) :+: (C, B) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) ! A
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}
|
||||
def cause(ex: Throwable): Throwable =
|
||||
|
|
@ -220,7 +223,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import ChannelSpec._
|
||||
|new Channels[TNil, (A, B) :+: (A1.type, C) :+: TNil] {
|
||||
| channel[A] {
|
||||
| case (A1, x) => x ! C
|
||||
| case (A1, x) => x <-!- C
|
||||
| }
|
||||
|}
|
||||
""".stripMargin)
|
||||
|
|
@ -253,14 +256,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|
||||
"have a working selfChannel" in {
|
||||
val ref = ChannelExt(system).actorOf(new Children)
|
||||
ref ! A
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
}
|
||||
|
||||
"have a working parentChannel" in {
|
||||
val parent = ChannelExt(system).actorOf(new Channels[TNil, (A, Nothing) :+: TNil] {
|
||||
createChild(new Channels[(A, Nothing) :+: TNil, TNil] {
|
||||
parentChannel ! A
|
||||
parentChannel <-!- A
|
||||
})
|
||||
channel[A] { (msg, snd) ⇒ testActor ! msg }
|
||||
})
|
||||
|
|
@ -284,7 +287,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|import ChannelSpec._
|
||||
|new Channels[TNil, (A, Nothing) :+: TNil] {
|
||||
| createChild(new Channels[(A, Nothing) :+: TNil, TNil] {
|
||||
| parentChannel ! B
|
||||
| parentChannel <-!- B
|
||||
| })
|
||||
|}
|
||||
""".stripMargin)
|
||||
|
|
@ -343,7 +346,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
implicit val t = Timeout(1.second.dilated)
|
||||
import system.dispatcher
|
||||
val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration)
|
||||
r ! C
|
||||
r <-!- C
|
||||
expectMsg(C)
|
||||
}
|
||||
|
||||
|
|
@ -354,7 +357,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
import system.dispatcher
|
||||
val f = ref.narrow[(D, Nothing) :+: TNil]
|
||||
Await.ready(f, t.duration)
|
||||
f.value.get must be(Failure(Channels.NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D")))
|
||||
f.value.get must be(Failure(NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D")))
|
||||
}
|
||||
|
||||
"be equal according to its actor" in {
|
||||
|
|
@ -364,11 +367,11 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
|
||||
"allow wrapping of ChannelRefs with pass-through" in {
|
||||
val target = ChannelExt(system).actorOf(new RecvC(testActor))
|
||||
val wrap = ChannelExt(system).actorOf(new WriteOnly(target))
|
||||
wrap ! C
|
||||
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target))
|
||||
wrap <-!- C
|
||||
expectMsg(C)
|
||||
lastSender must be(target.actorRef)
|
||||
wrap ! D
|
||||
wrap <-!- D
|
||||
expectMsg(D)
|
||||
lastSender must be(wrap.actorRef)
|
||||
}
|
||||
|
|
@ -376,14 +379,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
"support typed ask" in {
|
||||
val t = ChannelExt(system).actorOf(new Tester)
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val r: Future[C] = t ? A
|
||||
val r: Future[C] = t <-?- A
|
||||
Await.result(r, 1.second) must be(C)
|
||||
}
|
||||
|
||||
"support typed ask with multiple reply channels" in {
|
||||
val t = ChannelExt(system).actorOf(new SubChannels)
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val r: Future[Msg] = t ? A1
|
||||
val r: Future[Msg] = t <-?- A1
|
||||
Await.result(r, 1.second) must be(B)
|
||||
}
|
||||
|
||||
30
akka-channels/src/main/scala/akka/channels/ChannelRef.scala
Normal file
30
akka-channels/src/main/scala/akka/channels/ChannelRef.scala
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.ActorRef
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.annotation.tailrec
|
||||
import scala.reflect.macros.Universe
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorContext
|
||||
import scala.concurrent.Future
|
||||
import akka.util.Timeout
|
||||
import akka.AkkaException
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace
|
||||
|
||||
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
|
||||
|
||||
def <-!-[M](msg: M): Unit = macro macros.Tell.impl[T, M]
|
||||
|
||||
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[T, M]
|
||||
|
||||
def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T]
|
||||
|
||||
}
|
||||
172
akka-channels/src/main/scala/akka/channels/Channels.scala
Normal file
172
akka-channels/src/main/scala/akka/channels/Channels.scala
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.reflect.api.Universe
|
||||
import scala.runtime.AbstractPartialFunction
|
||||
import akka.actor.Props
|
||||
import scala.collection.immutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.reflect.{ classTag, ClassTag }
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.AkkaException
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.ActorInitializationException
|
||||
|
||||
/**
|
||||
* Typed channels atop untyped actors.
|
||||
*
|
||||
* The idea is that the actor declares all its input types up front, including
|
||||
* what it expects the sender to handle wrt.replies, and then ChannelRef
|
||||
* carries this information for statically verifying that messages sent to an
|
||||
* actor have an actual chance of being processed.
|
||||
*
|
||||
* There are several implementation-imposed restrictions:
|
||||
*
|
||||
* - not two channels with different input types may have the same erased
|
||||
* type; this is currently not enforced at compile time (and leads to
|
||||
* channels being “ignored” at runtime)
|
||||
* - messages received by the actor are dispatched to channels based on the
|
||||
* erased type, which may be less precise than the actual channel type; this
|
||||
* can lead to ClassCastExceptions if sending through the untyped ActorRef
|
||||
*/
|
||||
trait Channels[P <: ChannelList, C <: ChannelList] extends Actor {
|
||||
|
||||
import macros.Helpers._
|
||||
|
||||
/**
|
||||
* Create a child actor with properly typed ChannelRef, verifying that this
|
||||
* actor can handle everything which the child tries to send via its
|
||||
* `parent` ChannelRef.
|
||||
*/
|
||||
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch]
|
||||
|
||||
/**
|
||||
* Properly typed ChannelRef for the context.parent.
|
||||
*/
|
||||
final def parentChannel: ChannelRef[P] = new ChannelRef(context.parent)
|
||||
|
||||
/**
|
||||
* The properly typed self-channel is used implicitly when sending to other
|
||||
* typed channels for verifying that replies can be handled.
|
||||
*/
|
||||
implicit final def selfChannel = new ChannelRef[C](self)
|
||||
|
||||
/*
|
||||
* This map holds the current behavior for each erasure-tagged channel; the
|
||||
* basic receive impl will dispatch incoming messages according to the most
|
||||
* specific erased type in this map.
|
||||
*/
|
||||
private var behavior = Map.empty[Class[_], FF]
|
||||
|
||||
/**
|
||||
* Functions for storage in the behavior, to get around erasure
|
||||
*/
|
||||
private trait FF
|
||||
private case class F1(f: (WrappedMessage[ChannelList], ChannelRef[ChannelList]) ⇒ Unit) extends FF
|
||||
private case class F2(f: (Any, ChannelRef[ChannelList]) ⇒ Unit) extends FF
|
||||
|
||||
/**
|
||||
* Declare an input channel of the given type; the returned object takes a partial function:
|
||||
*
|
||||
* {{{
|
||||
* channel[A] {
|
||||
* case (a, s) =>
|
||||
* // a is of type A and
|
||||
* // s is a ChannelRef for the sender, capable of sending the declared reply type for A
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
def channel[T]: Channels[P, C]#Behaviorist[Nothing, T] = macro macros.Channel.impl[T, C, P]
|
||||
|
||||
class Behaviorist[-R, Ch](tt: ru.TypeTag[Ch], wrapped: Boolean) {
|
||||
private def ff(recv: R): FF =
|
||||
if (wrapped)
|
||||
F1(recv.asInstanceOf[(WrappedMessage[ChannelList], ChannelRef[ChannelList]) ⇒ Unit])
|
||||
else
|
||||
F2(recv.asInstanceOf[(Any, ChannelRef[ChannelList]) ⇒ Unit])
|
||||
def apply(recv: R): Unit =
|
||||
behavior ++= (for (t ← inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> ff(recv))
|
||||
}
|
||||
|
||||
/*
|
||||
* HORRIBLE HACK AHEAD
|
||||
*
|
||||
* I’d like to keep this a trait, but traits cannot have constructor
|
||||
* arguments, not even TypeTags.
|
||||
*/
|
||||
protected var channelListTypeTag: TypeTag[C] = _
|
||||
|
||||
/**
|
||||
* Sort so that subtypes always precede their supertypes, but without
|
||||
* obeying any order between unrelated subtypes (insert sort).
|
||||
*/
|
||||
private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] =
|
||||
(new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls) ⇒
|
||||
buf.indexWhere(_ isAssignableFrom cls) match {
|
||||
case -1 ⇒ buf append cls
|
||||
case x ⇒ buf insert (x, cls)
|
||||
}
|
||||
buf
|
||||
}.to[immutable.IndexedSeq]
|
||||
|
||||
final lazy val receive = new AbstractPartialFunction[Any, Unit] {
|
||||
|
||||
val index = sortClasses(behavior.keys)
|
||||
|
||||
if (channelListTypeTag != null) verifyCompleteness()
|
||||
|
||||
private def verifyCompleteness() {
|
||||
val channels = inputChannels(ru)(channelListTypeTag.tpe)
|
||||
val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen))
|
||||
val missing = classes.keySet -- behavior.keySet
|
||||
if (missing.nonEmpty) {
|
||||
val m = missing.map(classes).flatten
|
||||
throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}")
|
||||
}
|
||||
}
|
||||
|
||||
override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match {
|
||||
case CheckType(tt) ⇒
|
||||
narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match {
|
||||
case Nil ⇒ sender ! CheckTypeACK
|
||||
case err :: Nil ⇒ sender ! CheckTypeNAK(err)
|
||||
case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", ""))
|
||||
}
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index find (_ isAssignableFrom msgClass) match {
|
||||
case None ⇒ default(x)
|
||||
case Some(cls) ⇒
|
||||
behavior(cls) match {
|
||||
case F1(f) ⇒ f(new WrappedMessage[ChannelList](x), new ChannelRef(sender))
|
||||
case F2(f) ⇒ f(x, new ChannelRef(sender))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def isDefinedAt(x: Any): Boolean = x match {
|
||||
case c: CheckType[_] ⇒ true
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index find (_ isAssignableFrom msgClass) match {
|
||||
case None ⇒ false
|
||||
case Some(cls) ⇒ true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object Channels {
|
||||
|
||||
}
|
||||
36
akka-channels/src/main/scala/akka/channels/Ops.scala
Normal file
36
akka-channels/src/main/scala/akka/channels/Ops.scala
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import scala.util.Success
|
||||
|
||||
sealed trait ChannelList
|
||||
sealed trait TNil extends ChannelList
|
||||
sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList
|
||||
|
||||
class ActorRefOps(val ref: ActorRef) extends AnyVal {
|
||||
import macros.Helpers._
|
||||
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
|
||||
import Channels._
|
||||
ref ? CheckType(tt) map {
|
||||
case CheckTypeACK ⇒ new ChannelRef[C](ref)
|
||||
case CheckTypeNAK(error) ⇒ throw NarrowingException(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FutureOps[T](val future: Future[T]) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[C, T]
|
||||
}
|
||||
|
||||
class AnyOps[T](val value: T) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Unit = macro macros.Tell.opsImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[C, T]
|
||||
}
|
||||
|
||||
class WrappedMessage[T <: ChannelList](val value: Any) extends AnyVal
|
||||
63
akka-channels/src/main/scala/akka/channels/macros/Ask.scala
Normal file
63
akka-channels/src/main/scala/akka/channels/macros/Ask.scala
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.concurrent.Future
|
||||
import akka.util.Timeout
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
object Ask {
|
||||
import Helpers._
|
||||
|
||||
def impl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
})(msg: c.Expr[M]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
askTree(c)(weakTypeOf[M], weakTypeOf[T])(reify(c.prefix.splice.actorRef), msg)
|
||||
}
|
||||
|
||||
def opsImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = AnyOps[M]
|
||||
})(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
askTree(c)(weakTypeOf[M], weakTypeOf[T])(reify(channel.splice.actorRef), reify(c.prefix.splice.value))
|
||||
}
|
||||
|
||||
def futureImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = FutureOps[M]
|
||||
})(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
val tree = askTree(c)(weakTypeOf[M], weakTypeOf[T])(c.Expr(Ident("c$1")), c.Expr(Ident("x$1")))
|
||||
reify({
|
||||
val c$1 = channel.splice.actorRef
|
||||
c.prefix.splice.future.flatMap(x$1 ⇒ tree.splice)(ExecutionContexts.sameThreadExecutionContext)
|
||||
})
|
||||
}
|
||||
|
||||
def askTree[M](c: Context with Singleton)(msgT: c.universe.Type, chT: c.universe.Type)(target: c.Expr[ActorRef], msg: c.Expr[M]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
val out = replyChannels(c.universe)(chT, msgT)
|
||||
if (out.isEmpty) {
|
||||
c.error(c.enclosingPosition, s"This ChannelRef does not support messages of type $msgT")
|
||||
reify(null)
|
||||
} else {
|
||||
val timeout = c.inferImplicitValue(typeOf[Timeout])
|
||||
if (timeout.isEmpty)
|
||||
c.error(c.enclosingPosition, s"no implicit akka.util.Timeout found")
|
||||
val result = appliedType(weakTypeOf[Future[_]].typeConstructor, List(lub(out)))
|
||||
c.Expr(
|
||||
TypeApply(
|
||||
Select(
|
||||
reify(akka.pattern.ask(
|
||||
target.splice, msg.splice)(
|
||||
c.Expr(timeout)(weakTypeTag[Timeout]).splice)).tree,
|
||||
"asInstanceOf"),
|
||||
List(TypeTree().setType(result))))(weakTypeTag[Future[_]])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
|
||||
object Channel {
|
||||
import Helpers._
|
||||
|
||||
/**
|
||||
* This macro transforms a channel[] call which returns “some” Behaviorist
|
||||
* into a _channel[] call with precise reply channel descriptors, so that the
|
||||
* partial function it is applied to can enjoy proper type checking.
|
||||
*
|
||||
* T is the message type
|
||||
* C is the channel list of the enclosing Channels
|
||||
* P is the parent channel list
|
||||
*/
|
||||
def impl[T: c.WeakTypeTag, C <: ChannelList: c.WeakTypeTag, P <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Channels[P, C]
|
||||
}): c.Expr[Channels[P, C]#Behaviorist[Nothing, T]] = {
|
||||
|
||||
val tT = c.weakTypeOf[T]
|
||||
val tC = c.weakTypeOf[C]
|
||||
|
||||
import c.universe._
|
||||
|
||||
val undefined = missingChannels(c.universe)(tC, inputChannels(c.universe)(tT))
|
||||
if (undefined.nonEmpty) {
|
||||
c.error(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}")
|
||||
reify(null)
|
||||
} else {
|
||||
checkUnique(c.universe)(tT, tC) foreach (c.error(c.enclosingPosition, _))
|
||||
val channels = toChannels(c.universe)(replyChannels(c.universe)(tC, tT))
|
||||
val (receive, wrapped) =
|
||||
if (tT <:< typeOf[ChannelList]) {
|
||||
appliedType(typeOf[Function2[_, _, _]].typeConstructor, List(
|
||||
appliedType(typeOf[WrappedMessage[_]].typeConstructor, List(tT)),
|
||||
appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)),
|
||||
typeOf[Unit])) -> true
|
||||
} else {
|
||||
appliedType(typeOf[Function2[_, _, _]].typeConstructor, List(
|
||||
tT,
|
||||
appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)),
|
||||
typeOf[Unit])) -> false
|
||||
}
|
||||
c.Expr(
|
||||
Block(List(
|
||||
If(
|
||||
{
|
||||
val cltt = c.Expr(Select(c.prefix.tree, "channelListTypeTag"))
|
||||
reify(cltt.splice == null).tree
|
||||
},
|
||||
Apply(
|
||||
Select(c.prefix.tree, "channelListTypeTag_$eq"),
|
||||
List(TypeApply(
|
||||
Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"),
|
||||
List(TypeTree().setType(c.weakTypeOf[C]))))),
|
||||
c.literalUnit.tree)),
|
||||
Apply(
|
||||
Select(
|
||||
New(AppliedTypeTree(Select(c.prefix.tree, newTypeName("Behaviorist")), List(
|
||||
TypeTree().setType(receive),
|
||||
TypeTree().setType(tT)))),
|
||||
nme.CONSTRUCTOR),
|
||||
List(
|
||||
TypeApply(
|
||||
Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"),
|
||||
List(TypeTree().setType(tT))),
|
||||
Literal(Constant(wrapped))))))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import akka.actor.Props
|
||||
|
||||
object CreateChild {
|
||||
import Helpers._
|
||||
|
||||
def impl[C <: ChannelList: c.WeakTypeTag, Pa <: ChannelList: c.WeakTypeTag, Ch <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Channels[_, C]
|
||||
})(factory: c.Expr[Channels[Pa, Ch]]): c.Expr[ChannelRef[Ch]] = {
|
||||
|
||||
import c.universe._
|
||||
if (weakTypeOf[Pa] =:= weakTypeOf[Nothing]) {
|
||||
c.abort(c.enclosingPosition, "Parent argument must not be Nothing")
|
||||
}
|
||||
if (weakTypeOf[Ch] =:= weakTypeOf[Nothing]) {
|
||||
c.abort(c.enclosingPosition, "channel list must not be Nothing")
|
||||
}
|
||||
val missing = missingChannels(c.universe)(weakTypeOf[C], inputChannels(c.universe)(weakTypeOf[Pa]))
|
||||
if (missing.isEmpty) {
|
||||
implicit val t = c.TypeTag[Ch](c.weakTypeOf[Ch])
|
||||
reify(new ChannelRef[Ch](c.prefix.splice.context.actorOf(Props(factory.splice))))
|
||||
} else {
|
||||
c.error(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
|
||||
reify(???)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
119
akka-channels/src/main/scala/akka/channels/macros/Helpers.scala
Normal file
119
akka-channels/src/main/scala/akka/channels/macros/Helpers.scala
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.AkkaException
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
|
||||
object Helpers {
|
||||
|
||||
type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit]
|
||||
|
||||
case class CheckType[T](tt: TypeTag[T])
|
||||
case object CheckTypeACK
|
||||
case class CheckTypeNAK(errors: String)
|
||||
|
||||
def error(c: Context, msg: String) = c.error(c.enclosingPosition, msg)
|
||||
def abort(c: Context, msg: String) = c.abort(c.enclosingPosition, msg)
|
||||
|
||||
def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = {
|
||||
val channels = inputChannels(u)(list) groupBy (_.erasure)
|
||||
val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel)
|
||||
if (dupes.isEmpty) None
|
||||
else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}")
|
||||
}
|
||||
|
||||
/**
|
||||
* check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings
|
||||
*/
|
||||
def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = {
|
||||
var errors = List.empty[String]
|
||||
for (in ← inputChannels(u)(target)) {
|
||||
val replies = replyChannels(u)(orig, in)
|
||||
if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in"
|
||||
else {
|
||||
val targetReplies = replyChannels(u)(target, in)
|
||||
val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _))
|
||||
if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in"
|
||||
val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t))
|
||||
if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in"
|
||||
}
|
||||
}
|
||||
errors.reverse
|
||||
}
|
||||
|
||||
/**
|
||||
* get all input channels from a ChannelList or return the given type
|
||||
*/
|
||||
final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
val imp = u.mkImporter(ru)
|
||||
val cl = imp.importType(ru.typeOf[ChannelList])
|
||||
val tnil = imp.importType(ru.typeOf[TNil])
|
||||
def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc)
|
||||
case last ⇒ if (last =:= tnil) acc.reverse else (last :: acc).reverse
|
||||
}
|
||||
if (list <:< cl) rec(list, Nil)
|
||||
else List(list)
|
||||
}
|
||||
|
||||
/**
|
||||
* find all input channels matching the given message type and return a
|
||||
* list of their respective reply channels
|
||||
*/
|
||||
final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
def rec(l: Type, acc: List[Type]): List[Type] = {
|
||||
l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msg <:< in ⇒
|
||||
rec(tail, if (acc contains out) acc else out :: acc)
|
||||
case TypeRef(_, _, _ :: tail :: Nil) ⇒
|
||||
rec(tail, acc)
|
||||
case _ ⇒ acc.reverse
|
||||
}
|
||||
}
|
||||
val n = typeOf[Nothing]
|
||||
if (msg =:= n) List(n) else rec(list, Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* filter from the `required` list of types all which are subtypes of inputs of the ChannelList
|
||||
*/
|
||||
final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = {
|
||||
import u._
|
||||
// making the top-level method recursive blows up the compiler (when compiling the macro itself)
|
||||
def rec(ch: Type, req: List[Type]): List[Type] = {
|
||||
ch match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, req filterNot (_ <:< in))
|
||||
case last ⇒ req filterNot (_ <:< last)
|
||||
}
|
||||
}
|
||||
rec(channels, required)
|
||||
}
|
||||
|
||||
/**
|
||||
* convert a list of types List(<T1>, <T2>, ...) into a ChannelList
|
||||
* ( Channel[<T1>, Nothing] :=: Channel[<T2>, Nothing] :=: ... :=: TNil )
|
||||
*/
|
||||
final def toChannels(u: Universe)(list: List[u.Type]): u.Type = {
|
||||
import u._
|
||||
def rec(l: List[Type], acc: Type): Type = l match {
|
||||
case head :: (tail: List[Type]) ⇒
|
||||
if (head =:= weakTypeOf[Nothing]) rec(tail, acc)
|
||||
else
|
||||
rec(tail,
|
||||
appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List(
|
||||
appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List(
|
||||
head,
|
||||
weakTypeOf[Nothing])),
|
||||
acc)))
|
||||
case _ ⇒ acc
|
||||
}
|
||||
rec(list.reverse, weakTypeOf[TNil])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
|
||||
object Narrow {
|
||||
import Helpers._
|
||||
|
||||
def impl[C <: ChannelList: c.WeakTypeTag, T <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
}): c.Expr[ChannelRef[C]] = {
|
||||
import c.{ universe ⇒ u }
|
||||
narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match {
|
||||
case Nil ⇒ // okay
|
||||
case err :: Nil ⇒ c.error(c.enclosingPosition, err)
|
||||
case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", ""))
|
||||
}
|
||||
u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]])
|
||||
}
|
||||
}
|
||||
90
akka-channels/src/main/scala/akka/channels/macros/Tell.scala
Normal file
90
akka-channels/src/main/scala/akka/channels/macros/Tell.scala
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import akka.actor._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Success }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
object Tell {
|
||||
import Helpers._
|
||||
|
||||
def impl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
})(msg: c.Expr[M]): c.Expr[Unit] = {
|
||||
val tT = c.universe.weakTypeOf[T]
|
||||
val (tS, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT)
|
||||
|
||||
c.universe.reify(c.prefix.splice.actorRef.tell(msg.splice, sender.splice))
|
||||
}
|
||||
|
||||
def opsImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = AnyOps[M]
|
||||
})(channel: c.Expr[ChannelRef[T]]): c.Expr[Unit] = {
|
||||
val tT = c.universe.weakTypeOf[T]
|
||||
val (tS, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT)
|
||||
|
||||
c.universe.reify(channel.splice.actorRef.tell(c.prefix.splice.value, sender.splice))
|
||||
}
|
||||
|
||||
def futureImpl[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = FutureOps[M]
|
||||
})(channel: c.Expr[ChannelRef[T]]): c.Expr[Future[M]] = {
|
||||
val tT = c.universe.weakTypeOf[T]
|
||||
val (tS, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, c.universe.weakTypeOf[M], tS, tT)
|
||||
|
||||
c.universe.reify(
|
||||
{
|
||||
val s$1 = sender.splice
|
||||
c.prefix.splice.future.andThen {
|
||||
case Success(s) ⇒ channel.splice.actorRef.tell(s, s$1)
|
||||
case _ ⇒
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
})
|
||||
}
|
||||
|
||||
def getSenderChannel(c: Context): (c.universe.Type, c.Tree, c.Expr[ActorRef]) = {
|
||||
val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]])
|
||||
if (!replyChannel.isEmpty) {
|
||||
import c.universe._
|
||||
replyChannel.tpe match {
|
||||
case TypeRef(_, _, param :: Nil) ⇒
|
||||
(param, replyChannel, c.Expr(Select(replyChannel, "actorRef"))(c.universe.weakTypeTag[ActorRef]))
|
||||
}
|
||||
} else abort(c, "no implicit sender ChannelRef found")
|
||||
}
|
||||
|
||||
def verify(c: Context)(sender: c.universe.Tree, msgT: c.universe.Type, sndT: c.universe.Type, chT: c.universe.Type)(): Unit = {
|
||||
def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit =
|
||||
if (msg.nonEmpty) {
|
||||
val u: c.universe.type = c.universe
|
||||
val replies = msg map (m ⇒ m -> replyChannels(u)(chT, m))
|
||||
val missing = replies collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (missing.nonEmpty)
|
||||
error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextSend = replies.map(_._2).flatten map (m ⇒ m -> replyChannels(u)(sndT, m))
|
||||
val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (nextMissing.nonEmpty)
|
||||
error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextChecked = checked ++ msg
|
||||
val nextMsg = nextSend.map(_._2).flatten -- nextChecked
|
||||
rec(nextMsg, nextChecked, depth + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
rec(Set(msgT), Set(c.universe.typeOf[Nothing]), 1)
|
||||
}
|
||||
}
|
||||
15
akka-channels/src/main/scala/akka/channels/package.scala
Normal file
15
akka-channels/src/main/scala/akka/channels/package.scala
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
||||
import language.implicitConversions
|
||||
import akka.actor.ActorRef
|
||||
import scala.concurrent.Future
|
||||
|
||||
package object channels {
|
||||
implicit def actorRefOps(ref: ActorRef) = new ActorRefOps(ref)
|
||||
implicit def futureOps[T](f: Future[T]) = new FutureOps(f)
|
||||
implicit def anyOps[T](x: T) = new AnyOps(x)
|
||||
}
|
||||
|
|
@ -1,137 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.macros
|
||||
import akka.actor.ActorRef
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.annotation.tailrec
|
||||
import scala.reflect.macros.Universe
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorContext
|
||||
import scala.concurrent.Future
|
||||
import akka.util.Timeout
|
||||
|
||||
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
|
||||
|
||||
def : Unit = macro ChannelRef.tell[T, M]
|
||||
|
||||
def ?[M](msg: M): Future[_] = macro ChannelRef.ask[T, M]
|
||||
|
||||
def forward[M](msg: M): Unit = macro ChannelRef.forward[T, M]
|
||||
|
||||
def narrow[C <: ChannelList]: ChannelRef[C] = macro ChannelRef.narrowImpl[C, T]
|
||||
|
||||
}
|
||||
|
||||
object ChannelRef {
|
||||
import Channels._
|
||||
|
||||
def tell[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
})(msg: c.Expr[M]): c.Expr[Unit] = {
|
||||
import c.{ universe ⇒ u }
|
||||
|
||||
def getSenderChannel = {
|
||||
val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]])
|
||||
if (!replyChannel.isEmpty) {
|
||||
import u._
|
||||
replyChannel.tpe match {
|
||||
case TypeRef(_, _, param :: Nil) ⇒
|
||||
Some((param, replyChannel, c.Expr(Select(replyChannel, "actorRef"))(u.weakTypeTag[ActorRef])))
|
||||
}
|
||||
} else None
|
||||
}
|
||||
|
||||
def getSenderRef = {
|
||||
val senderTree = c.inferImplicitValue(c.typeOf[ActorRef])
|
||||
if (senderTree.isEmpty) {
|
||||
val noSender = c.universe.reify(Actor.noSender)
|
||||
((u.typeOf[(Any, Nothing) :+: TNil], noSender.tree, noSender))
|
||||
} else ((u.typeOf[(Any, Any) :+: TNil], senderTree, c.Expr(senderTree)(c.WeakTypeTag(senderTree.tpe))))
|
||||
}
|
||||
|
||||
val tT = u.weakTypeOf[T]
|
||||
val (tS, senderTree, sender) = getSenderChannel getOrElse getSenderRef
|
||||
|
||||
def err(msg: String) = c.error(c.enclosingPosition, msg)
|
||||
|
||||
def verify(msg: Set[u.Type], checked: Set[u.Type], depth: Int): Unit = if (msg.nonEmpty) {
|
||||
val replies = msg map (m ⇒ m -> replyChannels(u)(tT, m))
|
||||
val missing = replies collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (missing.nonEmpty)
|
||||
err(s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextSend = replies.map(_._2).flatten map (m ⇒ m -> replyChannels(u)(tS, m))
|
||||
val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (nextMissing.nonEmpty)
|
||||
err(s"implicit sender `$senderTree` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextChecked = checked ++ msg
|
||||
val nextMsg = nextSend.map(_._2).flatten -- nextChecked
|
||||
verify(nextMsg, nextChecked, depth + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
verify(Set(u.weakTypeOf[M]), Set(u.typeOf[Nothing]), 1)
|
||||
u.reify(c.prefix.splice.actorRef.tell(msg.splice, sender.splice))
|
||||
}
|
||||
|
||||
def ask[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
})(msg: c.Expr[M]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
|
||||
val out = replyChannels(c.universe)(weakTypeOf[T], weakTypeOf[M])
|
||||
if (out.isEmpty) {
|
||||
c.error(c.enclosingPosition, s"This ChannelRef does not support messages of type ${weakTypeOf[M]}")
|
||||
reify(null)
|
||||
} else {
|
||||
val timeout = c.inferImplicitValue(typeOf[Timeout])
|
||||
if (timeout.isEmpty)
|
||||
c.error(c.enclosingPosition, s"no implicit akka.util.Timeout found")
|
||||
val result = appliedType(weakTypeOf[Future[_]].typeConstructor, List(lub(out)))
|
||||
c.Expr(
|
||||
TypeApply(
|
||||
Select(
|
||||
reify(akka.pattern.ask(
|
||||
c.prefix.splice.actorRef, msg.splice)(
|
||||
c.Expr(timeout)(weakTypeTag[Timeout]).splice)).tree,
|
||||
"asInstanceOf"),
|
||||
List(TypeTree().setType(result))))
|
||||
}
|
||||
}
|
||||
|
||||
def forward[T <: ChannelList: c.WeakTypeTag, M: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
})(msg: c.Expr[M]): c.Expr[Unit] = {
|
||||
import c.universe._
|
||||
if (weakTypeOf[M] =:= weakTypeOf[Values.WrappedMessage[T]]) {
|
||||
reify(
|
||||
c.prefix.splice.actorRef.forward(
|
||||
msg.splice.asInstanceOf[Values.WrappedMessage[_]].value)(
|
||||
c.Expr(Ident("implicitly"))(weakTypeTag[ActorContext]).splice))
|
||||
} else {
|
||||
c.error(c.enclosingPosition, s"cannot forward message unless types match exactly")
|
||||
reify(())
|
||||
}
|
||||
}
|
||||
|
||||
def narrowImpl[C <: ChannelList: c.WeakTypeTag, T <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = ChannelRef[T]
|
||||
}): c.Expr[ChannelRef[C]] = {
|
||||
import c.{ universe ⇒ u }
|
||||
narrowCheck(u)(u.weakTypeOf[T], u.weakTypeOf[C]) match {
|
||||
case Nil ⇒ // okay
|
||||
case err :: Nil ⇒ c.error(c.enclosingPosition, err)
|
||||
case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", ""))
|
||||
}
|
||||
u.reify(c.prefix.splice.asInstanceOf[ChannelRef[C]])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,357 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.macros
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.reflect.api.Universe
|
||||
import scala.runtime.AbstractPartialFunction
|
||||
import akka.actor.Props
|
||||
import scala.collection.immutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.reflect.{ classTag, ClassTag }
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.AkkaException
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.ActorInitializationException
|
||||
|
||||
/**
|
||||
* Typed channels atop untyped actors.
|
||||
*
|
||||
* The idea is that the actor declares all its input types up front, including
|
||||
* what it expects the sender to handle wrt.replies, and then ChannelRef
|
||||
* carries this information for statically verifying that messages sent to an
|
||||
* actor have an actual chance of being processed.
|
||||
*
|
||||
* There are several implementation-imposed restrictions:
|
||||
*
|
||||
* - not two channels with different input types may have the same erased
|
||||
* type; this is currently not enforced at compile time (and leads to
|
||||
* channels being “ignored” at runtime)
|
||||
* - messages received by the actor are dispatched to channels based on the
|
||||
* erased type, which may be less precise than the actual channel type; this
|
||||
* can lead to ClassCastExceptions if sending through the untyped ActorRef
|
||||
*/
|
||||
trait Channels[P <: ChannelList, C <: ChannelList] extends Actor {
|
||||
|
||||
import Channels._
|
||||
|
||||
/**
|
||||
* Create a child actor with properly typed ChannelRef, verifying that this
|
||||
* actor can handle everything which the child tries to send via its
|
||||
* `parent` ChannelRef.
|
||||
*/
|
||||
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Channels[Pa, Ch]): ChannelRef[Ch] = macro createChildImpl[C, Pa, Ch]
|
||||
|
||||
/**
|
||||
* Properly typed ChannelRef for the context.parent.
|
||||
*/
|
||||
def parentChannel: ChannelRef[P] = new ChannelRef(context.parent)
|
||||
|
||||
/**
|
||||
* The properly typed self-channel is used implicitly when sending to other
|
||||
* typed channels for verifying that replies can be handled.
|
||||
*/
|
||||
implicit def selfChannel = new ChannelRef[C](self)
|
||||
|
||||
/*
|
||||
* This map holds the current behavior for each erasure-tagged channel; the
|
||||
* basic receive impl will dispatch incoming messages according to the most
|
||||
* specific erased type in this map.
|
||||
*/
|
||||
private var behavior = Map.empty[Class[_], FF]
|
||||
|
||||
private trait FF
|
||||
private object FF {
|
||||
def apply(x: Any): FF = x match {
|
||||
case f: Function1[_, _] ⇒ F1(f.asInstanceOf[Values.WrappedMessage[ChannelList] ⇒ Unit])
|
||||
case f: Function2[_, _, _] ⇒ F2(f.asInstanceOf[(Any, ChannelRef[ChannelList]) ⇒ Unit])
|
||||
}
|
||||
}
|
||||
private case class F1(f: Values.WrappedMessage[ChannelList] ⇒ Unit) extends FF
|
||||
private case class F2(f: (Any, ChannelRef[ChannelList]) ⇒ Unit) extends FF
|
||||
|
||||
/**
|
||||
* Declare an input channel of the given type; the returned object takes a partial function:
|
||||
*
|
||||
* {{{
|
||||
* channel[A] {
|
||||
* case (a, s) =>
|
||||
* // a is of type A and
|
||||
* // s is a ChannelRef for the sender, capable of sending the declared reply type for A
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
def channel[T]: Channels[P, C]#Behaviorist[Nothing, T] = macro channelImpl[T, C, P]
|
||||
|
||||
new Behaviorist(null)
|
||||
|
||||
protected class Behaviorist[-R, Ch](tt: ru.TypeTag[Ch]) {
|
||||
def apply(recv: R): Unit =
|
||||
behavior ++= (for (t ← inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> FF(recv))
|
||||
}
|
||||
|
||||
/*
|
||||
* HORRIBLE HACK AHEAD
|
||||
*
|
||||
* I’d like to keep this a trait, but traits cannot have constructor
|
||||
* arguments, not even TypeTags.
|
||||
*/
|
||||
protected var channelListTypeTag: TypeTag[C] = _
|
||||
|
||||
/**
|
||||
* Sort so that subtypes always precede their supertypes, but without
|
||||
* obeying any order between unrelated subtypes (insert sort).
|
||||
*/
|
||||
private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] =
|
||||
(new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls) ⇒
|
||||
buf.indexWhere(_ isAssignableFrom cls) match {
|
||||
case -1 ⇒ buf append cls
|
||||
case x ⇒ buf insert (x, cls)
|
||||
}
|
||||
buf
|
||||
}.to[immutable.IndexedSeq]
|
||||
|
||||
final lazy val receive = new AbstractPartialFunction[Any, Unit] {
|
||||
|
||||
val index = sortClasses(behavior.keys)
|
||||
|
||||
if (channelListTypeTag != null) verifyCompleteness()
|
||||
|
||||
private def verifyCompleteness() {
|
||||
val channels = inputChannels(ru)(channelListTypeTag.tpe)
|
||||
val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen))
|
||||
val missing = classes.keySet -- behavior.keySet
|
||||
if (missing.nonEmpty) {
|
||||
val m = missing.map(classes).flatten
|
||||
throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}")
|
||||
}
|
||||
}
|
||||
|
||||
override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match {
|
||||
case CheckType(tt) ⇒
|
||||
narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match {
|
||||
case Nil ⇒ sender ! CheckTypeACK
|
||||
case err :: Nil ⇒ sender ! CheckTypeNAK(err)
|
||||
case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", ""))
|
||||
}
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index find (_ isAssignableFrom msgClass) match {
|
||||
case None ⇒ default(x)
|
||||
case Some(cls) ⇒
|
||||
behavior(cls) match {
|
||||
case F1(f) ⇒ f(new Values.WrappedMessage[ChannelList](x))
|
||||
case F2(f) ⇒ f(x, new ChannelRef(sender))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def isDefinedAt(x: Any): Boolean = x match {
|
||||
case c: CheckType[_] ⇒ true
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index find (_ isAssignableFrom msgClass) match {
|
||||
case None ⇒ false
|
||||
case Some(cls) ⇒ true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object Channels {
|
||||
|
||||
type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit]
|
||||
|
||||
case class CheckType[T](tt: TypeTag[T])
|
||||
case object CheckTypeACK
|
||||
case class CheckTypeNAK(errors: String)
|
||||
case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace
|
||||
|
||||
/**
|
||||
* This macro transforms a channel[] call which returns “some” Behaviorist
|
||||
* into a _channel[] call with precise reply channel descriptors, so that the
|
||||
* partial function it is applied to can enjoy proper type checking.
|
||||
*
|
||||
* T is the message type
|
||||
* C is the channel list of the enclosing Channels
|
||||
* P is the parent channel list
|
||||
*/
|
||||
def channelImpl[T: c.WeakTypeTag, C <: ChannelList: c.WeakTypeTag, P <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Channels[P, C]
|
||||
}): c.Expr[Channels[P, C]#Behaviorist[Nothing, T]] = {
|
||||
|
||||
val tT = c.weakTypeOf[T]
|
||||
val tC = c.weakTypeOf[C]
|
||||
|
||||
import c.universe._
|
||||
|
||||
val undefined = missingChannels(c.universe)(tC, inputChannels(c.universe)(tT))
|
||||
if (undefined.nonEmpty) {
|
||||
c.error(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}")
|
||||
reify(null)
|
||||
} else {
|
||||
checkUnique(c.universe)(tT, tC) foreach (c.error(c.enclosingPosition, _))
|
||||
val receive =
|
||||
if (tT <:< typeOf[ChannelList]) {
|
||||
appliedType(typeOf[Function1[_, _]].typeConstructor, List(
|
||||
appliedType(typeOf[Values.WrappedMessage[_]].typeConstructor, List(tT)),
|
||||
typeOf[Unit]))
|
||||
} else {
|
||||
val channels = toChannels(c.universe)(replyChannels(c.universe)(tC, tT))
|
||||
appliedType(typeOf[Function2[_, _, _]].typeConstructor, List(
|
||||
tT,
|
||||
appliedType(typeOf[ChannelRef[_]].typeConstructor, List(channels)),
|
||||
typeOf[Unit]))
|
||||
}
|
||||
c.Expr(
|
||||
Apply(
|
||||
Select(
|
||||
New(AppliedTypeTree(Select(c.prefix.tree, newTypeName("Behaviorist")), List(
|
||||
TypeTree().setType(receive),
|
||||
TypeTree().setType(tT)))),
|
||||
nme.CONSTRUCTOR),
|
||||
List(
|
||||
Block(List(
|
||||
If(reify(c.prefix.splice.channelListTypeTag == null).tree,
|
||||
Apply(
|
||||
Select(c.prefix.tree, "channelListTypeTag_$eq"),
|
||||
List(TypeApply(
|
||||
Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"),
|
||||
List(TypeTree().setType(c.weakTypeOf[C]))))),
|
||||
c.literalUnit.tree)),
|
||||
TypeApply(
|
||||
Select(Select(Select(Select(Select(Ident("scala"), "reflect"), "runtime"), nme.PACKAGE), "universe"), "typeTag"),
|
||||
List(TypeTree().setType(tT)))))))
|
||||
}
|
||||
}
|
||||
|
||||
def createChildImpl[C <: ChannelList: c.WeakTypeTag, Pa <: ChannelList: c.WeakTypeTag, Ch <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Channels[_, C]
|
||||
})(factory: c.Expr[Channels[Pa, Ch]]): c.Expr[ChannelRef[Ch]] = {
|
||||
|
||||
import c.universe._
|
||||
if (weakTypeOf[Pa] =:= weakTypeOf[Nothing]) {
|
||||
c.abort(c.enclosingPosition, "Parent argument must not be Nothing")
|
||||
}
|
||||
if (weakTypeOf[Ch] =:= weakTypeOf[Nothing]) {
|
||||
c.abort(c.enclosingPosition, "channel list must not be Nothing")
|
||||
}
|
||||
val missing = missingChannels(c.universe)(weakTypeOf[C], inputChannels(c.universe)(weakTypeOf[Pa]))
|
||||
if (missing.isEmpty) {
|
||||
implicit val t = c.TypeTag[Ch](c.weakTypeOf[Ch])
|
||||
reify(new ChannelRef[Ch](c.prefix.splice.context.actorOf(Props(factory.splice))))
|
||||
} else {
|
||||
c.error(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
|
||||
reify(???)
|
||||
}
|
||||
}
|
||||
|
||||
def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = {
|
||||
val channels = inputChannels(u)(list) groupBy (_.erasure)
|
||||
val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel)
|
||||
if (dupes.isEmpty) None
|
||||
else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}")
|
||||
}
|
||||
|
||||
/**
|
||||
* check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings
|
||||
*/
|
||||
def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = {
|
||||
var errors = List.empty[String]
|
||||
for (in ← inputChannels(u)(target)) {
|
||||
val replies = replyChannels(u)(orig, in)
|
||||
if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in"
|
||||
else {
|
||||
val targetReplies = replyChannels(u)(target, in)
|
||||
val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _))
|
||||
if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in"
|
||||
val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t))
|
||||
if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in"
|
||||
}
|
||||
}
|
||||
errors.reverse
|
||||
}
|
||||
|
||||
/**
|
||||
* get all input channels from a ChannelList or return the given type
|
||||
*/
|
||||
final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
val imp = u.mkImporter(ru)
|
||||
val cl = imp.importType(ru.typeOf[ChannelList])
|
||||
val tnil = imp.importType(ru.typeOf[TNil])
|
||||
def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc)
|
||||
case last ⇒ if (last =:= tnil) acc.reverse else (last :: acc).reverse
|
||||
}
|
||||
if (list <:< cl) rec(list, Nil)
|
||||
else List(list)
|
||||
}
|
||||
|
||||
/**
|
||||
* find all input channels matching the given message type and return a
|
||||
* list of their respective reply channels
|
||||
*/
|
||||
final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
def rec(l: Type, acc: List[Type]): List[Type] = {
|
||||
l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msg <:< in ⇒
|
||||
rec(tail, if (acc contains out) acc else out :: acc)
|
||||
case TypeRef(_, _, _ :: tail :: Nil) ⇒
|
||||
rec(tail, acc)
|
||||
case _ ⇒ acc.reverse
|
||||
}
|
||||
}
|
||||
val n = typeOf[Nothing]
|
||||
if (msg =:= n) List(n) else rec(list, Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* filter from the `required` list of types all which are subtypes of inputs of the ChannelList
|
||||
*/
|
||||
final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = {
|
||||
import u._
|
||||
// making the top-level method recursive blows up the compiler (when compiling the macro itself)
|
||||
def rec(ch: Type, req: List[Type]): List[Type] = {
|
||||
ch match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, req filterNot (_ <:< in))
|
||||
case last ⇒ req filterNot (_ <:< last)
|
||||
}
|
||||
}
|
||||
rec(channels, required)
|
||||
}
|
||||
|
||||
/**
|
||||
* convert a list of types List(<T1>, <T2>, ...) into a ChannelList
|
||||
* ( Channel[<T1>, Nothing] :=: Channel[<T2>, Nothing] :=: ... :=: TNil )
|
||||
*/
|
||||
final def toChannels(u: Universe)(list: List[u.Type]): u.Type = {
|
||||
import u._
|
||||
def rec(l: List[Type], acc: Type): Type = l match {
|
||||
case head :: (tail: List[Type]) ⇒
|
||||
if (head =:= weakTypeOf[Nothing]) rec(tail, acc)
|
||||
else
|
||||
rec(tail,
|
||||
appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List(
|
||||
appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List(
|
||||
head,
|
||||
weakTypeOf[Nothing])),
|
||||
acc)))
|
||||
case _ ⇒ acc
|
||||
}
|
||||
rec(list.reverse, weakTypeOf[TNil])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
||||
import language.implicitConversions
|
||||
import akka.actor.ActorRef
|
||||
|
||||
package object channels {
|
||||
implicit def actorRef2Ops(ref: ActorRef) = new Values.ActorRefOps(ref)
|
||||
}
|
||||
|
||||
package channels {
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
|
||||
sealed trait ChannelList
|
||||
sealed trait TNil extends ChannelList
|
||||
sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList
|
||||
|
||||
object Values {
|
||||
class ActorRefOps(val ref: ActorRef) extends AnyVal {
|
||||
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
|
||||
import Channels._
|
||||
ref ? CheckType(tt) map {
|
||||
case CheckTypeACK ⇒ new ChannelRef[C](ref)
|
||||
case CheckTypeNAK(error) ⇒ throw NarrowingException(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class WrappedMessage[T <: ChannelList](val value: Any) extends AnyVal
|
||||
}
|
||||
}
|
||||
|
|
@ -73,7 +73,7 @@ object AkkaBuild extends Build {
|
|||
generatedPdf in Sphinx <<= generatedPdf in Sphinx in LocalProject(docs.id) map identity
|
||||
|
||||
),
|
||||
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, macros, macroTests)
|
||||
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests)
|
||||
)
|
||||
|
||||
lazy val actor = Project(
|
||||
|
|
@ -409,19 +409,19 @@ object AkkaBuild extends Build {
|
|||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
lazy val macros = Project(
|
||||
id = "akka-macros",
|
||||
base = file("akka-macros"),
|
||||
lazy val channels = Project(
|
||||
id = "akka-channels",
|
||||
base = file("akka-channels"),
|
||||
dependencies = Seq(actor),
|
||||
settings = defaultSettings ++ Seq(
|
||||
libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _)
|
||||
)
|
||||
)
|
||||
|
||||
lazy val macroTests = Project(
|
||||
id = "akka-macro-tests",
|
||||
base = file("akka-macro-tests"),
|
||||
dependencies = Seq(macros, testkit % "compile;test->test"),
|
||||
lazy val channelsTests = Project(
|
||||
id = "akka-channels-tests",
|
||||
base = file("akka-channels-tests"),
|
||||
dependencies = Seq(channels, testkit % "compile;test->test"),
|
||||
settings = defaultSettings ++ Seq(
|
||||
libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-compiler" % _)
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue