review comments

- some API docs
- require names for top-level actors
- allow names for children
- flag error when no channels declared
This commit is contained in:
Roland 2013-01-31 18:59:48 +01:00
parent c362e8168f
commit 86ded1fb0b
9 changed files with 109 additions and 60 deletions

View file

@ -70,8 +70,8 @@ object ExecutionContexts {
* It is very useful for actions which are known to be non-blocking and * It is very useful for actions which are known to be non-blocking and
* non-throwing in order to save a round-trip to the thread pool. * non-throwing in order to save a round-trip to the thread pool.
*/ */
object sameThreadExecutionContext extends ExecutionContext { private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor {
override def execute(runnable: Runnable): Unit = runnable.run() override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit = override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("exception in sameThreadExecutionContext", t) throw new IllegalStateException("exception in sameThreadExecutionContext", t)
} }

View file

@ -126,7 +126,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"Actor with Channels" must { "Actor with Channels" must {
"construct refs" in { "construct refs" in {
val ref = ChannelExt(system).actorOf(new Tester) val ref = ChannelExt(system).actorOf(new Tester, "t1")
ref <-!- A ref <-!- A
expectMsg(C) expectMsg(C)
lastSender must be(ref.actorRef) lastSender must be(ref.actorRef)
@ -136,16 +136,16 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
} }
"select return channels" in { "select return channels" in {
val ref = ChannelExt(system).actorOf(new Tester) val ref = ChannelExt(system).actorOf(new Tester, "t2")
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t3")
ref <-!- A ref <-!- A
expectMsg(C) expectMsg(C)
lastSender must be(selfChannel.actorRef) lastSender must be(selfChannel.actorRef)
} }
"correctly dispatch to subchannels" in { "correctly dispatch to subchannels" in {
val ref = ChannelExt(system).actorOf(new Tester) val ref = ChannelExt(system).actorOf(new Tester, "t4")
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor)) implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t5")
ref <-!- A2 ref <-!- A2
expectMsg(C1) expectMsg(C1)
lastSender must be(selfChannel.actorRef) lastSender must be(selfChannel.actorRef)
@ -281,7 +281,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
} }
"have a working selfChannel" in { "have a working selfChannel" in {
val ref = ChannelExt(system).actorOf(new Children) val ref = ChannelExt(system).actorOf(new Children, "t10")
ref <-!- A ref <-!- A
expectMsg(C) expectMsg(C)
} }
@ -292,7 +292,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
parentChannel <-!- A parentChannel <-!- A
}) })
channel[A] { (msg, snd) testActor ! msg } channel[A] { (msg, snd) testActor ! msg }
}) }, "t11")
expectMsg(A) expectMsg(A)
} }
@ -301,7 +301,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
eval(""" eval("""
|import akka.channels._ |import akka.channels._
|import ChannelSpec._ |import ChannelSpec._
|null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}) |null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}, "")
""".stripMargin) """.stripMargin)
}.message must include("type mismatch") }.message must include("type mismatch")
} }
@ -367,7 +367,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"support narrowing ActorRefs" in { "support narrowing ActorRefs" in {
import Channels._ import Channels._
val channel = ChannelExt(system).actorOf(new RecvC(testActor)) val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t15")
val ref = channel.actorRef val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated) implicit val t = Timeout(1.second.dilated)
import system.dispatcher import system.dispatcher
@ -377,7 +377,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
} }
"deny wrong narrowing of ActorRefs" in { "deny wrong narrowing of ActorRefs" in {
val channel = ChannelExt(system).actorOf(new RecvC(testActor)) val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t16")
val ref = channel.actorRef val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated) implicit val t = Timeout(1.second.dilated)
import system.dispatcher import system.dispatcher
@ -392,8 +392,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
} }
"allow wrapping of ChannelRefs with pass-through" in { "allow wrapping of ChannelRefs with pass-through" in {
val target = ChannelExt(system).actorOf(new RecvC(testActor)) val target = ChannelExt(system).actorOf(new RecvC(testActor), "t17")
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target)) val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target), "t18")
wrap <-!- C wrap <-!- C
expectMsg(C) expectMsg(C)
lastSender must be(target.actorRef) lastSender must be(target.actorRef)
@ -404,8 +404,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"allow wrapping of Actor with ChannelsRefs with replies" in { "allow wrapping of Actor with ChannelsRefs with replies" in {
val probe = TestProbe() val probe = TestProbe()
val target = ChannelExt(system).actorOf(new EchoTee(probe.ref)) val target = ChannelExt(system).actorOf(new EchoTee(probe.ref), "t19")
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target)) val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target), "t20")
C -!-> wrap C -!-> wrap
expectMsg(C1) expectMsg(C1)
expectMsg(C1) expectMsg(C1)
@ -413,14 +413,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
} }
"support typed ask" in { "support typed ask" in {
val t = ChannelExt(system).actorOf(new Tester) val t = ChannelExt(system).actorOf(new Tester, "t21")
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
val r: Future[C] = (t <-?- A).lub val r: Future[C] = (t <-?- A).lub
Await.result(r, 1.second) must be(C) Await.result(r, 1.second) must be(C)
} }
"support typed ask with multiple reply channels" in { "support typed ask with multiple reply channels" in {
val t = ChannelExt(system).actorOf(new SubChannels) val t = ChannelExt(system).actorOf(new SubChannels, "t22")
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
val r: Future[Msg] = (t <-?- A1).lub val r: Future[Msg] = (t <-?- A1).lub
Await.result(r, 1.second) must be(B) Await.result(r, 1.second) must be(B)
@ -462,8 +462,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"be able to forward fully generic channels" in { "be able to forward fully generic channels" in {
val cd = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, D) :+: TNil] { val cd = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, D) :+: TNil] {
channel[C] { (x, snd) snd <-!- D } channel[C] { (x, snd) snd <-!- D }
}) }, "t25")
val t = ChannelExt(system).actorOf(new Poly(cd)) val t = ChannelExt(system).actorOf(new Poly(cd), "t26")
t <-!- A t <-!- A
expectMsg(A) expectMsg(A)
t <-!- C t <-!- C
@ -479,8 +479,8 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
implicit val selfChannel = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, Nothing) :+:(D, Nothing) :+: TNil] { implicit val selfChannel = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, Nothing) :+:(D, Nothing) :+: TNil] {
channel[C] { (c, snd) testActor ! c } channel[C] { (c, snd) testActor ! c }
channel[D] { (d, snd) testActor ! d } channel[D] { (d, snd) testActor ! d }
}) }, "t27")
val t = ChannelExt(system).actorOf(new Tester) val t = ChannelExt(system).actorOf(new Tester, "t28")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
val fa = Future successful a val fa = Future successful a
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
@ -548,7 +548,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"be askable to a ChannelRef" in { "be askable to a ChannelRef" in {
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
val t = ChannelExt(system).actorOf(new Tester) val t = ChannelExt(system).actorOf(new Tester, "t30")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
val fa = Future successful a val fa = Future successful a
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
@ -608,7 +608,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"be LUBbable within a Future" in { "be LUBbable within a Future" in {
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
val t = ChannelExt(system).actorOf(new Tester) val t = ChannelExt(system).actorOf(new Tester, "t31")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
(Await.result((a -?-> t).lub, timeout.duration): Msg) must be(C) (Await.result((a -?-> t).lub, timeout.duration): Msg) must be(C)
} }

View file

@ -17,9 +17,10 @@ object ChannelExt extends ExtensionKey[ChannelExtension]
class ChannelExtension(system: ExtendedActorSystem) extends Extension { class ChannelExtension(system: ExtendedActorSystem) extends Extension {
// kick-start the universe (needed due to thread safety issues in runtime mirror) // FIXME: kick-start the universe (needed due to thread safety issues in runtime mirror)
// see https://issues.scala-lang.org/browse/SI-6240
private val t = typeTag[(Int, Int) :+: TNil] private val t = typeTag[(Int, Int) :+: TNil]
def actorOf[Ch <: ChannelList](factory: Actor with Channels[TNil, Ch]): ChannelRef[Ch] = def actorOf[Ch <: ChannelList](factory: Actor with Channels[TNil, Ch], name: String): ChannelRef[Ch] =
new ChannelRef[Ch](system.actorOf(Props(factory))) new ChannelRef[Ch](system.actorOf(Props(factory), name))
} }

View file

@ -6,27 +6,38 @@ package akka.channels
import language.experimental.{ macros makkros } import language.experimental.{ macros makkros }
import akka.actor.ActorRef import akka.actor.ActorRef
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.macros.Context
import scala.annotation.tailrec
import scala.reflect.macros.Universe
import akka.actor.Actor
import akka.actor.ActorContext
import scala.concurrent.Future import scala.concurrent.Future
import akka.util.Timeout
import akka.AkkaException
import scala.util.control.NoStackTrace
case class NarrowingException(errors: String) extends AkkaException(errors) with NoStackTrace
/**
* A channel reference, holding a type list of all channels supported by the
* underlying actor. This actors reference can be obtained as the `actorRef`
* member.
*/
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
/**
* Send a message over this channel, tell semantics, returning the message.
*/
def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M] def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M]
/**
* Eventually send the value contained in the future over this channel,
* tell semantics, returning a Future which is completed after sending
* with the value which was sent (Future.andThen semantics).
*/
def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M] def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M]
/**
* Send a message over this channel, ask semantics, returning a Future
* which will be completed with the reply message or a TimeoutException.
* If the message is a Future itself, eventually send the Futures value.
*/
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M] def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M]
/**
* Narrow this ChannelRef by removing channels or narrowing input types or
* widening output types.
*/
def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T] def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T]
} }

