Merge pull request #1097 from akka/wip-channels-∂π

add Typed Channels, see #2929
This commit is contained in:
Roland Kuhn 2013-02-01 06:37:41 -08:00
commit e06076553c
19 changed files with 2295 additions and 15 deletions

View file

@ -62,6 +62,19 @@ object ExecutionContexts {
* @return a reference to the global ExecutionContext
*/
def global(): ExecutionContext = ExecutionContext.global
/**
* WARNING: Not A General Purpose ExecutionContext!
*
* This is an execution context which runs everything on the calling thread.
* It is very useful for actions which are known to be non-blocking and
* non-throwing in order to save a round-trip to the thread pool.
*/
private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor {
override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("exception in sameThreadExecutionContext", t)
}
}
/**

View file

@ -13,20 +13,13 @@ import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.util.Success
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
/**
* Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread
*/
object CircuitBreaker {
/**
* Synchronous execution context to run in caller's thread - used by companion object factory methods
*/
private[CircuitBreaker] val syncExecutionContext = new ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit = ()
}
/**
* Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed
* in Future when using withCircuitBreaker. To use another ExecutionContext for the callbacks you can specify the
@ -38,7 +31,7 @@ object CircuitBreaker {
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
*/
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext)
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext)
/**
* Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed
@ -301,7 +294,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
bodyFuture.onComplete({
case s: Success[_] if !deadline.isOverdue() callSucceeds()
case _ callFails()
})(CircuitBreaker.syncExecutionContext)
})(sameThreadExecutionContext)
bodyFuture
}

View file

@ -0,0 +1,21 @@
package akka.makkros
import language.experimental.macros
import scala.reflect.macros.Context
import scala.tools.reflect.ToolBox
import scala.reflect.ClassTag
import scala.tools.reflect.ToolBoxError
object Test {
def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-channels/target/classes"): Any = {
val tb = mkToolbox(compileOptions)
tb.eval(tb.parse(code))
}
def mkToolbox(compileOptions: String = ""): ToolBox[_ <: scala.reflect.api.Universe] = {
val m = scala.reflect.runtime.currentMirror
m.mkToolBox(options = compileOptions)
}
}

View file

@ -0,0 +1,629 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels
import akka.testkit._
import akka.actor.ActorRef
import akka.makkros.Test._
import scala.tools.reflect.ToolBoxError
import scala.reflect.runtime.{ universe ru }
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.Failure
import akka.actor.ActorSystem
import scala.reflect.api.Universe
import scala.concurrent.Future
import akka.actor.ActorInitializationException
import akka.actor.Props
import akka.actor.Actor
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
import java.lang.reflect.InvocationTargetException
object ChannelSpec {
trait Msg
trait A extends Msg
object A extends A
object A1 extends A
object A2 extends A
trait B extends Msg
object B extends B
trait C extends Msg
object C extends C
object C1 extends C
trait D extends Msg
object D extends D
// used for sender verification in the first two test cases
class Tester extends Actor with Channels[TNil, (A, C) :+: (B, D) :+: TNil] {
channel[A.type] { (msg, snd) snd <-!- C }
channel[A] { (msg, snd) snd <-!- C1 }
channel[B] {
case (B, s) s <-!- D
}
}
class RecvC(ref: ActorRef) extends Actor with Channels[TNil, (C, Nothing) :+: TNil] {
channel[C] { case (x, _) ref ! x }
}
// pos compile test for multiple reply channels
class SubChannels extends Actor with Channels[TNil, (A, B) :+: (A, C) :+: TNil] {
channel[A] {
case (A1, x) B -!-> x
case (_, x) x <-!- C
}
}
// pos compile test for children
class Children extends Actor with Channels[TNil, (A, B) :+: (C, Nothing) :+: TNil] {
val c = createChild(new Actor with Channels[(A, Nothing) :+: TNil, (B, C) :+: TNil] {
channel[B] { case (B, s) s <-!- C }
})
var client: ActorRef = _
channel[A] {
case (A, s) c <-!- B; client = sender
}
channel[C] {
case (C, _) client ! C
}
createChild(new Actor with Channels[(C, Nothing) :+: TNil, TNil] {})
createChild(new Actor with Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil] {})
}
// compile test for polymorphic actors
class WriteOnly[T1: ru.TypeTag, T2: ru.TypeTag](target: ChannelRef[(T1, T2) :+: TNil]) extends Actor with Channels[TNil, (D, D) :+: (T1, T2) :+: TNil] {
implicit val t = Timeout(1.second)
import akka.pattern.ask
channel[D] { (d, snd) snd <-!- d }
channel[T1] { (x, snd) x -?-> target -!-> snd -!-> snd }
}
// compile test for whole-channel polymorphism
class Poly[T <: ChannelList: ru.TypeTag](target: ChannelRef[T]) extends Actor with Channels[TNil, (A, A) :+: (B, B) :+: T] {
implicit val timeout = Timeout(1.second)
channel[T] { (x, snd)
val xx: WrappedMessage[T, Any] = x
val f: Future[WrappedMessage[(ReplyChannels[T], Nothing) :+: TNil, ReplyChannels[T]]] = target <-?- x
f -!-> snd
}
import language.existentials
channel[(A, _) :+: (B, _) :+: TNil] { (x, snd)
val m: Msg = x.value
val c: ChannelRef[TNil] = snd
}
channel[A] { (x, snd) x -!-> snd }
}
// companion to WriteOnly for testing pass-through
class EchoTee(target: ActorRef) extends Actor with Channels[TNil, (C, C) :+: TNil] {
channel[C] { (c, snd) target ! C1; snd <-!- C1 }
}
class MissingChannel extends Actor with Channels[TNil, (A, A) :+: (B, B) :+: TNil] {
channel[A.type] { (_, _) }
}
}
class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, classOf[AkkaSpec].getClassLoader)) with ImplicitSender {
import ChannelSpec._
implicit val selfChannel = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
"Actor with Channels" must {
"construct refs" in {
val ref = ChannelExt(system).actorOf(new Tester, "t1")
ref <-!- A
expectMsg(C)
lastSender must be(ref.actorRef)
ref <-!- B
expectMsg(D)
lastSender must be(ref.actorRef)
}
"select return channels" in {
val ref = ChannelExt(system).actorOf(new Tester, "t2")
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t3")
ref <-!- A
expectMsg(C)
lastSender must be(selfChannel.actorRef)
}
"correctly dispatch to subchannels" in {
val ref = ChannelExt(system).actorOf(new Tester, "t4")
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t5")
ref <-!- A2
expectMsg(C1)
lastSender must be(selfChannel.actorRef)
}
"not permit wrong message type" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val c = new ChannelRef[TNil](null)
|new ChannelRef[(A, C) :+: TNil](null) <-!- B
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type")
}
"not permit wrong message type in complex channel" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val c = new ChannelRef[TNil](null)
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) <-!- C
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type")
}
"not permit unfit sender ref" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(C, D) :+: TNil](null)
|new ChannelRef[(A, B) :+: TNil](null) <-!- A
""".stripMargin)
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B")
}
"permit any sender for Nothing replies" in {
implicit val selfChannel = new ChannelRef[TNil](testActor)
new ChannelRef[(A, Nothing) :+: TNil](testActor) <-!- A
expectMsg(A)
}
"require complete reply type sets" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[TNil](null)
|new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) <-!- A
""".stripMargin)
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B, akka.channels.ChannelSpec.C")
}
"verify ping-pong chains" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(B, B) :+: TNil](null)
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
""".stripMargin)
}.message must include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.C")
}
"tolerate infinite ping-pong" in {
val ex = intercept[InvocationTargetException] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(B, B) :+: (C, B) :+: TNil](null)
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
""".stripMargin)
}
def cause(ex: Throwable): Throwable =
if (ex.getCause == null) ex else cause(ex.getCause)
cause(ex).getClass must be(classOf[NullPointerException])
}
"not permit nonsensical channel declarations" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new Actor with Channels[TNil, (A, B) :+: TNil] {
| channel[B] {
| case (B, _) =>
| }
|}
""".stripMargin)
}.message must include("no channel defined for types akka.channels.ChannelSpec.B")
}
"not permit subchannel replies" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new Actor with Channels[TNil, (A, B) :+: (A1.type, C) :+: TNil] {
| channel[A] {
| case (A1, x) => x <-!- C
| }
|}
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type")
}
"not permit Nothing children" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import akka.actor.Actor
|import ChannelSpec._
|new Actor with Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
| createChild(new Actor with Channels[Nothing, Nothing] {})
|}
""".stripMargin)
}.message must include("Parent argument must not be Nothing")
}
"not permit too demanding children" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import akka.actor.Actor
|import ChannelSpec._
|new Actor with Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
| createChild(new Actor with Channels[(B, Nothing) :+: TNil, TNil] {})
|}
""".stripMargin)
}.message must include("This actor cannot support a child requiring channels akka.channels.ChannelSpec.B")
}
"have a working selfChannel" in {
val ref = ChannelExt(system).actorOf(new Children, "t10")
ref <-!- A
expectMsg(C)
}
"have a working parentChannel" in {
val parent = ChannelExt(system).actorOf(new Actor with Channels[TNil, (A, Nothing) :+: TNil] {
createChild(new Actor with Channels[(A, Nothing) :+: TNil, TNil] {
parentChannel <-!- A
})
channel[A] { (msg, snd) testActor ! msg }
}, "t11")
expectMsg(A)
}
"not permit top-level Actor with Channels which send to parent" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}, "")
""".stripMargin)
}.message must include("type mismatch")
}
"not permit sending wrong things to parents" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new Actor with Channels[TNil, (A, Nothing) :+: TNil] {
| createChild(new Actor with Channels[(A, Nothing) :+: TNil, TNil] {
| parentChannel <-!- B
| })
|}
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type")
}
"support narrowing of references" in {
val ref = new ChannelRef[(A, B) :+:(C, D) :+: TNil](null)
val n: ChannelRef[(A1.type, B) :+: TNil] = ref.narrow[(A1.type, B) :+: TNil]
}
"not allow narrowed refs to open new channels" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new ChannelRef[(A, C) :+: TNil](null).narrow[(A, C) :+: (B, C) :+: TNil]
""".stripMargin)
}.message must include("original ChannelRef does not support input type akka.channels.ChannelSpec.B")
}
"not allow narrowed refs to widen channels" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new ChannelRef[(A1.type, C) :+: TNil](null).narrow[(A, C) :+: TNil]
""".stripMargin)
}.message must include("original ChannelRef does not support input type akka.channels.ChannelSpec.A")
}
"not allow narrowed refs to miss reply channels" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new ChannelRef[(A, C) :+: (A, D) :+: TNil](null).narrow[(A, C) :+: TNil]
""".stripMargin)
}.message must include("reply types akka.channels.ChannelSpec.D not covered for channel akka.channels.ChannelSpec.A")
}
"not allow narrowed refs to narrow reply channels" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null).narrow[(A, C) :+: (A, Nothing) :+: TNil]
""".stripMargin)
}.message must include("reply types Nothing are superfluous for channel akka.channels.ChannelSpec.A")
}
"support narrowing ActorRefs" in {
import Channels._
val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t15")
val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated)
import system.dispatcher
val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration)
r <-!- C
expectMsg(C)
}
"deny wrong narrowing of ActorRefs" in {
val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t16")
val ref = channel.actorRef
implicit val t = Timeout(1.second.dilated)
import system.dispatcher
val f = ref.narrow[(D, Nothing) :+: TNil]
Await.ready(f, t.duration)
f.value.get must be(Failure(NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D")))
}
"be equal according to its actor" in {
val c1, c2 = new ChannelRef[TNil](testActor)
c1 must be === c2
}
"allow wrapping of ChannelRefs with pass-through" in {
val target = ChannelExt(system).actorOf(new RecvC(testActor), "t17")
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target), "t18")
wrap <-!- C
expectMsg(C)
lastSender must be(target.actorRef)
wrap <-!- D
expectMsg(D)
lastSender must be(wrap.actorRef)
}
"allow wrapping of Actor with ChannelsRefs with replies" in {
val probe = TestProbe()
val target = ChannelExt(system).actorOf(new EchoTee(probe.ref), "t19")
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target), "t20")
C -!-> wrap
expectMsg(C1)
expectMsg(C1)
probe.expectMsg(C1)
}
"support typed ask" in {
val t = ChannelExt(system).actorOf(new Tester, "t21")
implicit val timeout = Timeout(1.second)
val r: Future[C] = (t <-?- A).lub
Await.result(r, 1.second) must be(C)
}
"support typed ask with multiple reply channels" in {
val t = ChannelExt(system).actorOf(new SubChannels, "t22")
implicit val timeout = Timeout(1.second)
val r: Future[Msg] = (t <-?- A1).lub
Await.result(r, 1.second) must be(B)
}
"check that channels do not erase to the same types" in {
val m = intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|new Actor with Channels[TNil, (List[A], A) :+: (List[B], B) :+: TNil] {
| channel[List[A]] { (x, s) }
| channel[List[B]] { (x, s) }
|}
""".stripMargin)
}.message
m must include("erasure List[Any] overlaps with declared channels List[akka.channels.ChannelSpec.A]")
m must include("erasure List[Any] overlaps with declared channels List[akka.channels.ChannelSpec.B]")
}
"check that all channels were declared" in {
EventFilter[ActorInitializationException](occurrences = 1) intercept {
system.actorOf(Props(new Actor {
context.actorOf(Props[MissingChannel])
override val supervisorStrategy = OneForOneStrategy() {
case ex: ActorInitializationException testActor ! ex.getCause; Stop
}
def receive = {
case _
}
}))
}
val m = expectMsgType[ActorInitializationException].getMessage
m must include("missing declarations for channels")
m must include("akka.channels.ChannelSpec.A")
m must include("akka.channels.ChannelSpec.B")
}
"be able to forward fully generic channels" in {
val cd = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, D) :+: TNil] {
channel[C] { (x, snd) snd <-!- D }
}, "t25")
val t = ChannelExt(system).actorOf(new Poly(cd), "t26")
t <-!- A
expectMsg(A)
t <-!- C
expectMsg(D)
lastSender must be === t.actorRef
}
}
"A WrappedMessage" must {
"be sendable to a ChannelRef" in {
implicit val selfChannel = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, Nothing) :+:(D, Nothing) :+: TNil] {
channel[C] { (c, snd) testActor ! c }
channel[D] { (d, snd) testActor ! d }
}, "t27")
val t = ChannelExt(system).actorOf(new Tester, "t28")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
val fa = Future successful a
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
val fb = Future successful b
t <-!- a
expectMsg(C)
a -!-> t
expectMsg(C)
t <-!- b
expectMsg(D)
b -!-> t
expectMsg(D)
t <-!- fa
expectMsg(C)
fa -!-> t
expectMsg(C)
t <-!- fb
expectMsg(D)
fb -!-> t
expectMsg(D)
}
"not be sendable with wrong channels" when {
"sending wrong first directly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|new ChannelRef[(A, Nothing) :+: TNil](null) <-!- new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong first indirectly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null) -!-> new ChannelRef[(A, Nothing) :+: TNil](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong second directly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|new ChannelRef[(A, Nothing) :+: TNil](null) <-!- new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong second indirectly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null) -!-> new ChannelRef[(A, Nothing) :+: TNil](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
}
"be askable to a ChannelRef" in {
implicit val timeout = Timeout(1.second)
val t = ChannelExt(system).actorOf(new Tester, "t30")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
val fa = Future successful a
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
val fb = Future successful b
(Await.result(t <-?- a, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
(Await.result(a -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
(Await.result(t <-?- b, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
(Await.result(b -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
(Await.result(t <-?- fa, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
(Await.result(fa -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
(Await.result(t <-?- fb, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
(Await.result(fb -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
}
"not be askable with wrong channels" when {
"sending wrong first directly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val t = akka.util.Timeout(null)
|new ChannelRef[(A, Nothing) :+: TNil](null) <-?- new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong first indirectly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val t = akka.util.Timeout(null)
|new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null) -?-> new ChannelRef[(A, Nothing) :+: TNil](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong second directly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val t = akka.util.Timeout(null)
|new ChannelRef[(A, Nothing) :+: TNil](null) <-?- new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
"sending wrong second indirectly" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|implicit val t = akka.util.Timeout(null)
|new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null) -?-> new ChannelRef[(A, Nothing) :+: TNil](null)
""".stripMargin)
}.message must include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
}
}
"be LUBbable within a Future" in {
implicit val timeout = Timeout(1.second)
val t = ChannelExt(system).actorOf(new Tester, "t31")
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
(Await.result((a -?-> t).lub, timeout.duration): Msg) must be(C)
}
"not be LUBbable if not wrapped" in {
intercept[ToolBoxError] {
eval("""
|import akka.channels._
|import ChannelSpec._
|import scala.concurrent.Future
|null.asInstanceOf[Future[Int]].lub
""".stripMargin)
}.message must include("Cannot prove that Int <:< akka.channels.WrappedMessage")
}
}
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ExtendedActorSystem
import scala.reflect.runtime.universe._
import akka.actor.Props
import scala.reflect.ClassTag
import scala.reflect.runtime.universe
import akka.actor.Actor
object ChannelExt extends ExtensionKey[ChannelExtension]
class ChannelExtension(system: ExtendedActorSystem) extends Extension {
// FIXME: kick-start the universe (needed due to thread safety issues in runtime mirror)
// see https://issues.scala-lang.org/browse/SI-6240
private val t = typeTag[(Int, Int) :+: TNil]
def actorOf[Ch <: ChannelList](factory: Actor with Channels[TNil, Ch], name: String): ChannelRef[Ch] =
new ChannelRef[Ch](system.actorOf(Props(factory), name))
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels
import language.experimental.{ macros makkros }
import akka.actor.ActorRef
import scala.concurrent.Future
/**
* A channel reference, holding a type list of all channels supported by the
* underlying actor. This actors reference can be obtained as the `actorRef`
* member.
*/
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
/**
* Send a message over this channel, tell semantics, returning the message.
*/
def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M]
/**
* Eventually send the value contained in the future over this channel,
* tell semantics, returning a Future which is completed after sending
* with the value which was sent (Future.andThen semantics).
*/
def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M]
/**
* Send a message over this channel, ask semantics, returning a Future
* which will be completed with the reply message or a TimeoutException.
* If the message is a Future itself, eventually send the Futures value.
*/
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M]
/**
* Narrow this ChannelRef by removing channels or narrowing input types or
* widening output types.
*/
def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T]
}

View file

@ -0,0 +1,181 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels
import language.experimental.{ macros makkros }
import akka.actor.{ Actor, ActorRef }
import scala.reflect.macros.Context
import scala.reflect.runtime.{ universe ru }
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.api.Universe
import scala.runtime.AbstractPartialFunction
import akka.actor.Props
import scala.collection.immutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ classTag, ClassTag }
import scala.concurrent.{ ExecutionContext, Future }
import akka.util.Timeout
import akka.pattern.ask
import scala.util.control.NoStackTrace
import akka.AkkaException
import akka.actor.ExtendedActorSystem
import akka.actor.ActorInitializationException
/**
* Typed channels atop untyped actors.
*
* The idea is that the actor declares all its input types up front, including
* what it expects the sender to handle wrt.replies, and then ChannelRef
* carries this information for statically verifying that messages sent to an
* actor have an actual chance of being processed.
*
* There are several implementation-imposed restrictions:
*
* - not two channels with different input types may have the same erased
* type; this is currently not enforced at compile time (and leads to
* channels being ignored at runtime)
* - messages received by the actor are dispatched to channels based on the
* erased type, which may be less precise than the actual channel type; this
* can lead to ClassCastExceptions if sending through the untyped ActorRef
*/
trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor
import macros.Helpers._
/**
* Create a child actor with properly typed ChannelRef, verifying that this
* actor can handle everything which the child tries to send via its
* `parent` ChannelRef.
*/
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch]
/**
* Create a child actor with properly typed ChannelRef and the given name,
* verifying that this actor can handle everything which the child tries to
* send via its `parent` ChannelRef.
*/
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch], name: String): ChannelRef[Ch] = macro macros.CreateChild.implName[C, Pa, Ch]
/**
* Properly typed ChannelRef for the context.parent.
*/
final def parentChannel: ChannelRef[P] = new ChannelRef(context.parent)
/**
* The properly typed self-channel is used implicitly when sending to other
* typed channels for verifying that replies can be handled.
*/
implicit final def selfChannel = new ChannelRef[C](self)
/*
* This map holds the current behavior for each erasure-tagged channel; the
* basic receive impl will dispatch incoming messages according to the most
* specific erased type in this map.
*/
private var behavior = Map.empty[Class[_], FF]
/**
* Functions for storage in the behavior, to get around erasure
*/
private trait FF
private case class F1(f: (WrappedMessage[ChannelList, Any], ChannelRef[ChannelList]) Unit) extends FF
private case class F2(f: (Any, ChannelRef[ChannelList]) Unit) extends FF
/**
* Declare an input channel of the given type; the returned object takes a partial function:
*
* {{{
* channel[A] {
* case (a, s) =>
* // a is of type A and
* // s is a ChannelRef for the sender, capable of sending the declared reply type for A
* }
* }}}
*/
def channel[T]: (Nothing Unit) = macro macros.Channel.impl[Any, ChannelList, ChannelList, T, C, P]
def behaviorist[R, Ch: ru.TypeTag](wrapped: Boolean): (R Unit) = new Behaviorist[R, Ch](wrapped)
private class Behaviorist[-R, Ch: ru.TypeTag](wrapped: Boolean) extends (R Unit) {
private def ff(recv: R): FF =
if (wrapped)
F1(recv.asInstanceOf[(WrappedMessage[ChannelList, Any], ChannelRef[ChannelList]) Unit])
else
F2(recv.asInstanceOf[(Any, ChannelRef[ChannelList]) Unit])
def apply(recv: R): Unit = {
val tt = implicitly[ru.TypeTag[Ch]]
behavior ++= (for (t inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> ff(recv))
}
}
/*
* HORRIBLE HACK AHEAD
*
* Id like to keep this a trait, but traits cannot have constructor
* arguments, not even TypeTags.
*/
var channelListTypeTag: TypeTag[C] = _
/**
* Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort).
*/
private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] =
(new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls)
buf.indexWhere(_ isAssignableFrom cls) match {
case -1 buf append cls
case x buf insert (x, cls)
}
buf
}.to[immutable.IndexedSeq]
final lazy val receive = new AbstractPartialFunction[Any, Unit] {
val index = sortClasses(behavior.keys)
if (channelListTypeTag != null) verifyCompleteness()
private def verifyCompleteness() {
val channels = inputChannels(ru)(channelListTypeTag.tpe)
if (channels.isEmpty)
throw ActorInitializationException("Actor with Channels cannot have no channels")
val classes = channels groupBy (e channelListTypeTag.mirror.runtimeClass(e.widen))
val missing = classes.keySet -- behavior.keySet
if (missing.nonEmpty) {
val m = missing.map(classes).flatten
throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}")
}
}
override def applyOrElse[A, B >: Unit](x: A, default: A B): B = x match {
case CheckType(tt)
narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match {
case Nil sender ! CheckTypeACK
case err :: Nil sender ! CheckTypeNAK(err)
case list sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", ""))
}
case _
val msgClass = x.getClass
index find (_ isAssignableFrom msgClass) match {
case None default(x)
case Some(cls)
behavior(cls) match {
case F1(f) f(new WrappedMessage[ChannelList, Any](x), new ChannelRef(sender))
case F2(f) f(x, new ChannelRef(sender))
}
}
}
def isDefinedAt(x: Any): Boolean = x match {
case c: CheckType[_] true
case _
val msgClass = x.getClass
index exists (_ isAssignableFrom msgClass)
}
}
}
object Channels {
}

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels
import language.experimental.{ macros makkros }
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.{ ExecutionContext, Future }
import scala.reflect.runtime.{ universe ru }
import scala.util.Success
import akka.dispatch.ExecutionContexts
import scala.util.control.NoStackTrace
sealed trait ChannelList
sealed trait TNil extends ChannelList
sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList
sealed trait ReplyChannels[T <: ChannelList] extends ChannelList
/**
* This type is used to stand in for the unknown reply types of the fabricated
* sender references; users dont need to write it down, and if they do, they
* know that theyre cheating (since these ref types must not escape their
* defining actor context).
*/
sealed trait UnknownDoNotWriteMeDown
/**
* This exception is used to signal errors when trying to `.narrow` an
* ActorRef into a ChannelRef: if the actor finds the requested channel types
* incompatible with its selfChannel, it will return errors in the same format
* as would occur during compilation of a `ChannelRef.narrow` operation.
*/
case class NarrowingException(message: String) extends akka.AkkaException(message) with NoStackTrace
class ActorRefOps(val ref: ActorRef) extends AnyVal {
import macros.Helpers._
/**
* Send a query to the actor and check whether it supports the requested
* channel types; the normal timeout semantics of the `ask` pattern apply.
* The Future will be completed either with the desired ChannelRef or with
* an exception (TimeoutException or NarrowingException).
*/
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
import Channels._
ref ? CheckType(tt) map {
case CheckTypeACK new ChannelRef[C](ref)
case CheckTypeNAK(error) throw NarrowingException(error)
}
}
}
class FutureOps[T](val future: Future[T]) extends AnyVal {
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureOpsImpl[C, T]
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[ChannelList, Any, C, T]
def lub[LUB](implicit ev: T <:< WrappedMessage[_, LUB]): Future[LUB] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future map (ev(_).value)
}
}
class AnyOps[T](val value: T) extends AnyVal {
def -!->[C <: ChannelList](channel: ChannelRef[C]): T = macro macros.Tell.opsImpl[C, T]
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[ChannelList, Any, C, T]
}
class WrappedMessage[T <: ChannelList, LUB](val value: LUB) extends AnyVal

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.channels._
import scala.concurrent.Future
import akka.util.Timeout
import scala.reflect.runtime.{ universe ru }
import ru.TypeTag
import scala.reflect.macros.Context
import scala.reflect.api.Universe
import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
import scala.reflect.api.{ TypeCreator }
object Ask {
import Helpers._
def impl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply
ReturnLUB, // the least-upper bound for the reply types
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = ChannelRef[Channel]
})(msg: c.Expr[Msg]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
val tpeMsg = weakTypeOf[Msg]
val isFuture = tpeMsg <:< typeOf[Future[_]]
val unwrapped =
if (isFuture)
tpeMsg match {
case TypeRef(_, _, x :: _) unwrapMsgType(c.universe)(x)
}
else unwrapMsgType(c.universe)(tpeMsg)
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (isFuture)
if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
else
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
}
def opsImpl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply
ReturnLUB, // the least-upper bound for the reply types
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = AnyOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
val tpeMsg = weakTypeOf[Msg]
val unwrapped = unwrapMsgType(c.universe)(tpeMsg)
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
val msg = reify(c.prefix.splice.value)
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
}
// this is the implementation for Future[_] -?-> ChannelRef[_]
def futureImpl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply
ReturnLUB, // the least-upper bound for the reply types
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = FutureOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
val tpeMsg = weakTypeOf[Msg]
val unwrapped = unwrapMsgType(c.universe)(tpeMsg)
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
}
val wrapMessage = (m: Any) (new WrappedMessage[TNil, Any](m): Any)
@inline def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]]
}
def askFuture[T <: WrappedMessage[_, _]](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (m akka.pattern.ask(target, m).map(wrapMessage).asInstanceOf[Future[T]])
}
def askFutureWrapped[T <: WrappedMessage[_, _]](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (w akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]])
}
}

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.channels._
import scala.reflect.runtime.universe
import scala.reflect.macros.Context
import scala.reflect.api.Universe
object Channel {
import Helpers._
/**
* This macro transforms a channel[] call which returns some Behaviorist
* into a _channel[] call with precise reply channel descriptors, so that the
* partial function it is applied to can enjoy proper type checking.
*
* T is the message type
* C is the channel list of the enclosing Channels
* P is the parent channel list
*/
def impl[LUB, ReplyChannels <: ChannelList, MsgTChan <: ChannelList, MsgT: c.WeakTypeTag, MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag](
c: Context {
type PrefixType = Channels[ParentChannels, MyChannels]
}): c.Expr[(Nothing Unit)] = {
val tpeMsgT = c.weakTypeOf[MsgT]
val tpeMyChannels = c.weakTypeOf[MyChannels]
import c.universe._
val undefined = missingChannels(c.universe)(tpeMyChannels, inputChannels(c.universe)(tpeMsgT))
if (undefined.nonEmpty) {
c.abort(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}")
} else {
checkUnique(c.universe)(tpeMsgT, tpeMyChannels) foreach (c.error(c.enclosingPosition, _))
// need to calculate the intersection of the reply channel sets for all input channels
val intersection = inputChannels(c.universe)(tpeMsgT) map (replyChannels(c.universe)(tpeMyChannels, _).toSet) reduce (_ intersect _)
val channels = toChannels(c.universe)(intersection.toList, weakTypeOf[UnknownDoNotWriteMeDown])
implicit val ttMyChannels = c.TypeTag[MyChannels](tpeMyChannels)
implicit val ttReplyChannels = c.TypeTag[ReplyChannels](channels)
implicit val ttMsgT = c.TypeTag[MsgT](tpeMsgT)
val prepTree = reify(if (c.prefix.splice.channelListTypeTag == null)
c.prefix.splice.channelListTypeTag = universe.typeTag[MyChannels])
if (tpeMsgT <:< typeOf[ChannelList]) {
implicit val ttMsgTChan = c.TypeTag[MsgTChan](tpeMsgT) // this is MsgT reinterpreted as <: ChannelList
implicit val ttLUB = inputChannels(c.universe)(tpeMsgT) match {
case x :: Nil c.TypeTag[LUB](typeOf[Any])
case xs c.TypeTag[LUB](c.universe.lub(xs))
}
reify {
prepTree.splice
c.prefix.splice.behaviorist[(WrappedMessage[MsgTChan, LUB], ChannelRef[ReplyChannels]) Unit, MsgT](
bool(c, true).splice)(universe.typeTag[MsgT])
}
} else
reify {
prepTree.splice
c.prefix.splice.behaviorist[(MsgT, ChannelRef[ReplyChannels]) Unit, MsgT](
bool(c, false).splice)(universe.typeTag[MsgT])
}
}
}
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.channels._
import scala.reflect.runtime.{ universe ru }
import ru.TypeTag
import scala.reflect.macros.Context
import scala.reflect.api.Universe
import akka.actor.Props
import akka.actor.Actor
object CreateChild {
import Helpers._
def impl[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag](
c: Context {
type PrefixType = Actor with Channels[_, MyChannels]
})(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]]): c.Expr[ChannelRef[ChildChannels]] = {
import c.universe._
verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice))))
}
def implName[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag](
c: Context {
type PrefixType = Actor with Channels[_, MyChannels]
})(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]], name: c.Expr[String]): c.Expr[ChannelRef[ChildChannels]] = {
import c.universe._
verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice), name.splice)))
}
def verify(c: Context)(parent: c.Type, child: c.Type, mine: c.Type): Unit = {
import c.universe._
val nothing = weakTypeOf[Nothing]
if (parent =:= nothing) abort(c, "Parent argument must not be Nothing")
if (child =:= nothing) abort(c, "channel list must not be Nothing")
val missing = missingChannels(c.universe)(mine, inputChannels(c.universe)(parent))
if (missing.nonEmpty)
abort(c, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
}
}

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.AkkaException
import scala.util.control.NoStackTrace
import akka.channels._
import scala.reflect.runtime.{ universe ru }
import ru.TypeTag
import scala.reflect.macros.Context
import scala.reflect.api.Universe
import scala.reflect.api.TypeCreator
object Helpers {
type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit]
case class CheckType[T](tt: TypeTag[T])
case object CheckTypeACK
case class CheckTypeNAK(errors: String)
def error(c: Context, msg: String) = c.error(c.enclosingPosition, msg)
def abort(c: Context, msg: String) = c.abort(c.enclosingPosition, msg)
def imp[T: c.WeakTypeTag](c: Context): c.Expr[T] = {
import c.universe._
c.Expr[T](TypeApply(Ident("implicitly"), List(TypeTree().setType(weakTypeOf[T]))))
}
def bool(c: Context, b: Boolean): c.Expr[Boolean] = c.Expr[Boolean](c.universe.Literal(c.universe.Constant(b)))
def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = {
val channels = inputChannels(u)(list) groupBy (_.erasure)
val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel)
if (dupes.isEmpty) None
else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}")
}
/**
* check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings
*/
def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = {
var errors = List.empty[String]
for (in inputChannels(u)(target)) {
val replies = replyChannels(u)(orig, in)
if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in"
else {
val targetReplies = replyChannels(u)(target, in)
val unsatisfied = replies filterNot (r targetReplies exists (r <:< _))
if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in"
val leftovers = targetReplies filterNot (t replies exists (_ <:< t))
if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in"
}
}
errors.reverse
}
/**
* get all input channels from a ChannelList or return the given type
*/
final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = {
import u._
val imp = u.mkImporter(ru)
val tpeChannelList = imp.importType(ru.typeOf[ChannelList])
val tpeTNil = imp.importType(ru.typeOf[TNil])
def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match {
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) rec(tail, if (acc contains in) acc else in :: acc)
case TypeRef(_, _, ExistentialType(_, TypeRef(_, _, in :: _)) :: tail :: Nil) rec(tail, if (acc contains in) acc else in :: acc)
case ExistentialType(_, x) rec(x, acc)
case last if (last =:= tpeTNil) acc.reverse else (last :: acc).reverse
}
if (list <:< tpeChannelList) rec(list, Nil)
else List(list)
}
/**
* find all input channels matching the given message type and return a
* list of their respective reply channels
*/
final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = {
import u._
val imp = u.mkImporter(ru)
val tpeReplyTypes = imp.importType(ru.typeOf[ReplyChannels[_]])
val tpeTNil = imp.importType(ru.typeOf[TNil])
val msgTypes = inputChannels(u)(msg)
def rec(l: Type, acc: List[Type]): List[Type] = {
l match {
case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msgTypes exists (_ <:< in)
rec(tail, if (acc contains out) acc else out :: acc)
case TypeRef(_, _, _ :: tail :: Nil)
rec(tail, acc)
case x if x =:= tpeTNil acc.reverse
case x if x <:< tpeReplyTypes throw new IllegalArgumentException("cannot compute the ReplyChannels of a ReplyChannels type")
case x @ TypeRef(NoPrefix, _, Nil)
acc reverse_::: (if (msgTypes exists (_ <:< x)) appliedType(tpeReplyTypes.typeConstructor, List(x)) :: Nil else Nil)
case x throw new IllegalArgumentException(s"no idea what this type is: $x")
}
}
val n = typeOf[Nothing]
if (msg =:= n) List(n) else rec(list, Nil)
}
/**
* filter from the `required` list of types all which are subtypes of inputs of the ChannelList
*/
final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = {
import u._
// making the top-level method recursive blows up the compiler (when compiling the macro itself)
def rec(ch: Type, req: List[Type]): List[Type] = {
ch match {
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) rec(tail, req filterNot (_ <:< in))
case last req filterNot (_ <:< last)
}
}
rec(channels, required)
}
/**
* convert a list of types List(<T1>, <T2>, ...) into a ChannelList
* ( Channel[<T1>, Nothing] :=: Channel[<T2>, Nothing] :=: ... :=: TNil )
*/
final def toChannels(u: Universe)(list: List[u.Type], out: u.Type): u.Type = {
import u._
def rec(l: List[Type], acc: Type): Type = l match {
case head :: (tail: List[Type])
if (head =:= weakTypeOf[Nothing]) rec(tail, acc)
else
rec(tail,
appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List(
appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List(
head,
out)),
acc)))
case _ acc
}
rec(list.reverse, weakTypeOf[TNil])
}
/**
* takes a message tpe and tree and returns an expression which yields the
* underlying message (i.e. unwraps WrappedMessage if necessary)
*/
final def toMsg(c: Context)(tree: c.Expr[Any], tpe: c.Type): c.Expr[Any] = {
import c.universe._
if (tpe <:< c.typeOf[WrappedMessage[_, _]])
c.universe.reify(tree.splice.asInstanceOf[WrappedMessage[TNil, Any]].value)
else tree
}
final def unwrapMsgType(u: Universe)(msg: u.Type): u.Type = {
import u._
msg match {
case TypeRef(_, _, x :: _) if msg <:< typeOf[WrappedMessage[_, _]] x
case x x
}
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.channels._
import scala.reflect.runtime.{ universe ru }
import ru.TypeTag
import scala.reflect.macros.Context
import scala.reflect.api.Universe
object Narrow {
import Helpers._
def impl[Desired <: ChannelList: c.WeakTypeTag, MyChannels <: ChannelList: c.WeakTypeTag](
c: Context {
type PrefixType = ChannelRef[MyChannels]
}): c.Expr[ChannelRef[Desired]] = {
import c.{ universe u }
narrowCheck(u)(u.weakTypeOf[MyChannels], u.weakTypeOf[Desired]) match {
case Nil // okay
case err :: Nil c.error(c.enclosingPosition, err)
case list c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", ""))
}
u.reify(c.prefix.splice.asInstanceOf[ChannelRef[Desired]])
}
}

View file

@ -0,0 +1,109 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.channels.macros
import akka.channels._
import akka.actor._
import scala.reflect.runtime.{ universe ru }
import ru.TypeTag
import scala.reflect.macros.Context
import scala.reflect.api.Universe
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import akka.dispatch.ExecutionContexts
object Tell {
import Helpers._
def impl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
type PrefixType = ChannelRef[MyChannels]
})(msg: c.Expr[Msg]): c.Expr[Msg] =
doTell(c)(c.universe.weakTypeOf[MyChannels], c.universe.weakTypeOf[Msg], msg, c.prefix)
def opsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
type PrefixType = AnyOps[Msg]
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
import c.universe._
doTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.value), channel)
}
def doTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
tpeMyChannels: c.Type, tpeMsg: c.Type, msg: c.Expr[Msg], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
val (tpeSender, senderTree, sender) = getSenderChannel(c)
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
val cond = bool(c, tpeMsg <:< c.typeOf[WrappedMessage[_, _]])
c.universe.reify {
val $m = msg.splice
target.splice.actorRef.tell(if (cond.splice) $m.asInstanceOf[WrappedMessage[TNil, Any]].value else $m, sender.splice)
$m
}
}
def futureOpsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
type PrefixType = FutureOps[Msg]
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
import c.universe._
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.future), channel)
}
def futureImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
type PrefixType = ChannelRef[MyChannels]
})(future: c.Expr[Future[Msg]]): c.Expr[Future[Msg]] = {
import c.universe._
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], future, c.prefix)
}
def doFutureTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
tpeMyChannels: c.Type, tpeMsg: c.Type, future: c.Expr[Future[Msg]], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
val (tpeSender, senderTree, sender) = getSenderChannel(c)
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
c.universe.reify(pipeTo[Msg](future.splice, target.splice, sender.splice))
}
@inline def pipeTo[Msg](f: Future[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] =
f.future.andThen {
case Success(s: WrappedMessage[_, _]) c.actorRef.tell(s.value, snd)
case Success(s) c.actorRef.tell(s, snd)
case _
}(ExecutionContexts.sameThreadExecutionContext)
def getSenderChannel(c: Context): (c.universe.Type, c.Tree, c.Expr[ActorRef]) = {
val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]])
if (!replyChannel.isEmpty) {
import c.universe._
replyChannel.tpe match {
case TypeRef(_, _, param :: Nil)
(param, replyChannel, c.Expr(Select(replyChannel, "actorRef"))(c.universe.weakTypeTag[ActorRef]))
}
} else abort(c, "no implicit sender ChannelRef found")
}
def verify(c: Context)(sender: c.universe.Tree, msgT: c.universe.Type, sndT: c.universe.Type, chT: c.universe.Type)(): Unit = {
val unknown = c.universe.weakTypeOf[UnknownDoNotWriteMeDown]
val nothing = c.universe.weakTypeOf[Nothing]
def ignoreUnknown(in: c.universe.Type): c.universe.Type = if (in =:= unknown) nothing else in
def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit =
if (msg.nonEmpty) {
val u: c.universe.type = c.universe
val replies = msg map (m m -> (replyChannels(u)(chT, m) map (t ignoreUnknown(t))))
val missing = replies collect { case (k, v) if v.size == 0 k }
if (missing.nonEmpty)
error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)")
else {
val nextSend = replies.map(_._2).flatten map (m m -> (replyChannels(u)(sndT, m) map (t ignoreUnknown(t))))
val nextMissing = nextSend collect { case (k, v) if v.size == 0 k }
if (nextMissing.nonEmpty)
error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)")
else {
val nextChecked = checked ++ msg
val nextMsg = nextSend.map(_._2).flatten -- nextChecked
rec(nextMsg, nextChecked, depth + 1)
}
}
}
rec(inputChannels(c.universe)(msgT).toSet, Set(c.universe.typeOf[Nothing]), 1)
}
}

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import language.implicitConversions
import akka.actor.ActorRef
import scala.concurrent.Future
package object channels {
implicit def actorRefOps(ref: ActorRef) = new ActorRefOps(ref)
implicit def futureOps[T](f: Future[T]) = new FutureOps(f)
implicit def anyOps[T](x: T) = new AnyOps(x)
}