View file

@ -51,6 +51,13 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒
*/ */
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch] def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch]
/**
* Create a child actor with properly typed ChannelRef and the given name,
* verifying that this actor can handle everything which the child tries to
* send via its `parent` ChannelRef.
*/
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch], name: String): ChannelRef[Ch] = macro macros.CreateChild.implName[C, Pa, Ch]
/** /**
* Properly typed ChannelRef for the context.parent. * Properly typed ChannelRef for the context.parent.
*/ */
@ -131,6 +138,8 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒
private def verifyCompleteness() { private def verifyCompleteness() {
val channels = inputChannels(ru)(channelListTypeTag.tpe) val channels = inputChannels(ru)(channelListTypeTag.tpe)
if (channels.isEmpty)
throw ActorInitializationException("Actor with Channels cannot have no channels")
val classes = channels groupBy (e channelListTypeTag.mirror.runtimeClass(e.widen)) val classes = channels groupBy (e channelListTypeTag.mirror.runtimeClass(e.widen))
val missing = classes.keySet -- behavior.keySet val missing = classes.keySet -- behavior.keySet
if (missing.nonEmpty) { if (missing.nonEmpty) {
@ -162,10 +171,7 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒
case c: CheckType[_] true case c: CheckType[_] true
case _ case _
val msgClass = x.getClass val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match { index exists (_ isAssignableFrom msgClass)
case None false
case Some(cls) true
}
} }
} }
} }