View file

@ -0,0 +1,215 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.channels
import akka.testkit.AkkaSpec
import akka.channels._
import akka.actor.Actor
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.util.Timeout
import akka.testkit.TestProbe
object ChannelDocSpec {
//#motivation0
trait Request
case class Command(msg: String) extends Request
trait Reply
case object CommandSuccess extends Reply
case class CommandFailure(msg: String) extends Reply
//#motivation0
//#child
case class Stats(b: Request)
case object GetChild
case class ChildRef(child: ChannelRef[(Request, Reply) :+: TNil])
class Child extends Actor
with Channels[(Stats, Nothing) :+: TNil, (Request, Reply) :+: TNil] {
channel[Request] { (x, snd)
parentChannel <-!- Stats(x)
snd <-!- CommandSuccess
}
}
class Parent extends Actor
with Channels[TNil, (Stats, Nothing) :+: (GetChild.type, ChildRef) :+: TNil] {
val child = createChild(new Child)
channel[GetChild.type] { (_, snd) ChildRef(child) -!-> snd }
channel[Stats] { (x, _)
// collect some stats
}
}
//#child
}
class ChannelDocSpec extends AkkaSpec {
import ChannelDocSpec._
class MsgA
class MsgB
class MsgC
class MsgD
"demonstrate why Typed Channels" in {
def someActor = testActor
//#motivation1
val requestProcessor = someActor
requestProcessor ! Command
//#motivation1
expectMsg(Command)
/*
//#motivation2
val requestProcessor = new ChannelRef[(Request, Reply) :+: TNil](someActor)
requestProcessor <-!- Command // this does not compile
//#motivation2
*/
type Example = //
//#motivation-types
(MsgA, MsgB) :+: (MsgC, MsgD) :+: TNil
//#motivation-types
}
// only compile test
"demonstrate channels creation" ignore {
//#declaring-channels
class AC extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
channel[Request] { (req, snd)
req match {
case Command("ping") snd <-!- CommandSuccess
case _
}
}
}
//#declaring-channels
//#declaring-subchannels
class ACSub extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
channel[Command] { (cmd, snd) snd <-!- CommandSuccess }
channel[Request] { (req, snd)
if (ThreadLocalRandom.current.nextBoolean) snd <-!- CommandSuccess
else snd <-!- CommandFailure("no luck")
}
}
//#declaring-subchannels
}
// only compile test
"demonstrating message sending" ignore {
import scala.concurrent.ExecutionContext.Implicits.global
//#sending
implicit val dummySender: ChannelRef[(Any, Nothing) :+: TNil] = ???
implicit val timeout: Timeout = ??? // for the ask operations
val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ???
val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ???
val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ???
val a = new MsgA
val fA = Future { new MsgA }
channelA <-!- a // send a to channelA
a -!-> channelA // same thing as above
channelA <-!- fA // eventually send the futures value to channelA
fA -!-> channelA // same thing as above
// ask the actor; return type given in full for illustration
val fB: Future[WrappedMessage[(MsgB, Nothing) :+: TNil, MsgB]] = channelA <-?- a
val fBunwrapped: Future[MsgB] = fB.lub
a -?-> channelA // same thing as above
channelA <-?- fA // eventually ask the actor, return the future
fA -?-> channelA // same thing as above
// chaining works as well
a -?-> channelA -?-> channelB -!-> channelC
//#sending
}
"demonstrate message forwarding" in {
//#forwarding
import scala.reflect.runtime.universe.TypeTag
class Latch[T1: TypeTag, T2: TypeTag](target: ChannelRef[(T1, T2) :+: TNil])
extends Actor with Channels[TNil, (Request, Reply) :+: (T1, T2) :+: TNil] {
implicit val timeout = Timeout(5.seconds)
//#become
channel[Request] {
case (Command("close"), snd)
channel[T1] { (t, s) t -?-> target -!-> s }
snd <-!- CommandSuccess
case (Command("open"), snd)
channel[T1] { (_, _) }
snd <-!- CommandSuccess
}
//#become
channel[T1] { (t, snd) t -?-> target -!-> snd }
}
//#forwarding
val probe = TestProbe()
val _target = new ChannelRef[(String, Int) :+: TNil](probe.ref)
val _self = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
//#usage
implicit val selfChannel: ChannelRef[(Any, Nothing) :+: TNil] = _self
val target: ChannelRef[(String, Int) :+: TNil] = _target // some actor
// type given just for demonstration purposes
val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] =
ChannelExt(system).actorOf(new Latch(target), "latch")
"hello" -!-> latch
//#processing
probe.expectMsg("hello")
probe.reply(5)
//#processing
expectMsg(5) // this is a TestKit-based example
Command("open") -!-> latch
expectMsg(CommandSuccess)
"world" -!-> latch
//#processing
probe.expectNoMsg(500.millis)
//#processing
expectNoMsg(500.millis)
//#usage
}
"demonstrate child creation" in {
implicit val selfChannel = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
//#child
//
// then it is used somewhat like this:
//
val parent = ChannelExt(system).actorOf(new Parent, "parent")
parent <-!- GetChild
val child = expectMsgType[ChildRef].child // this assumes TestKit context
child <-!- Command("hey there")
expectMsg(CommandSuccess)
//#child
}
}