View file

@ -12,6 +12,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.reflect.runtime.{ universe ru } import scala.reflect.runtime.{ universe ru }
import scala.util.Success import scala.util.Success
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import scala.util.control.NoStackTrace
sealed trait ChannelList sealed trait ChannelList
sealed trait TNil extends ChannelList sealed trait TNil extends ChannelList
@ -26,8 +27,23 @@ sealed trait ReplyChannels[T <: ChannelList] extends ChannelList
*/ */
sealed trait UnknownDoNotWriteMeDown sealed trait UnknownDoNotWriteMeDown
/**
* This exception is used to signal errors when trying to `.narrow` an
* ActorRef into a ChannelRef: if the actor finds the requested channel types
* incompatible with its selfChannel, it will return errors in the same format
* as would occur during compilation of a `ChannelRef.narrow` operation.
*/
case class NarrowingException(message: String) extends akka.AkkaException(message) with NoStackTrace
class ActorRefOps(val ref: ActorRef) extends AnyVal { class ActorRefOps(val ref: ActorRef) extends AnyVal {
import macros.Helpers._ import macros.Helpers._
/**
* Send a query to the actor and check whether it supports the requested
* channel types; the normal timeout semantics of the `ask` pattern apply.
* The Future will be completed either with the desired ChannelRef or with
* an exception (TimeoutException or NarrowingException).
*/
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = { def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
import Channels._ import Channels._
ref ? CheckType(tt) map { ref ? CheckType(tt) map {

View file

@ -22,20 +22,35 @@ object CreateChild {
import c.universe._ import c.universe._
if (weakTypeOf[ParentChannels] =:= weakTypeOf[Nothing]) { verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
c.abort(c.enclosingPosition, "Parent argument must not be Nothing")
}
if (weakTypeOf[ChildChannels] =:= weakTypeOf[Nothing]) {
c.abort(c.enclosingPosition, "channel list must not be Nothing")
}
val missing = missingChannels(c.universe)(weakTypeOf[MyChannels], inputChannels(c.universe)(weakTypeOf[ParentChannels]))
if (missing.isEmpty) {
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels]) implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice)))) reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice))))
} else {
c.abort(c.enclosingPosition, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
} }
def implName[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag](
c: Context {
type PrefixType = Actor with Channels[_, MyChannels]
})(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]], name: c.Expr[String]): c.Expr[ChannelRef[ChildChannels]] = {
import c.universe._
verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice), name.splice)))
}
def verify(c: Context)(parent: c.Type, child: c.Type, mine: c.Type): Unit = {
import c.universe._
val nothing = weakTypeOf[Nothing]
if (parent =:= nothing) abort(c, "Parent argument must not be Nothing")
if (child =:= nothing) abort(c, "channel list must not be Nothing")
val missing = missingChannels(c.universe)(mine, inputChannels(c.universe)(parent))
if (missing.nonEmpty)
abort(c, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
} }
} }