View file

@ -8,6 +8,7 @@ Scala API
actors
typed-actors
typed-channels
logging
event-bus
scheduler

View file

@ -0,0 +1,509 @@
.. _typed-channels:
#############################
Typed Channels (EXPERIMENTAL)
#############################
.. note::
*This is a preview of the upcoming Typed Channels support, its API may change
during development up to the released version where the EXPERIMENTAL label is
removed.*
Motivation
==========
Actors derive great strength from their strong encapsulation, which enables
internal restarts as well as changing behavior and also composition. The last
one is enabled by being able to inject an actor into a message exchange
transparently, because all either side ever sees is an :class:`ActorRef`. The
straight-forward way to implement this encapsulation is to keep the actor
references untyped, and before the advent of macros in Scala 2.10 this was the
only tractable way.
As a motivation for change consider the following simple example:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation0
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation1
This is an error which is quite common, and the reason is that the compiler
does not catch it and cannot warn about it. Now if there were some type
restrictions on which messages the ``commandProcessor`` can process, that would
be a different story:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation2
The :class:`ChannelRef` wraps a normal untyped :class:`ActorRef`, but it
expresses a type constraint, namely that this channel accepts only messages of
type :class:`Request`, to which it may reply with messages of type
:class:`Reply`. The types do not express any guarantees on how many messages
will be exchanged, whether they will be received or processed, or whether a
reply will actually be sent. They only restrict those actions which are known
to be doomed already at compile time. In this case the second line would flag
an error, since the companion object ``Command`` is not an instance of type
:class:`Request`.
While this example looks pretty simple, the implications are profound. In order
to be useful, the system must be as reliable as you would expect a type system
to be. This means that unless you step outside of it (i.e. doing the
equivalent of ``.asInstanceOf[_]``) you shall be protected, failures shall be
recognized and flagged. There are a number of challenges included in this
requirement, which are discussed in `The Design Background`_ below.
Terminology
===========
.. describe:: type Channel[I, O] = (I, O)
A Channel is a pair of an input type and and output type. The input type is
the type of message accepted by the channel, the output type is the possible
reply type and may be ``Nothing`` to signify that no reply is sent. The input
type cannot be ``Nothing``.
.. describe:: type ChannelList
A ChannelList is an ordered collection of Channels, without further
restriction on the input or output types of these. This means that a single
input type may be associated with multiple output types within the same
ChannelList.
.. describe:: type TNil <: ChannelList
The empty ChannelList.
.. describe:: type :+:[Channel, ChannelList] <: ChannelList
This binary type constructor is used to build up lists of Channels, for which
infix notation will be most convenient:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation-types
.. describe:: class ChannelRef[T <: ChannelList]
A ChannelRef is what is referred to above as the channel reference, it bears
the ChannelList which describes all input and output types and their relation
for the referenced actor. It also contains the underlying :class:`ActorRef`.
.. describe:: trait Channels[P <: ChannelList, C <: ChannelList]
A mixin for the :class:`Actor` trait which is parameterized in the channel
requirements this actor has for its parent (P) and its selfChannel (C).
.. describe:: selfChannel
An ``Actor with Channels[P, C]`` has a ``selfChannel`` of type
``ChannelRef[C]``. This is the same type of channel reference which is
obtained by creating an instance of this actor.
.. describe:: type ReplyChannels[T <: ChannelList] <: ChannelList
Within an ``Actor with Channels[_, _]`` which takes a fully generic channel,
i.e. a type argument ``T <: ChannelList`` which is part of its selfChannel
type, this channels reply types are not known. The definition of this
channel uses the ReplyChannels type to abstractly refer to this unknown set
of channels in order to forward a reply from a ``ChannelRef[T]`` back to the
original sender. This operations type-safety is ensured at the senders site
by way of the ping-pong analysis described above.
.. describe:: class WrappedMessage[T <: ChannelList, LUB]
Scalas type system cannot directly express type unions. Asking an actor with
a given input type may result in multiple possible reply types, hence the
:class:`Future` holding this reply will contain the value wrapped inside a
container which carries this type (only at compile-time). The type parameter
LUB is the least upper bound of all input channels contained in the
ChannelList T.
Sending Messages across Channels
================================
Sending messages is best demonstrated in a quick overview of the basic operations:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#sending
The first line is included so that the code compiles, since all message sends
including ``!`` will check the implicitly found selfChannel for compatibility
with the target channels reply types. In this case we want to demonstrate just
the syntax of sending, hence the dummy sender which accepts everything and
replies never.
Presupposing three channel references of chainable types, an input value ``a``
and a Future holding such a value, we demonstrate the two basic operations
which are well known from untyped actors: tell/! and ask/?. The type of the
Future returned by the ask operation may seem surprising at first, but as the
last line demonstrates, it is built in a way which makes building actor chains
very simple. What the last line does is the following:
* it asks channelA, which returns a Future
* a callback is installed on the Future which will use the reply value of
channelA and ask channelB with it, returning another Future
* a callback is installed on that Future to send the reply value of channelB to
channelC, returning a Future with that previously sent value (using ``andThen``)
This example also motivates the introduction of the “turned-around” syntax
where messages flow more naturally from left to right, instead of the standard
object-oriented view of having the tell method operate on the ActorRef given to
the left.
This example informally introduced what is more precisely specified in the
following subsection.
The Rules
---------
Operations on typed channels are composable and obey a few simple rules:
* the message to be sent can be one of three things:
* a :class:`Future[_]`, in which case the contained value will be sent once
available; the value will be unwrapped if it is a :class:`WrappedMessage[_, _]`
* a :class:`WrappedMessage[_, _]`, which will be unwrapped (i.e. only the
value is sent)
* everything else is sent as is
* the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same
thing provided the arguments also switch places
* sending with ``-?->`` or ``<-?-`` always returns a
``Future[WrappedMessage[_, _]]`` representing all possible reply channels,
even if there is only one (use ``.lub`` to get a :class:`Future[_]` with the
most precise single type for the value)
* sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new
:class:`Future[_]` which will be completed with the value after it has been
sent; sending a strict value returns that value
Declaring an Actor with Channels
================================
The declaration of an Actor with Channels is done like this:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#declaring-channels
It should be noted that it is impossible to declare channels which are not part
of the channel list given as the second type argument to the :class:`Channels`
trait. It is also checked—albeit at runtime—that when the actors construction
is complete (i.e. its constructor and ``preStart`` hook have run) every channel
listed in the selfChannel type parameter has been declared. This can in general
not be done at compile time, both due to the possibility of overriding
subclasses as well as the problem that the compiler cannot determine whether a
``channel[]`` statement will be called in the course of execution due to
external inputs (e.g. if conditionally executed).
It should also be noted that the type of ``req`` in this example is
``Request``, hence it would be a compile-time error to try to match against the
``Command`` companion object. The ``snd`` reference is the sender channel
reference, which in this example is of type
``ChannelRef[(Reply, UnknownDoNotWriteMeDown) :+: TNil]``, meaning that sending
back a reply which is not of type ``Reply`` would be a compile-time error.
The last thing to note is that an actor is not obliged to reply to an incoming
message, even if that was successfully delivered to it: it might not be
appropriate, or it might be impossible, the actor might have failed before
executing the replying message send, etc. And as always, the ``snd`` reference
may be used more than once, and even stored away for later. It must not leave
the actor within it was created, however, because that would defeat the
ping-pong check; this is the reason for the curious name of the fabricated
reply type ``UnknownDoNotWriteMeDown``, if you find yourself declaring that
type as part of a message or similar you know that you are cheating.
Declaration of Subchannels
--------------------------
It can be convenient to carve out subchannels for special treatment like so:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#declaring-subchannels
This means that all ``Command`` requests will be positively answered while all
others may or may not be lucky. This dispatching between the two declarations
does not depend on their order but is solely done based on which type is more
specific.
Forwarding Messages
-------------------
Forwarding messages has been hinted at in the last sample already, but here is
a more complete sample actor:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#forwarding
:exclude: become
This actor declares a single-Channel parametric type which it forwards to a
target actor, handing replies back to the original sender using the ask/pipe
pattern.
.. note::
It is important not to forget the ``TypeTag`` context bound for all type
arguments which are used in channel declarations, otherwise the not very
helpful error “Predef is not an enclosing class” will haunt you.
Changing Behavior at Runtime
----------------------------
The actor from the previous example gets a lot more interesting when
implementing its control channel:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#forwarding
This shows all elements of the toolkit in action: calling ``channel[T1]`` again
during the lifetime of the actor will alter its behavior on that channel. In
this case a latch or gate is modeled which when closed will permit the message
flow through and when not will drop the messages to the floor.
Creating Actors with Channels
-----------------------------
Creating top-level actors with channels is done using the ``ChannelExt`` extension:
.. includecode:: code/docs/channels/ChannelDocSpec.scala
:include: usage
:exclude: processing
Inside an actor with channels children are created using the ``createChild`` method:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#child
In this example we create a simple child actor which responds to requests, but
also keeps its parent informed about what it is doing. The parent channel
within the child is thus declared to accept :class:`Stats` messages, and the
parent must consequently declare such a channel in order to be able to create
such a child. The parents job then is to create the child, make it available
to the outside via properly typed messages and collect the statistics coming in
from the child.
Implementation Restrictions
---------------------------
The erasure-based dispatch of incoming messages requires all channels which are
declared to have unique JVM type representations, i.e. it is not possible to
have two channel declarations with types ``List[A]`` and ``List[B]`` because
both would at runtime only be known as ``List[_]``.
The specific dispatch mechanism also require the declaration of all channels or
subchannels during the actors construction, independent of whether they shall
later change behavior or not. Changing behavior for a subchannel is only
possible if that subchannel was declared up-front.
TypeTags are currently (Scala 2.10.0) not serializable, hence narrowing of
:class:`ActorRef` does not work for remote references.
The Design Background
=====================
This section outlines the most prominent challenges encountered during the
development of Typed Channels and the rationale for their solutions. It is not
necessary to understand this material in order to use Typed Channels, but it
may be useful to explain why certain things are as they are.
The Type Pollution Problem
--------------------------
What if an actor accepts two different types of messages? It might be a main
communications channel which is forwarded to worker actors for performing some
long-running and/or dangerous task, plus an administrative channel for the
routing of requests. Or it might be a generic message throttler which accepts a
generic channel for passing it through (which delay where appropriate) and a
management channel for setting the throttling rate. In the second case it is
especially easy to see that those two channels will probably not be related,
their types will not be derived from a meaningful common supertype; instead the
least upper bound will probably be :class:`AnyRef`. If a typed channel
reference only had the capability to express a single type, this type would
then be no restriction anymore.
One solution to this is to never expose references describe more than one
channel at a time. But where would these references come from? It would be very
difficult to make this construction process type-safe, and it would also be an
inconvenient restriction, since message ordering guarantees only apply for the
same senderreceive pair, and if there are relations between the messages sent
on multiple channels those would need more boilerplate code to realize than if
all interaction were possible through a single reference.
The other solution thus is to express multiple channel types by a single
channel reference, which requires the implementation of type lists and
computations on these. And as we will see below it also requires the
specification of possibly multiple reply channels per input type, hence a type
map. The implementation chosen uses type lists like this:
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation-types
This type expresses two channels: type ``A`` may stimulate replies of type
``B``, while type ``C`` may evoke replies of type ``D``. The type operator
``:+:`` is a binary type which form a list of these channel definitions, and
like every good list it ends with an empty terminator ``TNil``.
The Reply Problem
-----------------
Akka actors have the power to reply to any message they receive, which is also
a message send and shall also be covered by typed channels. Since the sending
actor is the one which will also receive the reply, this needs to be verified.
The solution to this problem is that in addition to the ``self`` reference,
which is implicitly picked up as the sender for untyped actor interactions,
there is also a ``selfChannel`` which describes the typed channels handled by
this actor. Thus at the call site of the message send it must be verified that
this actor can actually handle the reply for that given message send.
The Sender Ping-Pong Problem
----------------------------
After successfully sending a message to an actor over a typed channel, that
actor will have a reference to the messages sender, because normal Akka
message processing rules apply. For this sender reference there must exist a
typed channel reference which describes the possible reply types which are
applicable for each of the incoming message channels. We will see below how
this reference is provided in the code, the problem we want to highlight here
is a different one: the nature of any sender reference is that it is highly
dynamic, the compiler cannot possibly know who sent the message we are
currently processing.
But this does not mean that all hope is lost: the solution is to do *all*
type-checking at the call site of the message send. The receiving actor just
needs to declare its channel descriptions in its own type, and channel
references are derived at construction from this type (implying the existence
of a typed ``actorOf``). Then the actor knows for each received message type
which the allowed reply types are. The typed channel for the sender reference
hence has the reply types for the current input channel as its own input types,
but what should the reply types be? This is the ping-pong problem:
* ActorA sends MsgA to ActorB
* ActorB replies with MsgB
* ActorA replies with MsgC
Every “reply” uses the sender channel, which is dynamic and hence only known
partially. But ActorB did not know who sent the message it just replied to and
hence it cannot check that it can process the possible replies following that
message send. Only ActorA could have known, because it knows its own channels
as well as ActorBs channels completely. The solution is thus to recursively
verify the message send, following all reply channels until all possible
message types to be sent have been verified. This sounds horribly complex, but
the algorithm for doing so actually has a worst-case complexity of O(N) where N
is the number of input channels of ActorA or ActorB, whoever has fewer.
The Parent Problem
------------------
There is one other actor reference which is available to every actor: its
parent. Since the childparent relationship is established permanently when the
child is created by the parent, this problem is easily solvable by encoding the
requirements of the child for its parent channel in its type signature having
the typed variant of ``actorOf`` verify this against the ``selfChannel``.
Anecdotally, since the guardian actor does not care at all about message sent
to it, top-level type channel actors must declare their parent channel to be
empty.
The Exposure/Restriction Problem
--------------------------------
An actor may provide more than one service, either itself or by proxy, each
with their own set of channels. Only having references for the full set of
channels leads to a too wide spread of capabilities: in the example of the
message rate throttling actor its management channel is only meant to be used
by the actor which inserted it, not by the two actors between it was inserted.
Hence the manager will have to create a channel reference which excludes the
management channels before handing out the reference to other actors.
Another variant of this problem is an actor which handles a channel whose input
type is a supertype for a number of derived channels. It should be allowed to
use the “superchannel” in place of any of the subchannels, but not the other
way around. The intuitive approach would be to model this by making the channel
reference contravariant in its channel types and define those channel types
accordingly. This does not work nicely, however, because Scalas type system is
not well-suited to modeling such calculations on unordered type lists; it might
be possible but its implementation would be forbiddingly complex.
Therefore this topic gained traction as macros became available: being able to
write down type calculations using standard collections and their
transformations reduces the implementation to a handful of lines. The “narrow”
operation implemented this way allows all narrowing of input channels and
widening of output channels down to ``(Nothing, Any)`` (which is to say:
removal).
The Forwarding Problem
----------------------
One important feature of actors mentioned above is their composability which is
enabled by being able to forward or delegate messages. It is the nature of this
process that the sending party is not aware of the true destination of the
message, it only sees the façade in front of it. Above we have seen that the
sender ping-pong problem requires all verification to be performed at the
senders end, but if the sender does not know the final recipient, how can it
check that the message exchange is type-safe?
The forwarding party—the middle-man—is also not in the position to make this
call, since all it has is the incomplete sender channel which is lacking reply
type information. The problem which arises lies precisely in these reply
sequences: the ping-pong scheme was verified against the middle-man, and if the
final recipient would reply to the forwarded request, that sender reference
would belong to a different channel and there is no single location in the
source code where all these pieces are known at compile time.
The solution to this problem is not to allow forwarding in the normal untyped
:class:`ActorRef` sense. Replies must always be sent by the recipient of the
original message in order for the type checks at the sender site to be
effective. Since forwarding is an important communication pattern among actors,
support for it is thus provided in the form of the :meth:`ask` pattern combined
with the :meth:`pipe` pattern, which both are not add-ons but fully integrated
operations among typed channels.
The JVM Erasure Problem
-----------------------
When an actor with typed channels receives a message, this message needs to be
dispatched internally to the right channel, so that the right sender channel
can be presented and so on. This dispatch needs to work with the information
contained in the message, which due to the erasure of generic type information
is an incomplete image of the true channel types. Those full types exist only
at compile-time and reifying them into TypeTags at runtime for every message
send would be prohibitively expensive. This means that channels which erase to
the same JVM type cannot coexist within the same actor, message would not be
routable reliably in that case.
The Actor Lookup Problem
------------------------
Everything up to this point has assumed that channel references are passed from
their point of creation to their point of use directly and in the regime of
strong, unerased types. This can also happen between actors by embedding them
in case classes with proper type information. But one particular useful feature
of Akka actors is that they have a stable identity by which they can be found,
a unique name. This name is represented as a :class:`String` and naturally does
not bear any type information concerning the actors channels. Thus, when
looking up an actor with ``system.actorFor(...)`` you will only get an untyped
:class:`ActorRef` and not a channel reference. This :class:`ActorRef` can of
course manually be wrapped in a channel reference bearing the desired channels,
but this is not a type-safe operation.
The solution in this case must be a runtime check. There is an operation to
“narrow” an :class:`ActorRef` to a channel reference of given type, which
behind the scenes will send a message to the designated actor with a TypeTag
representing the requested channels. The actor will check these against its own
TypeTag and reply with the verification result. This check uses the same code
as the compile-time “narrow” operation introduced above.
How to read The Types
=====================
In case of errors in your code the compiler will try to inform you in the most
precise way it can, and that will then contain types like this::
akka.channels.:+:[(com.example.Request, com.example.Reply),
akka.channels.:+:[(com.example.Command, Nothing), TNil]]
These types look unwieldy because of two things: they use fully qualified names
for all the types (thankfully using the ``()`` sugar for :class:`Tuple2`), and
they do not employ infix notation. That same type there might look like this in
your source code::
(Request, Reply) :+: (Command, Nothing) :+: TNil
As soon as someone finds the time, it would be nice if the IDEs learned to
print types making use of the files import statements and infix notation.

View file

@ -74,7 +74,7 @@ object AkkaBuild extends Build {
generatedPdf in Sphinx <<= generatedPdf in Sphinx in LocalProject(docs.id) map identity,
generatedEpub in Sphinx <<= generatedEpub in Sphinx in LocalProject(docs.id) map identity
),
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples)
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests)
)
lazy val actor = Project(
@ -391,7 +391,7 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test",
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries),
settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
sourceDirectory in Sphinx <<= baseDirectory / "rst",
@ -437,6 +437,24 @@ object AkkaBuild extends Build {
)
) configs (MultiJvm)
lazy val channels = Project(
id = "akka-channels-experimental",
base = file("akka-channels"),
dependencies = Seq(actor),
settings = defaultSettings ++ Seq(
libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _)
)
)
lazy val channelsTests = Project(
id = "akka-channels-tests",
base = file("akka-channels-tests"),
dependencies = Seq(channels, testkit % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-compiler" % _)
)
)
// Settings
override lazy val settings =
@ -807,12 +825,11 @@ object Dependencies {
val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
val osgiCompendium= "org.osgi" % "org.osgi.compendium" % "4.2.0" // ApacheV2
// Camel Sample
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
// Cluster Sample
val sigar = "org.hyperic" % "sigar" % "1.6.4" // ApacheV2
val sigar = "org.hyperic" % "sigar" % "1.6.4" // ApacheV2
// Test