View file

@ -176,7 +176,7 @@ class ChannelDocSpec extends AkkaSpec {
// type given just for demonstration purposes // type given just for demonstration purposes
val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] = val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] =
ChannelExt(system).actorOf(new Latch(target)) ChannelExt(system).actorOf(new Latch(target), "latch")
"hello" -!-> latch "hello" -!-> latch
//#processing //#processing
@ -203,7 +203,7 @@ class ChannelDocSpec extends AkkaSpec {
// then it is used somewhat like this: // then it is used somewhat like this:
// //
val parent = ChannelExt(system).actorOf(new Parent) val parent = ChannelExt(system).actorOf(new Parent, "parent")
parent <-!- GetChild parent <-!- GetChild
val child = expectMsgType[ChildRef].child // this assumes TestKit context val child = expectMsgType[ChildRef].child // this assumes TestKit context

View file

@ -40,7 +40,7 @@ an error, since the companion object ``Command`` is not an instance of type
While this example looks pretty simple, the implications are profound. In order While this example looks pretty simple, the implications are profound. In order
to be useful, the system must be as reliable as you would expect a type system to be useful, the system must be as reliable as you would expect a type system
to be. This means that unless you step outside of the it (i.e. doing the to be. This means that unless you step outside of it (i.e. doing the
equivalent of ``.asInstanceOf[_]``) you shall be protected, failures shall be equivalent of ``.asInstanceOf[_]``) you shall be protected, failures shall be
recognized and flagged. There are a number of challenges included in this recognized and flagged. There are a number of challenges included in this
requirement, which are discussed in the following sections. If you are reading requirement, which are discussed in the following sections. If you are reading