!cha #3789: Remove typed channels
This commit is contained in:
parent
497c0c3924
commit
31c4d76ba7
19 changed files with 12 additions and 2398 deletions
|
|
@ -1,21 +0,0 @@
|
|||
package akka.makkros
|
||||
|
||||
import language.experimental.macros
|
||||
import scala.reflect.macros.Context
|
||||
import scala.tools.reflect.ToolBox
|
||||
import scala.reflect.ClassTag
|
||||
import scala.tools.reflect.ToolBoxError
|
||||
|
||||
object Test {
|
||||
|
||||
def eval(code: String, compileOptions: String = "-cp akka-actor/target/classes:akka-channels/target/classes"): Any = {
|
||||
val tb = mkToolbox(compileOptions)
|
||||
tb.eval(tb.parse(code))
|
||||
}
|
||||
|
||||
def mkToolbox(compileOptions: String = ""): ToolBox[_ <: scala.reflect.api.Universe] = {
|
||||
val m = scala.reflect.runtime.currentMirror
|
||||
m.mkToolBox(options = compileOptions)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,648 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import akka.testkit._
|
||||
import akka.actor.ActorRef
|
||||
import akka.makkros.Test._
|
||||
import scala.tools.reflect.ToolBoxError
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Await
|
||||
import scala.util.Failure
|
||||
import akka.actor.ActorSystem
|
||||
import scala.reflect.api.Universe
|
||||
import scala.concurrent.Future
|
||||
import akka.actor.ActorInitializationException
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy.Stop
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
object ChannelSpec {
|
||||
|
||||
trait Msg
|
||||
|
||||
trait A extends Msg
|
||||
object A extends A
|
||||
object A1 extends A
|
||||
object A2 extends A
|
||||
|
||||
trait B extends Msg
|
||||
object B extends B
|
||||
|
||||
trait C extends Msg
|
||||
object C extends C
|
||||
object C1 extends C
|
||||
|
||||
trait D extends Msg
|
||||
object D extends D
|
||||
|
||||
// used for sender verification in the first two test cases
|
||||
class Tester extends Actor with Channels[TNil, (A, C) :+: (B, D) :+: TNil] {
|
||||
channel[A.type] { (msg, snd) ⇒ snd <-!- C }
|
||||
channel[A] { (msg, snd) ⇒ snd <-!- C1 }
|
||||
channel[B] {
|
||||
case (B, s) ⇒ s <-!- D
|
||||
}
|
||||
}
|
||||
class RecvC(ref: ActorRef) extends Actor with Channels[TNil, (C, Nothing) :+: TNil] {
|
||||
channel[C] { case (x, _) ⇒ ref ! x }
|
||||
}
|
||||
|
||||
// pos compile test for multiple reply channels
|
||||
class SubChannels extends Actor with Channels[TNil, (A, B) :+: (A, C) :+: TNil] {
|
||||
channel[A] {
|
||||
case (A1, x) ⇒ B -!-> x
|
||||
case (_, x) ⇒ x <-!- C
|
||||
}
|
||||
}
|
||||
|
||||
// pos compile test for children
|
||||
class Children extends Actor with Channels[TNil, (A, B) :+: (C, Nothing) :+: TNil] {
|
||||
val c = createChild(new Actor with Channels[(A, Nothing) :+: TNil, (B, C) :+: TNil] {
|
||||
channel[B] { case (B, s) ⇒ s <-!- C }
|
||||
})
|
||||
|
||||
var client: ActorRef = _
|
||||
channel[A] {
|
||||
case (A, s) ⇒ c <-!- B; client = sender
|
||||
}
|
||||
channel[C] {
|
||||
case (C, _) ⇒ client ! C
|
||||
}
|
||||
|
||||
createChild(new Actor with Channels[(C, Nothing) :+: TNil, TNil] {})
|
||||
createChild(new Actor with Channels[(A, Nothing) :+:(C, Nothing) :+: TNil, TNil] {})
|
||||
}
|
||||
|
||||
// compile test for polymorphic actors
|
||||
class WriteOnly[T1: ru.TypeTag, T2: ru.TypeTag](target: ChannelRef[(T1, T2) :+: TNil]) extends Actor with Channels[TNil, (D, D) :+: (T1, T2) :+: TNil] {
|
||||
implicit val t = Timeout(1.second)
|
||||
import akka.pattern.ask
|
||||
|
||||
channel[D] { (d, snd) ⇒ snd <-!- d }
|
||||
|
||||
channel[T1] { (x, snd) ⇒ x -?-> target -!-> snd -!-> snd }
|
||||
}
|
||||
|
||||
// compile test for whole-channel polymorphism
|
||||
class Poly[T <: ChannelList: ru.TypeTag](target: ChannelRef[T]) extends Actor with Channels[TNil, (A, A) :+: (B, B) :+: T] {
|
||||
implicit val timeout = Timeout(1.second)
|
||||
channel[T] { (x, snd) ⇒
|
||||
val xx: WrappedMessage[T, Any] = x
|
||||
val f: Future[ReplyChannels[T]] = target <-?- x
|
||||
f -!-> snd
|
||||
}
|
||||
import language.existentials
|
||||
channel[(A, _) :+: (B, _) :+: TNil] { (x, snd) ⇒
|
||||
val m: Msg = x.value
|
||||
val c: ChannelRef[TNil] = snd
|
||||
}
|
||||
channel[A] { (x, snd) ⇒ x -!-> snd }
|
||||
}
|
||||
|
||||
// companion to WriteOnly for testing pass-through
|
||||
class EchoTee(target: ActorRef) extends Actor with Channels[TNil, (C, C) :+: TNil] {
|
||||
channel[C] { (c, snd) ⇒ target ! C1; snd <-!- C1 }
|
||||
}
|
||||
|
||||
class MissingChannel extends Actor with Channels[TNil, (A, A) :+: (B, B) :+: TNil] {
|
||||
channel[A.type] { (_, _) ⇒ }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, classOf[AkkaSpec].getClassLoader)) with ImplicitSender {
|
||||
|
||||
import ChannelSpec._
|
||||
|
||||
implicit val selfChannel = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
|
||||
|
||||
"Actor with Channels" must {
|
||||
|
||||
"construct refs" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester, "t1")
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
lastSender should be(ref.actorRef)
|
||||
ref <-!- B
|
||||
expectMsg(D)
|
||||
lastSender should be(ref.actorRef)
|
||||
}
|
||||
|
||||
"select return channels" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester, "t2")
|
||||
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t3")
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
lastSender should be(selfChannel.actorRef)
|
||||
}
|
||||
|
||||
"correctly dispatch to subchannels" in {
|
||||
val ref = ChannelExt(system).actorOf(new Tester, "t4")
|
||||
implicit val selfChannel = ChannelExt(system).actorOf(new RecvC(testActor), "t5")
|
||||
ref <-!- A2
|
||||
expectMsg(C1)
|
||||
lastSender should be(selfChannel.actorRef)
|
||||
}
|
||||
|
||||
"not permit wrong message type" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val c = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, C) :+: TNil](null) <-!- B
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type")
|
||||
}
|
||||
|
||||
"not permit wrong message type in complex channel" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val c = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null) <-!- C
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type")
|
||||
}
|
||||
|
||||
"not permit unfit sender ref" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(C, D) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message should include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"permit any sender for Nothing replies" in {
|
||||
implicit val selfChannel = new ChannelRef[TNil](testActor)
|
||||
new ChannelRef[(A, Nothing) :+: TNil](testActor) <-!- A
|
||||
expectMsg(A)
|
||||
}
|
||||
|
||||
"require complete reply type sets" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (A, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message should include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.B, akka.channels.ChannelSpec.C")
|
||||
}
|
||||
|
||||
"verify ping-pong chains" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(B, B) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}.message should include("implicit sender `s` does not support messages of the reply types akka.channels.ChannelSpec.C")
|
||||
}
|
||||
|
||||
"tolerate infinite ping-pong" in {
|
||||
val ex = intercept[InvocationTargetException] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(B, B) :+: (C, B) :+: TNil](null)
|
||||
|new ChannelRef[(A, B) :+: (B, C) :+: TNil](null) <-!- A
|
||||
""".stripMargin)
|
||||
}
|
||||
def cause(ex: Throwable): Throwable =
|
||||
if (ex.getCause == null) ex else cause(ex.getCause)
|
||||
cause(ex).getClass should be(classOf[NullPointerException])
|
||||
}
|
||||
|
||||
"not permit nonsensical channel declarations" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (A, B) :+: TNil] {
|
||||
| channel[B] {
|
||||
| case (B, _) =>
|
||||
| }
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message should include("no channel defined for types akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"not permit subchannel replies" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (A, B) :+: (A1.type, C) :+: TNil] {
|
||||
| channel[A] {
|
||||
| case (A1, x) => x <-!- C
|
||||
| }
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.C.type")
|
||||
}
|
||||
|
||||
"not permit Nothing children" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import akka.actor.Actor
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
|
||||
| createChild(new Actor with Channels[Nothing, Nothing] {})
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message should include("Parent argument must not be Nothing")
|
||||
}
|
||||
|
||||
"not permit too demanding children" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import akka.actor.Actor
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (A, B) :+: (C, D) :+: TNil] {
|
||||
| createChild(new Actor with Channels[(B, Nothing) :+: TNil, TNil] {})
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message should include("This actor cannot support a child requiring channels akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"have a working selfChannel" in {
|
||||
val ref = ChannelExt(system).actorOf(new Children, "t10")
|
||||
ref <-!- A
|
||||
expectMsg(C)
|
||||
}
|
||||
|
||||
"have a working parentChannel" in {
|
||||
val parent = ChannelExt(system).actorOf(new Actor with Channels[TNil, (A, Nothing) :+: TNil] {
|
||||
createChild(new Actor with Channels[(A, Nothing) :+: TNil, TNil] {
|
||||
parentChannel <-!- A
|
||||
})
|
||||
channel[A] { (msg, snd) ⇒ testActor ! msg }
|
||||
}, "t11")
|
||||
expectMsg(A)
|
||||
}
|
||||
|
||||
"not permit top-level Actor with Channels which send to parent" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|null.asInstanceOf[ChannelExtension].actorOf(new Actor with Channels[(A, A) :+: TNil, (A, Nothing) :+: TNil] {}, "")
|
||||
""".stripMargin)
|
||||
}.message should include("type mismatch")
|
||||
}
|
||||
|
||||
"not permit sending wrong things to parents" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (A, Nothing) :+: TNil] {
|
||||
| createChild(new Actor with Channels[(A, Nothing) :+: TNil, TNil] {
|
||||
| parentChannel <-!- B
|
||||
| })
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B.type")
|
||||
}
|
||||
|
||||
"support narrowing of references" in {
|
||||
val ref = new ChannelRef[(A, B) :+:(C, D) :+: TNil](null)
|
||||
val n: ChannelRef[(A1.type, B) :+: TNil] = ref.narrow[(A1.type, B) :+: TNil]
|
||||
}
|
||||
|
||||
"not allow narrowed refs to open new channels" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A, C) :+: TNil](null).narrow[(A, C) :+: (B, C) :+: TNil]
|
||||
""".stripMargin)
|
||||
}.message should include("original ChannelRef does not support input type akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"not allow narrowed refs to widen channels" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A1.type, C) :+: TNil](null).narrow[(A, C) :+: TNil]
|
||||
""".stripMargin)
|
||||
}.message should include("original ChannelRef does not support input type akka.channels.ChannelSpec.A")
|
||||
}
|
||||
|
||||
"not allow narrowed refs to miss reply channels" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A, C) :+: (A, D) :+: TNil](null).narrow[(A, C) :+: TNil]
|
||||
""".stripMargin)
|
||||
}.message should include("reply types akka.channels.ChannelSpec.D not covered for channel akka.channels.ChannelSpec.A")
|
||||
}
|
||||
|
||||
"not allow narrowed refs to narrow reply channels" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new ChannelRef[(A, C) :+: (B, D) :+: TNil](null).narrow[(A, C) :+: (A, Nothing) :+: TNil]
|
||||
""".stripMargin)
|
||||
}.message should include("reply types Nothing are superfluous for channel akka.channels.ChannelSpec.A")
|
||||
}
|
||||
|
||||
"support narrowing ActorRefs" in {
|
||||
import Channels._
|
||||
val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t15")
|
||||
val ref = channel.actorRef
|
||||
implicit val t = Timeout(1.second.dilated)
|
||||
import system.dispatcher
|
||||
val r = Await.result(ref.narrow[(C, Nothing) :+: TNil], t.duration)
|
||||
r <-!- C
|
||||
expectMsg(C)
|
||||
}
|
||||
|
||||
"deny wrong narrowing of ActorRefs" in {
|
||||
val channel = ChannelExt(system).actorOf(new RecvC(testActor), "t16")
|
||||
val ref = channel.actorRef
|
||||
implicit val t = Timeout(1.second.dilated)
|
||||
import system.dispatcher
|
||||
val f = ref.narrow[(D, Nothing) :+: TNil]
|
||||
Await.ready(f, t.duration)
|
||||
f.value.get should be(Failure(NarrowingException("original ChannelRef does not support input type akka.channels.ChannelSpec.D")))
|
||||
}
|
||||
|
||||
"be equal according to its actor" in {
|
||||
val c1, c2 = new ChannelRef[TNil](testActor)
|
||||
c1 should equal(c2)
|
||||
}
|
||||
|
||||
"allow wrapping of ChannelRefs with pass-through" in {
|
||||
val target = ChannelExt(system).actorOf(new RecvC(testActor), "t17")
|
||||
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, Nothing](target), "t18")
|
||||
wrap <-!- C
|
||||
expectMsg(C)
|
||||
lastSender should be(target.actorRef)
|
||||
wrap <-!- D
|
||||
expectMsg(D)
|
||||
lastSender should be(wrap.actorRef)
|
||||
}
|
||||
|
||||
"allow wrapping of Actor with ChannelsRefs with replies" in {
|
||||
val probe = TestProbe()
|
||||
val target = ChannelExt(system).actorOf(new EchoTee(probe.ref), "t19")
|
||||
val wrap = ChannelExt(system).actorOf(new WriteOnly[C, C](target), "t20")
|
||||
C -!-> wrap
|
||||
expectMsg(C1)
|
||||
expectMsg(C1)
|
||||
probe.expectMsg(C1)
|
||||
}
|
||||
|
||||
"support typed ask" in {
|
||||
val t = ChannelExt(system).actorOf(new Tester, "t21")
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val r: Future[C] = t <-?- A
|
||||
Await.result(r, 1.second) should be(C)
|
||||
}
|
||||
|
||||
"support typed ask with multiple reply channels" in {
|
||||
val t = ChannelExt(system).actorOf(new SubChannels, "t22")
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val r: Future[Msg] = (t <-?- A1).lub
|
||||
Await.result(r, 1.second) should be(B)
|
||||
}
|
||||
|
||||
"check that channels do not erase to the same types" in {
|
||||
val m = intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|new Actor with Channels[TNil, (List[A], A) :+: (List[B], B) :+: TNil] {
|
||||
| channel[List[A]] { (x, s) ⇒ }
|
||||
| channel[List[B]] { (x, s) ⇒ }
|
||||
|}
|
||||
""".stripMargin)
|
||||
}.message
|
||||
m should include("erasure List[Any] overlaps with declared channels List[akka.channels.ChannelSpec.A]")
|
||||
m should include("erasure List[Any] overlaps with declared channels List[akka.channels.ChannelSpec.B]")
|
||||
}
|
||||
|
||||
"check that all channels were declared" in {
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
system.actorOf(Props(new Actor {
|
||||
context.actorOf(Props[MissingChannel])
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case ex: ActorInitializationException ⇒ testActor ! ex.getCause; Stop
|
||||
}
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
}
|
||||
}))
|
||||
}
|
||||
val m = expectMsgType[ActorInitializationException].getMessage
|
||||
m should include("missing declarations for channels")
|
||||
m should include("akka.channels.ChannelSpec.A")
|
||||
m should include("akka.channels.ChannelSpec.B")
|
||||
}
|
||||
|
||||
"be able to forward fully generic channels" in {
|
||||
val cd = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, D) :+: TNil] {
|
||||
channel[C] { (x, snd) ⇒ snd <-!- D }
|
||||
}, "t25")
|
||||
val t = ChannelExt(system).actorOf(new Poly(cd), "t26")
|
||||
t <-!- A
|
||||
expectMsg(A)
|
||||
t <-!- C
|
||||
expectMsg(D)
|
||||
lastSender should equal(t.actorRef)
|
||||
}
|
||||
|
||||
"not wrap Futures unnecessarily" in {
|
||||
val a = ChannelExt(system).actorOf(new Tester, "t26a")
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val c1: C = Await.result(a <-?- A, 1.second)
|
||||
val c2: C = Await.result(A -?-> a, 1.second)
|
||||
val fA = Future successful A
|
||||
val c3: C = Await.result(a <-?- fA, 1.second)
|
||||
val c4: C = Await.result(fA -?-> a, 1.second)
|
||||
}
|
||||
|
||||
"be able to transform Futures" in {
|
||||
val client = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
|
||||
val someActor = ChannelExt(system).actorOf(new Tester, "t26b")
|
||||
implicit val timeout = Timeout(1.second)
|
||||
implicit val ec = system.dispatcher
|
||||
A -?-> someActor -*-> (_ map { case C ⇒ B }) -?-> someActor -!-> client
|
||||
expectMsg(D)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"A WrappedMessage" must {
|
||||
|
||||
"be sendable to a ChannelRef" in {
|
||||
implicit val selfChannel = ChannelExt(system).actorOf(new Actor with Channels[TNil, (C, Nothing) :+:(D, Nothing) :+: TNil] {
|
||||
channel[C] { (c, snd) ⇒ testActor ! c }
|
||||
channel[D] { (d, snd) ⇒ testActor ! d }
|
||||
}, "t27")
|
||||
val t = ChannelExt(system).actorOf(new Tester, "t28")
|
||||
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
|
||||
val fa = Future successful a
|
||||
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
|
||||
val fb = Future successful b
|
||||
t <-!- a
|
||||
expectMsg(C)
|
||||
a -!-> t
|
||||
expectMsg(C)
|
||||
t <-!- b
|
||||
expectMsg(D)
|
||||
b -!-> t
|
||||
expectMsg(D)
|
||||
t <-!- fa
|
||||
expectMsg(C)
|
||||
fa -!-> t
|
||||
expectMsg(C)
|
||||
t <-!- fb
|
||||
expectMsg(D)
|
||||
fb -!-> t
|
||||
expectMsg(D)
|
||||
}
|
||||
|
||||
"not be sendable with wrong channels" when {
|
||||
"sending wrong first directly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|
||||
|new ChannelRef[(A, Nothing) :+: TNil](null) <-!- new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong first indirectly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|
||||
|new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null) -!-> new ChannelRef[(A, Nothing) :+: TNil](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong second directly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|
||||
|new ChannelRef[(A, Nothing) :+: TNil](null) <-!- new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong second indirectly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val s = new ChannelRef[(Any, Nothing) :+: TNil](null)
|
||||
|new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null) -!-> new ChannelRef[(A, Nothing) :+: TNil](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
}
|
||||
|
||||
"be askable to a ChannelRef" in {
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val t = ChannelExt(system).actorOf(new Tester, "t30")
|
||||
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
|
||||
val fa = Future successful a
|
||||
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
|
||||
val fb = Future successful b
|
||||
(Await.result(t <-?- a, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(C)
|
||||
(Await.result(a -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(C)
|
||||
(Await.result(t <-?- b, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(D)
|
||||
(Await.result(b -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(D)
|
||||
(Await.result(t <-?- fa, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(C)
|
||||
(Await.result(fa -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(C)
|
||||
(Await.result(t <-?- fb, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(D)
|
||||
(Await.result(fb -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value should be(D)
|
||||
}
|
||||
|
||||
"not be askable with wrong channels" when {
|
||||
"sending wrong first directly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val t = akka.util.Timeout(null)
|
||||
|new ChannelRef[(A, Nothing) :+: TNil](null) <-?- new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong first indirectly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val t = akka.util.Timeout(null)
|
||||
|new WrappedMessage[(B, Nothing) :+: (A, Nothing) :+: TNil, Msg](null) -?-> new ChannelRef[(A, Nothing) :+: TNil](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong second directly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val t = akka.util.Timeout(null)
|
||||
|new ChannelRef[(A, Nothing) :+: TNil](null) <-?- new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
"sending wrong second indirectly" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|implicit val t = akka.util.Timeout(null)
|
||||
|new WrappedMessage[(A, Nothing) :+: (B, Nothing) :+: TNil, Msg](null) -?-> new ChannelRef[(A, Nothing) :+: TNil](null)
|
||||
""".stripMargin)
|
||||
}.message should include("target ChannelRef does not support messages of types akka.channels.ChannelSpec.B (at depth 1)")
|
||||
}
|
||||
}
|
||||
|
||||
"be LUBbable within a Future" in {
|
||||
implicit val timeout = Timeout(1.second)
|
||||
val t = ChannelExt(system).actorOf(new Tester, "t31")
|
||||
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
|
||||
(Await.result((a -?-> t).lub, timeout.duration): Msg) should be(C)
|
||||
}
|
||||
|
||||
"not be LUBbable if not wrapped" in {
|
||||
intercept[ToolBoxError] {
|
||||
eval("""
|
||||
|import akka.channels._
|
||||
|import ChannelSpec._
|
||||
|import scala.concurrent.Future
|
||||
|null.asInstanceOf[Future[Int]].lub
|
||||
""".stripMargin)
|
||||
}.message should include("Cannot prove that Int <:< akka.channels.WrappedMessage")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import akka.actor.ExtensionKey
|
||||
import akka.actor.Extension
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import scala.reflect.runtime.universe._
|
||||
import akka.actor.Props
|
||||
import scala.reflect.ClassTag
|
||||
import scala.reflect.runtime.universe
|
||||
import akka.actor.Actor
|
||||
|
||||
object ChannelExt extends ExtensionKey[ChannelExtension]
|
||||
|
||||
class ChannelExtension(system: ExtendedActorSystem) extends Extension {
|
||||
|
||||
// FIXME: kick-start the universe (needed due to thread safety issues in runtime mirror)
|
||||
// see https://issues.scala-lang.org/browse/SI-6240
|
||||
private val t = typeTag[(Int, Int) :+: TNil]
|
||||
|
||||
def actorOf[Ch <: ChannelList](factory: ⇒ Actor with Channels[TNil, Ch], name: String): ChannelRef[Ch] =
|
||||
new ChannelRef[Ch](system.actorOf(Props(factory), name))
|
||||
}
|
||||
|
|
@ -1,43 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.ActorRef
|
||||
import scala.concurrent.Future
|
||||
|
||||
/**
|
||||
* A channel reference, holding a type list of all channels supported by the
|
||||
* underlying actor. This actor’s reference can be obtained as the `actorRef`
|
||||
* member.
|
||||
*/
|
||||
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
|
||||
|
||||
/**
|
||||
* Send a message over this channel, “tell” semantics, returning the message.
|
||||
*/
|
||||
def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M]
|
||||
|
||||
/**
|
||||
* Eventually send the value contained in the future over this channel,
|
||||
* “tell” semantics, returning a Future which is completed after sending
|
||||
* with the value which was sent (“Future.andThen” semantics).
|
||||
*/
|
||||
def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M]
|
||||
|
||||
/**
|
||||
* Send a message over this channel, “ask” semantics, returning a Future
|
||||
* which will be completed with the reply message or a TimeoutException.
|
||||
* If the message is a Future itself, eventually send the Future’s value.
|
||||
*/
|
||||
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, Any, T, M]
|
||||
|
||||
/**
|
||||
* Narrow this ChannelRef by removing channels or narrowing input types or
|
||||
* widening output types.
|
||||
*/
|
||||
def narrow[C <: ChannelList]: ChannelRef[C] = macro macros.Narrow.impl[C, T]
|
||||
|
||||
}
|
||||
|
|
@ -1,181 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.reflect.api.Universe
|
||||
import scala.runtime.AbstractPartialFunction
|
||||
import akka.actor.Props
|
||||
import scala.collection.immutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.reflect.{ classTag, ClassTag }
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.AkkaException
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.ActorInitializationException
|
||||
|
||||
/**
|
||||
* Typed channels atop untyped actors.
|
||||
*
|
||||
* The idea is that the actor declares all its input types up front, including
|
||||
* what it expects the sender to handle wrt.replies, and then ChannelRef
|
||||
* carries this information for statically verifying that messages sent to an
|
||||
* actor have an actual chance of being processed.
|
||||
*
|
||||
* There are several implementation-imposed restrictions:
|
||||
*
|
||||
* - not two channels with different input types may have the same erased
|
||||
* type; this is currently not enforced at compile time (and leads to
|
||||
* channels being “ignored” at runtime)
|
||||
* - messages received by the actor are dispatched to channels based on the
|
||||
* erased type, which may be less precise than the actual channel type; this
|
||||
* can lead to ClassCastExceptions if sending through the untyped ActorRef
|
||||
*/
|
||||
trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒
|
||||
|
||||
import macros.Helpers._
|
||||
|
||||
/**
|
||||
* Create a child actor with properly typed ChannelRef, verifying that this
|
||||
* actor can handle everything which the child tries to send via its
|
||||
* `parent` ChannelRef.
|
||||
*/
|
||||
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch]): ChannelRef[Ch] = macro macros.CreateChild.impl[C, Pa, Ch]
|
||||
|
||||
/**
|
||||
* Create a child actor with properly typed ChannelRef and the given name,
|
||||
* verifying that this actor can handle everything which the child tries to
|
||||
* send via its `parent` ChannelRef.
|
||||
*/
|
||||
def createChild[Pa <: ChannelList, Ch <: ChannelList](factory: Actor with Channels[Pa, Ch], name: String): ChannelRef[Ch] = macro macros.CreateChild.implName[C, Pa, Ch]
|
||||
|
||||
/**
|
||||
* Properly typed ChannelRef for the context.parent.
|
||||
*/
|
||||
final def parentChannel: ChannelRef[P] = new ChannelRef(context.parent)
|
||||
|
||||
/**
|
||||
* The properly typed self-channel is used implicitly when sending to other
|
||||
* typed channels for verifying that replies can be handled.
|
||||
*/
|
||||
implicit final def selfChannel = new ChannelRef[C](self)
|
||||
|
||||
/*
|
||||
* This map holds the current behavior for each erasure-tagged channel; the
|
||||
* basic receive impl will dispatch incoming messages according to the most
|
||||
* specific erased type in this map.
|
||||
*/
|
||||
private var behavior = Map.empty[Class[_], FF]
|
||||
|
||||
/**
|
||||
* Functions for storage in the behavior, to get around erasure
|
||||
*/
|
||||
private trait FF
|
||||
private case class F1(f: (WrappedMessage[ChannelList, Any], ChannelRef[ChannelList]) ⇒ Unit) extends FF
|
||||
private case class F2(f: (Any, ChannelRef[ChannelList]) ⇒ Unit) extends FF
|
||||
|
||||
/**
|
||||
* Declare an input channel of the given type; the returned object takes a partial function:
|
||||
*
|
||||
* {{{
|
||||
* channel[A] {
|
||||
* case (a, s) =>
|
||||
* // a is of type A and
|
||||
* // s is a ChannelRef for the sender, capable of sending the declared reply type for A
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
def channel[T]: (Nothing ⇒ Unit) = macro macros.Channel.impl[Any, ChannelList, ChannelList, T, C, P]
|
||||
|
||||
def behaviorist[R, Ch: ru.TypeTag](wrapped: Boolean): (R ⇒ Unit) = new Behaviorist[R, Ch](wrapped)
|
||||
private class Behaviorist[-R, Ch: ru.TypeTag](wrapped: Boolean) extends (R ⇒ Unit) {
|
||||
private def ff(recv: R): FF =
|
||||
if (wrapped)
|
||||
F1(recv.asInstanceOf[(WrappedMessage[ChannelList, Any], ChannelRef[ChannelList]) ⇒ Unit])
|
||||
else
|
||||
F2(recv.asInstanceOf[(Any, ChannelRef[ChannelList]) ⇒ Unit])
|
||||
def apply(recv: R): Unit = {
|
||||
val tt = ru.typeTag[Ch]
|
||||
behavior ++= (for (t ← inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> ff(recv))
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* HORRIBLE HACK AHEAD
|
||||
*
|
||||
* I’d like to keep this a trait, but traits cannot have constructor
|
||||
* arguments, not even TypeTags.
|
||||
*/
|
||||
var channelListTypeTag: TypeTag[C] = _
|
||||
|
||||
/**
|
||||
* Sort so that subtypes always precede their supertypes, but without
|
||||
* obeying any order between unrelated subtypes (insert sort).
|
||||
*/
|
||||
private def sortClasses(in: Iterable[Class[_]]): immutable.Seq[Class[_]] =
|
||||
(new ArrayBuffer[Class[_]](in.size) /: in) { (buf, cls) ⇒
|
||||
buf.indexWhere(_ isAssignableFrom cls) match {
|
||||
case -1 ⇒ buf append cls
|
||||
case x ⇒ buf insert (x, cls)
|
||||
}
|
||||
buf
|
||||
}.to[immutable.IndexedSeq]
|
||||
|
||||
final lazy val receive = new AbstractPartialFunction[Any, Unit] {
|
||||
|
||||
val index = sortClasses(behavior.keys)
|
||||
|
||||
if (channelListTypeTag != null) verifyCompleteness()
|
||||
|
||||
private def verifyCompleteness() {
|
||||
val channels = inputChannels(ru)(channelListTypeTag.tpe)
|
||||
if (channels.isEmpty)
|
||||
throw ActorInitializationException("Actor with Channels cannot have no channels")
|
||||
val classes = channels groupBy (e ⇒ channelListTypeTag.mirror.runtimeClass(e.widen))
|
||||
val missing = classes.keySet -- behavior.keySet
|
||||
if (missing.nonEmpty) {
|
||||
val m = missing.map(classes).flatten
|
||||
throw ActorInitializationException(s"missing declarations for channels ${m mkString ", "}")
|
||||
}
|
||||
}
|
||||
|
||||
override def applyOrElse[A, B >: Unit](x: A, default: A ⇒ B): B = x match {
|
||||
case CheckType(tt) ⇒
|
||||
narrowCheck(ru)(channelListTypeTag.tpe, tt.tpe) match {
|
||||
case Nil ⇒ sender ! CheckTypeACK
|
||||
case err :: Nil ⇒ sender ! CheckTypeNAK(err)
|
||||
case list ⇒ sender ! CheckTypeNAK(list mkString ("multiple errors:\n - ", " - ", ""))
|
||||
}
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index find (_ isAssignableFrom msgClass) match {
|
||||
case None ⇒ default(x)
|
||||
case Some(cls) ⇒
|
||||
behavior(cls) match {
|
||||
case F1(f) ⇒ f(new WrappedMessage[ChannelList, Any](x), new ChannelRef(sender))
|
||||
case F2(f) ⇒ f(x, new ChannelRef(sender))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def isDefinedAt(x: Any): Boolean = x match {
|
||||
case c: CheckType[_] ⇒ true
|
||||
case _ ⇒
|
||||
val msgClass = x.getClass
|
||||
index exists (_ isAssignableFrom msgClass)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object Channels {
|
||||
|
||||
}
|
||||
|
|
@ -1,71 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels
|
||||
|
||||
import language.experimental.{ macros ⇒ makkros }
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import scala.util.Success
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
sealed trait ChannelList
|
||||
sealed trait TNil extends ChannelList
|
||||
sealed trait :+:[A <: (_, _), B <: ChannelList] extends ChannelList
|
||||
sealed trait ReplyChannels[T <: ChannelList] extends ChannelList
|
||||
|
||||
/**
|
||||
* This type is used to stand in for the unknown reply types of the fabricated
|
||||
* sender references; users don’t need to write it down, and if they do, they
|
||||
* know that they’re cheating (since these ref types must not escape their
|
||||
* defining actor context).
|
||||
*/
|
||||
sealed trait UnknownDoNotWriteMeDown
|
||||
|
||||
/**
|
||||
* This exception is used to signal errors when trying to `.narrow` an
|
||||
* ActorRef into a ChannelRef: if the actor finds the requested channel types
|
||||
* incompatible with its selfChannel, it will return errors in the same format
|
||||
* as would occur during compilation of a `ChannelRef.narrow` operation.
|
||||
*/
|
||||
case class NarrowingException(message: String) extends akka.AkkaException(message) with NoStackTrace
|
||||
|
||||
class ActorRefOps(val ref: ActorRef) extends AnyVal {
|
||||
import macros.Helpers._
|
||||
|
||||
/**
|
||||
* Send a query to the actor and check whether it supports the requested
|
||||
* channel types; the normal timeout semantics of the `ask` pattern apply.
|
||||
* The Future will be completed either with the desired ChannelRef or with
|
||||
* an exception (TimeoutException or NarrowingException).
|
||||
*/
|
||||
def narrow[C <: ChannelList](implicit timeout: Timeout, ec: ExecutionContext, tt: ru.TypeTag[C]): Future[ChannelRef[C]] = {
|
||||
import Channels._
|
||||
ref ? CheckType(tt) map {
|
||||
case CheckTypeACK ⇒ new ChannelRef[C](ref)
|
||||
case CheckTypeNAK(error) ⇒ throw NarrowingException(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FutureOps[T](val future: Future[T]) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureOpsImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[ChannelList, Any, C, T]
|
||||
def -*->[U](f: Future[T] ⇒ Future[U]): Future[U] = f(future)
|
||||
def lub[LUB](implicit ev: T <:< WrappedMessage[_, LUB]): Future[LUB] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
future map (ev(_).value)
|
||||
}
|
||||
}
|
||||
|
||||
class AnyOps[T](val value: T) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): T = macro macros.Tell.opsImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[ChannelList, Any, C, T]
|
||||
}
|
||||
|
||||
class WrappedMessage[T <: ChannelList, LUB](val value: LUB) extends AnyVal
|
||||
|
|
@ -1,173 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.concurrent.Future
|
||||
import akka.util.Timeout
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import scala.reflect.api.{ TypeCreator }
|
||||
|
||||
object Ask {
|
||||
import Helpers._
|
||||
|
||||
def impl[ //
|
||||
ReturnChannels <: ChannelList, // the precise type union describing the reply
|
||||
ReturnLUB, // the least-upper bound for the reply types
|
||||
ReturnT, // the return type if it is just a single type
|
||||
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
|
||||
Msg: c.WeakTypeTag // the message being sent down the channel
|
||||
](c: Context {
|
||||
type PrefixType = ChannelRef[Channel]
|
||||
})(msg: c.Expr[Msg]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
|
||||
val tpeChannel = weakTypeOf[Channel]
|
||||
val tpeMsg = weakTypeOf[Msg]
|
||||
val isFuture = tpeMsg <:< typeOf[Future[_]]
|
||||
val unwrapped =
|
||||
if (isFuture)
|
||||
tpeMsg match {
|
||||
case TypeRef(_, _, x :: _) ⇒ unwrapMsgType(c.universe)(x)
|
||||
}
|
||||
else unwrapMsgType(c.universe)(tpeMsg)
|
||||
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
|
||||
|
||||
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
|
||||
|
||||
implicit lazy val ttReturn = c.TypeTag[ReturnT](out.head)
|
||||
implicit lazy val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
|
||||
implicit lazy val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
|
||||
|
||||
out match {
|
||||
case x :: Nil if isFuture ⇒
|
||||
if (unwrapped <:< typeOf[ChannelList])
|
||||
reify(askFutureWrappedNoWrap[ReturnT](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askFutureNoWrap[ReturnT](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
|
||||
case x :: Nil ⇒
|
||||
reify(askOpsNoWrap[ReturnT](
|
||||
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
case _ if isFuture ⇒
|
||||
if (unwrapped <:< typeOf[ChannelList])
|
||||
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
|
||||
case _ ⇒
|
||||
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
}
|
||||
}
|
||||
|
||||
def opsImpl[ //
|
||||
ReturnChannels <: ChannelList, // the precise type union describing the reply
|
||||
ReturnLUB, // the least-upper bound for the reply types
|
||||
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
|
||||
Msg: c.WeakTypeTag // the message being sent down the channel
|
||||
](c: Context {
|
||||
type PrefixType = AnyOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
|
||||
val tpeChannel = weakTypeOf[Channel]
|
||||
val tpeMsg = weakTypeOf[Msg]
|
||||
val unwrapped = unwrapMsgType(c.universe)(tpeMsg)
|
||||
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
|
||||
|
||||
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
|
||||
|
||||
val msg = reify(c.prefix.splice.value)
|
||||
out match {
|
||||
case x :: Nil ⇒
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
|
||||
reify(askOpsNoWrap[ReturnLUB](
|
||||
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
case _ ⇒
|
||||
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
|
||||
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
}
|
||||
}
|
||||
|
||||
// this is the implementation for Future[_] -?-> ChannelRef[_]
|
||||
def futureImpl[ //
|
||||
ReturnChannels <: ChannelList, // the precise type union describing the reply
|
||||
ReturnLUB, // the least-upper bound for the reply types
|
||||
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
|
||||
Msg: c.WeakTypeTag // the message being sent down the channel
|
||||
](c: Context {
|
||||
type PrefixType = FutureOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
|
||||
import c.universe._
|
||||
|
||||
val tpeChannel = weakTypeOf[Channel]
|
||||
val tpeMsg = weakTypeOf[Msg]
|
||||
val unwrapped = unwrapMsgType(c.universe)(tpeMsg)
|
||||
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
|
||||
|
||||
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
|
||||
|
||||
out match {
|
||||
case x :: Nil ⇒
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
|
||||
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
|
||||
reify(askFutureWrappedNoWrap[ReturnLUB](
|
||||
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askFutureNoWrap[ReturnLUB](
|
||||
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
|
||||
case _ ⇒
|
||||
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
|
||||
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
|
||||
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
|
||||
}
|
||||
}
|
||||
|
||||
val wrapMessage = (m: Any) ⇒ (new WrappedMessage[TNil, Any](m): Any)
|
||||
|
||||
def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]]
|
||||
}
|
||||
|
||||
def askFuture[T <: WrappedMessage[_, _]](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
future flatMap (m ⇒ akka.pattern.ask(target, m).map(wrapMessage).asInstanceOf[Future[T]])
|
||||
}
|
||||
|
||||
def askFutureWrapped[T <: WrappedMessage[_, _]](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
future flatMap (w ⇒ akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]])
|
||||
}
|
||||
|
||||
def askOpsNoWrap[T](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] =
|
||||
akka.pattern.ask(target, msg).asInstanceOf[Future[T]]
|
||||
|
||||
def askFutureNoWrap[T](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
future flatMap (m ⇒ akka.pattern.ask(target, m).asInstanceOf[Future[T]])
|
||||
}
|
||||
|
||||
def askFutureWrappedNoWrap[T](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
future flatMap (w ⇒ akka.pattern.ask(target, w.value).asInstanceOf[Future[T]])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,67 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.universe
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
|
||||
object Channel {
|
||||
import Helpers._
|
||||
|
||||
/**
|
||||
* This macro transforms a channel[] call which returns “some” Behaviorist
|
||||
* into a _channel[] call with precise reply channel descriptors, so that the
|
||||
* partial function it is applied to can enjoy proper type checking.
|
||||
*
|
||||
* T is the message type
|
||||
* C is the channel list of the enclosing Channels
|
||||
* P is the parent channel list
|
||||
*/
|
||||
def impl[LUB, ReplyChannels <: ChannelList, MsgTChan <: ChannelList, MsgT: c.WeakTypeTag, MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Channels[ParentChannels, MyChannels]
|
||||
}): c.Expr[(Nothing ⇒ Unit)] = {
|
||||
|
||||
val tpeMsgT = c.weakTypeOf[MsgT]
|
||||
val tpeMyChannels = c.weakTypeOf[MyChannels]
|
||||
|
||||
import c.universe._
|
||||
|
||||
val undefined = missingChannels(c.universe)(tpeMyChannels, inputChannels(c.universe)(tpeMsgT))
|
||||
if (undefined.nonEmpty) {
|
||||
c.abort(c.enclosingPosition, s"no channel defined for types ${undefined mkString ", "}")
|
||||
} else {
|
||||
checkUnique(c.universe)(tpeMsgT, tpeMyChannels) foreach (c.error(c.enclosingPosition, _))
|
||||
// need to calculate the intersection of the reply channel sets for all input channels
|
||||
val intersection = inputChannels(c.universe)(tpeMsgT) map (replyChannels(c.universe)(tpeMyChannels, _).toSet) reduce (_ intersect _)
|
||||
val channels = toChannels(c.universe)(intersection.toList, weakTypeOf[UnknownDoNotWriteMeDown])
|
||||
implicit val ttMyChannels = c.TypeTag[MyChannels](tpeMyChannels)
|
||||
implicit val ttReplyChannels = c.TypeTag[ReplyChannels](channels)
|
||||
implicit val ttMsgT = c.TypeTag[MsgT](tpeMsgT)
|
||||
val prepTree = reify(if (c.prefix.splice.channelListTypeTag == null)
|
||||
c.prefix.splice.channelListTypeTag = universe.typeTag[MyChannels])
|
||||
if (tpeMsgT <:< typeOf[ChannelList]) {
|
||||
implicit val ttMsgTChan = c.TypeTag[MsgTChan](tpeMsgT) // this is MsgT reinterpreted as <: ChannelList
|
||||
implicit val ttLUB = inputChannels(c.universe)(tpeMsgT) match {
|
||||
case x :: Nil ⇒ c.TypeTag[LUB](typeOf[Any])
|
||||
case xs ⇒ c.TypeTag[LUB](c.universe.lub(xs))
|
||||
}
|
||||
reify {
|
||||
prepTree.splice
|
||||
c.prefix.splice.behaviorist[(WrappedMessage[MsgTChan, LUB], ChannelRef[ReplyChannels]) ⇒ Unit, MsgT](
|
||||
bool(c, true).splice)(universe.typeTag[MsgT])
|
||||
}
|
||||
} else
|
||||
reify {
|
||||
prepTree.splice
|
||||
c.prefix.splice.behaviorist[(MsgT, ChannelRef[ReplyChannels]) ⇒ Unit, MsgT](
|
||||
bool(c, false).splice)(universe.typeTag[MsgT])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,56 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
|
||||
object CreateChild {
|
||||
import Helpers._
|
||||
|
||||
def impl[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Actor with Channels[_, MyChannels]
|
||||
})(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]]): c.Expr[ChannelRef[ChildChannels]] = {
|
||||
|
||||
import c.universe._
|
||||
|
||||
verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
|
||||
|
||||
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
|
||||
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice))))
|
||||
}
|
||||
|
||||
def implName[MyChannels <: ChannelList: c.WeakTypeTag, ParentChannels <: ChannelList: c.WeakTypeTag, ChildChannels <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = Actor with Channels[_, MyChannels]
|
||||
})(factory: c.Expr[Actor with Channels[ParentChannels, ChildChannels]], name: c.Expr[String]): c.Expr[ChannelRef[ChildChannels]] = {
|
||||
|
||||
import c.universe._
|
||||
|
||||
verify(c)(weakTypeOf[ParentChannels], weakTypeOf[ChildChannels], weakTypeOf[MyChannels])
|
||||
|
||||
implicit val t = c.TypeTag[ChildChannels](c.weakTypeOf[ChildChannels])
|
||||
reify(new ChannelRef[ChildChannels](c.prefix.splice.context.actorOf(Props(factory.splice), name.splice)))
|
||||
}
|
||||
|
||||
def verify(c: Context)(parent: c.Type, child: c.Type, mine: c.Type): Unit = {
|
||||
import c.universe._
|
||||
|
||||
val nothing = weakTypeOf[Nothing]
|
||||
if (parent =:= nothing) abort(c, "Parent argument must not be Nothing")
|
||||
if (child =:= nothing) abort(c, "channel list must not be Nothing")
|
||||
|
||||
val missing = missingChannels(c.universe)(mine, inputChannels(c.universe)(parent))
|
||||
if (missing.nonEmpty)
|
||||
abort(c, s"This actor cannot support a child requiring channels ${missing mkString ", "}")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,160 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.AkkaException
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import scala.reflect.api.TypeCreator
|
||||
|
||||
object Helpers {
|
||||
|
||||
type Recv[T, Ch <: ChannelList] = Function2[T, ChannelRef[Ch], Unit]
|
||||
|
||||
case class CheckType[T](tt: TypeTag[T])
|
||||
case object CheckTypeACK
|
||||
case class CheckTypeNAK(errors: String)
|
||||
|
||||
def error(c: Context, msg: String) = c.error(c.enclosingPosition, msg)
|
||||
def abort(c: Context, msg: String) = c.abort(c.enclosingPosition, msg)
|
||||
|
||||
def imp[T: c.WeakTypeTag](c: Context): c.Expr[T] = {
|
||||
import c.universe._
|
||||
c.Expr[T](TypeApply(Ident(newTermName("implicitly")), List(TypeTree().setType(weakTypeOf[T]))))
|
||||
}
|
||||
|
||||
def bool(c: Context, b: Boolean): c.Expr[Boolean] = c.Expr[Boolean](c.universe.Literal(c.universe.Constant(b)))
|
||||
|
||||
def checkUnique(u: Universe)(channel: u.Type, list: u.Type): Option[String] = {
|
||||
val channels = inputChannels(u)(list) groupBy (_.erasure)
|
||||
val dupes = channels.get(channel.erasure).getOrElse(Nil).filterNot(_ =:= channel)
|
||||
if (dupes.isEmpty) None
|
||||
else Some(s"erasure ${channel.erasure} overlaps with declared channels ${dupes mkString ", "}")
|
||||
}
|
||||
|
||||
/**
|
||||
* check that the original ChannelList is a subtype of the target ChannelList; return a list or error strings
|
||||
*/
|
||||
def narrowCheck(u: Universe)(orig: u.Type, target: u.Type): List[String] = {
|
||||
var errors = List.empty[String]
|
||||
for (in ← inputChannels(u)(target)) {
|
||||
val replies = replyChannels(u)(orig, in)
|
||||
if (replies.isEmpty) errors ::= s"original ChannelRef does not support input type $in"
|
||||
else {
|
||||
val targetReplies = replyChannels(u)(target, in)
|
||||
val unsatisfied = replies filterNot (r ⇒ targetReplies exists (r <:< _))
|
||||
if (unsatisfied.nonEmpty) errors ::= s"reply types ${unsatisfied mkString ", "} not covered for channel $in"
|
||||
val leftovers = targetReplies filterNot (t ⇒ replies exists (_ <:< t))
|
||||
if (leftovers.nonEmpty) errors ::= s"desired reply types ${leftovers mkString ", "} are superfluous for channel $in"
|
||||
}
|
||||
}
|
||||
errors.reverse
|
||||
}
|
||||
|
||||
/**
|
||||
* get all input channels from a ChannelList or return the given type
|
||||
*/
|
||||
final def inputChannels(u: Universe)(list: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
val imp = u.mkImporter(ru)
|
||||
val tpeChannelList = imp.importType(ru.typeOf[ChannelList])
|
||||
val tpeTNil = imp.importType(ru.typeOf[TNil])
|
||||
def rec(l: u.Type, acc: List[u.Type]): List[u.Type] = l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc)
|
||||
case TypeRef(_, _, ExistentialType(_, TypeRef(_, _, in :: _)) :: tail :: Nil) ⇒ rec(tail, if (acc contains in) acc else in :: acc)
|
||||
case ExistentialType(_, x) ⇒ rec(x, acc)
|
||||
case last ⇒ if (last =:= tpeTNil) acc.reverse else (last :: acc).reverse
|
||||
}
|
||||
if (list <:< tpeChannelList) rec(list, Nil)
|
||||
else List(list)
|
||||
}
|
||||
|
||||
/**
|
||||
* find all input channels matching the given message type and return a
|
||||
* list of their respective reply channels
|
||||
*/
|
||||
final def replyChannels(u: Universe)(list: u.Type, msg: u.Type): List[u.Type] = {
|
||||
import u._
|
||||
val imp = u.mkImporter(ru)
|
||||
val tpeReplyTypes = imp.importType(ru.typeOf[ReplyChannels[_]])
|
||||
val tpeTNil = imp.importType(ru.typeOf[TNil])
|
||||
val msgTypes = inputChannels(u)(msg)
|
||||
def rec(l: Type, acc: List[Type]): List[Type] = {
|
||||
l match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: out :: Nil) :: tail :: Nil) if msgTypes exists (_ <:< in) ⇒
|
||||
rec(tail, if (acc contains out) acc else out :: acc)
|
||||
case TypeRef(_, _, _ :: tail :: Nil) ⇒
|
||||
rec(tail, acc)
|
||||
case x if x =:= tpeTNil ⇒ acc.reverse
|
||||
case x if x <:< tpeReplyTypes ⇒ throw new IllegalArgumentException("cannot compute the ReplyChannels of a ReplyChannels type")
|
||||
case x @ TypeRef(NoPrefix, _, Nil) ⇒
|
||||
acc reverse_::: (if (msgTypes exists (_ <:< x)) appliedType(tpeReplyTypes.typeConstructor, List(x)) :: Nil else Nil)
|
||||
case x ⇒ throw new IllegalArgumentException(s"no idea what this type is: $x")
|
||||
}
|
||||
}
|
||||
val n = typeOf[Nothing]
|
||||
if (msg =:= n) List(n) else rec(list, Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* filter from the `required` list of types all which are subtypes of inputs of the ChannelList
|
||||
*/
|
||||
final def missingChannels(u: Universe)(channels: u.Type, required: List[u.Type]): List[u.Type] = {
|
||||
import u._
|
||||
// making the top-level method recursive blows up the compiler (when compiling the macro itself)
|
||||
def rec(ch: Type, req: List[Type]): List[Type] = {
|
||||
ch match {
|
||||
case TypeRef(_, _, TypeRef(_, _, in :: _) :: tail :: Nil) ⇒ rec(tail, req filterNot (_ <:< in))
|
||||
case last ⇒ req filterNot (_ <:< last)
|
||||
}
|
||||
}
|
||||
rec(channels, required)
|
||||
}
|
||||
|
||||
/**
|
||||
* convert a list of types List(<T1>, <T2>, ...) into a ChannelList
|
||||
* ( Channel[<T1>, Nothing] :=: Channel[<T2>, Nothing] :=: ... :=: TNil )
|
||||
*/
|
||||
final def toChannels(u: Universe)(list: List[u.Type], out: u.Type): u.Type = {
|
||||
import u._
|
||||
def rec(l: List[Type], acc: Type): Type = l match {
|
||||
case head :: (tail: List[Type]) ⇒
|
||||
if (head =:= weakTypeOf[Nothing]) rec(tail, acc)
|
||||
else
|
||||
rec(tail,
|
||||
appliedType(weakTypeOf[:+:[_, _]].typeConstructor, List(
|
||||
appliedType(weakTypeOf[Tuple2[_, _]].typeConstructor, List(
|
||||
head,
|
||||
out)),
|
||||
acc)))
|
||||
case _ ⇒ acc
|
||||
}
|
||||
rec(list.reverse, weakTypeOf[TNil])
|
||||
}
|
||||
|
||||
/**
|
||||
* takes a message tpe and tree and returns an expression which yields the
|
||||
* underlying message (i.e. unwraps WrappedMessage if necessary)
|
||||
*/
|
||||
final def toMsg(c: Context)(tree: c.Expr[Any], tpe: c.Type): c.Expr[Any] = {
|
||||
import c.universe._
|
||||
if (tpe <:< c.typeOf[WrappedMessage[_, _]])
|
||||
c.universe.reify(tree.splice.asInstanceOf[WrappedMessage[TNil, Any]].value)
|
||||
else tree
|
||||
}
|
||||
|
||||
final def unwrapMsgType(u: Universe)(msg: u.Type): u.Type = {
|
||||
import u._
|
||||
msg match {
|
||||
case TypeRef(_, _, x :: _) if msg <:< typeOf[WrappedMessage[_, _]] ⇒ x
|
||||
case x ⇒ x
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
|
||||
object Narrow {
|
||||
import Helpers._
|
||||
|
||||
def impl[Desired <: ChannelList: c.WeakTypeTag, MyChannels <: ChannelList: c.WeakTypeTag](
|
||||
c: Context {
|
||||
type PrefixType = ChannelRef[MyChannels]
|
||||
}): c.Expr[ChannelRef[Desired]] = {
|
||||
import c.{ universe ⇒ u }
|
||||
narrowCheck(u)(u.weakTypeOf[MyChannels], u.weakTypeOf[Desired]) match {
|
||||
case Nil ⇒ // okay
|
||||
case err :: Nil ⇒ c.error(c.enclosingPosition, err)
|
||||
case list ⇒ c.error(c.enclosingPosition, list mkString ("multiple errors:\n - ", "\n - ", ""))
|
||||
}
|
||||
u.reify(c.prefix.splice.asInstanceOf[ChannelRef[Desired]])
|
||||
}
|
||||
}
|
||||
|
|
@ -1,109 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.channels.macros
|
||||
|
||||
import akka.channels._
|
||||
import akka.actor._
|
||||
import scala.reflect.runtime.{ universe ⇒ ru }
|
||||
import ru.TypeTag
|
||||
import scala.reflect.macros.Context
|
||||
import scala.reflect.api.Universe
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Success }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
object Tell {
|
||||
import Helpers._
|
||||
|
||||
def impl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[MyChannels]
|
||||
})(msg: c.Expr[Msg]): c.Expr[Msg] =
|
||||
doTell(c)(c.universe.weakTypeOf[MyChannels], c.universe.weakTypeOf[Msg], msg, c.prefix)
|
||||
|
||||
def opsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = AnyOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
|
||||
import c.universe._
|
||||
doTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.value), channel)
|
||||
}
|
||||
|
||||
def doTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
|
||||
tpeMyChannels: c.Type, tpeMsg: c.Type, msg: c.Expr[Msg], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
val cond = bool(c, tpeMsg <:< c.typeOf[WrappedMessage[_, _]])
|
||||
c.universe.reify {
|
||||
val $m = msg.splice
|
||||
target.splice.actorRef.tell(if (cond.splice) $m.asInstanceOf[WrappedMessage[TNil, Any]].value else $m, sender.splice)
|
||||
$m
|
||||
}
|
||||
}
|
||||
|
||||
def futureOpsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = FutureOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
|
||||
import c.universe._
|
||||
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.future), channel)
|
||||
}
|
||||
|
||||
def futureImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[MyChannels]
|
||||
})(future: c.Expr[Future[Msg]]): c.Expr[Future[Msg]] = {
|
||||
import c.universe._
|
||||
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], future, c.prefix)
|
||||
}
|
||||
|
||||
def doFutureTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
|
||||
tpeMyChannels: c.Type, tpeMsg: c.Type, future: c.Expr[Future[Msg]], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
c.universe.reify(pipeTo[Msg](future.splice, target.splice, sender.splice))
|
||||
}
|
||||
|
||||
@inline def pipeTo[Msg](f: Future[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] =
|
||||
f.future.andThen {
|
||||
case Success(s: WrappedMessage[_, _]) ⇒ c.actorRef.tell(s.value, snd)
|
||||
case Success(s) ⇒ c.actorRef.tell(s, snd)
|
||||
case _ ⇒
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
|
||||
def getSenderChannel(c: Context): (c.universe.Type, c.Tree, c.Expr[ActorRef]) = {
|
||||
val replyChannel = c.inferImplicitValue(c.typeOf[ChannelRef[_]])
|
||||
if (!replyChannel.isEmpty) {
|
||||
import c.universe._
|
||||
replyChannel.tpe match {
|
||||
case TypeRef(_, _, param :: Nil) ⇒
|
||||
(param, replyChannel, c.Expr(Select(replyChannel, newTermName("actorRef")))(c.universe.weakTypeTag[ActorRef]))
|
||||
}
|
||||
} else abort(c, "no implicit sender ChannelRef found")
|
||||
}
|
||||
|
||||
def verify(c: Context)(sender: c.universe.Tree, msgT: c.universe.Type, sndT: c.universe.Type, chT: c.universe.Type)(): Unit = {
|
||||
val unknown = c.universe.weakTypeOf[UnknownDoNotWriteMeDown]
|
||||
val nothing = c.universe.weakTypeOf[Nothing]
|
||||
def ignoreUnknown(in: c.universe.Type): c.universe.Type = if (in =:= unknown) nothing else in
|
||||
def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit =
|
||||
if (msg.nonEmpty) {
|
||||
val u: c.universe.type = c.universe
|
||||
val replies = msg map (m ⇒ m -> (replyChannels(u)(chT, m) map (t ⇒ ignoreUnknown(t))))
|
||||
val missing = replies collect { case (k, v) if v.isEmpty ⇒ k }
|
||||
if (missing.nonEmpty)
|
||||
error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextSend = replies.map(_._2).flatten map (m ⇒ m -> (replyChannels(u)(sndT, m) map (t ⇒ ignoreUnknown(t))))
|
||||
val nextMissing = nextSend collect { case (k, v) if v.isEmpty ⇒ k }
|
||||
if (nextMissing.nonEmpty)
|
||||
error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextChecked = checked ++ msg
|
||||
val nextMsg = nextSend.map(_._2).flatten -- nextChecked
|
||||
rec(nextMsg, nextChecked, depth + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
rec(inputChannels(c.universe)(msgT).toSet, Set(c.universe.typeOf[Nothing]), 1)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
||||
import language.implicitConversions
|
||||
import akka.actor.ActorRef
|
||||
import scala.concurrent.Future
|
||||
|
||||
package object channels {
|
||||
implicit def actorRefOps(ref: ActorRef) = new ActorRefOps(ref)
|
||||
implicit def futureOps[T](f: Future[T]) = new FutureOps(f)
|
||||
implicit def anyOps[T](x: T) = new AnyOps(x)
|
||||
}
|
||||
|
|
@ -57,9 +57,6 @@ their API has not yet solidified enough in order to be considered frozen. You
|
|||
can help accelerating this process by giving feedback on these modules on our
|
||||
mailing list.
|
||||
|
||||
- ``akka-channels-experimental`` – Typed Channels on top of untyped Actors,
|
||||
using Scala 2.10 macros
|
||||
|
||||
- ``akka-contrib`` – an assortment of contributions which may or may not be
|
||||
moved into core modules, see :ref:`akka-contrib` for more details.
|
||||
|
||||
|
|
|
|||
|
|
@ -163,6 +163,13 @@ Transactor Module is Deprecated
|
|||
The integration between actors and STM in the module ``akka-transactor`` is deprecated and will be
|
||||
removed in a future version.
|
||||
|
||||
Typed Channels has been removed
|
||||
===============================
|
||||
|
||||
Typed channels were an experimental feature which we decided to remove: its implementation relied
|
||||
on an experimental feature of Scala for which there is no correspondence in Java and other languages and
|
||||
its usage was not intuitive.
|
||||
|
||||
Removed Deprecated Features
|
||||
===========================
|
||||
|
||||
|
|
|
|||
|
|
@ -1,221 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package docs.channels
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.channels._
|
||||
import akka.actor.Actor
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
object ChannelDocSpec {
|
||||
|
||||
//#motivation0
|
||||
trait Request
|
||||
case class Command(msg: String) extends Request
|
||||
|
||||
trait Reply
|
||||
case object CommandSuccess extends Reply
|
||||
case class CommandFailure(msg: String) extends Reply
|
||||
//#motivation0
|
||||
|
||||
//#child
|
||||
case class Stats(b: Request)
|
||||
case object GetChild
|
||||
case class ChildRef(child: ChannelRef[(Request, Reply) :+: TNil])
|
||||
|
||||
class Child extends Actor
|
||||
with Channels[(Stats, Nothing) :+: TNil, (Request, Reply) :+: TNil] {
|
||||
|
||||
channel[Request] { (x, snd) =>
|
||||
parentChannel <-!- Stats(x)
|
||||
snd <-!- CommandSuccess
|
||||
}
|
||||
}
|
||||
|
||||
class Parent extends Actor
|
||||
with Channels[TNil, (Stats, Nothing) :+: (GetChild.type, ChildRef) :+: TNil] {
|
||||
|
||||
val child = createChild(new Child)
|
||||
|
||||
channel[GetChild.type] { (_, snd) => ChildRef(child) -!-> snd }
|
||||
|
||||
channel[Stats] { (x, _) =>
|
||||
// collect some stats
|
||||
}
|
||||
}
|
||||
|
||||
//#child
|
||||
}
|
||||
|
||||
class ChannelDocSpec extends AkkaSpec {
|
||||
|
||||
import ChannelDocSpec._
|
||||
|
||||
trait Msg
|
||||
|
||||
class MsgA extends Msg
|
||||
class MsgB extends Msg
|
||||
class MsgC extends Msg
|
||||
class MsgD extends Msg
|
||||
|
||||
"demonstrate why Typed Channels" in {
|
||||
def someActor = testActor
|
||||
//#motivation1
|
||||
val requestProcessor = someActor
|
||||
requestProcessor ! Command
|
||||
//#motivation1
|
||||
expectMsg(Command)
|
||||
|
||||
/*
|
||||
//#motivation2
|
||||
val requestProcessor = new ChannelRef[(Request, Reply) :+: TNil](someActor)
|
||||
requestProcessor <-!- Command // this does not compile
|
||||
//#motivation2
|
||||
*/
|
||||
|
||||
type Example = //
|
||||
//#motivation-types
|
||||
(MsgA, MsgB) :+: (MsgC, MsgD) :+: TNil
|
||||
//#motivation-types
|
||||
}
|
||||
|
||||
// only compile test
|
||||
"demonstrate channels creation" ignore {
|
||||
//#declaring-channels
|
||||
class AC extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
|
||||
channel[Request] { (req, snd) =>
|
||||
req match {
|
||||
case Command("ping") => snd <-!- CommandSuccess
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
//#declaring-channels
|
||||
|
||||
//#declaring-subchannels
|
||||
class ACSub extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
|
||||
channel[Command] { (cmd, snd) => snd <-!- CommandSuccess }
|
||||
channel[Request] { (req, snd) =>
|
||||
if (ThreadLocalRandom.current.nextBoolean) snd <-!- CommandSuccess
|
||||
else snd <-!- CommandFailure("no luck")
|
||||
}
|
||||
}
|
||||
//#declaring-subchannels
|
||||
}
|
||||
|
||||
// only compile test
|
||||
"demonstrating message sending" ignore {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
//#sending
|
||||
implicit val dummySender: ChannelRef[(Any, Nothing) :+: TNil] = ???
|
||||
implicit val timeout: Timeout = ??? // for the ask operations
|
||||
|
||||
val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ???
|
||||
val channelA2: ChannelRef[(MsgA, MsgB) :+: (MsgA, MsgC) :+: TNil] = ???
|
||||
val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ???
|
||||
val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ???
|
||||
|
||||
val a = new MsgA
|
||||
val fA = Future { new MsgA }
|
||||
|
||||
channelA <-!- a // send a to channelA
|
||||
a -!-> channelA // same thing as above
|
||||
|
||||
channelA <-!- fA // eventually send the future’s value to channelA
|
||||
fA -!-> channelA // same thing as above
|
||||
|
||||
val fB: Future[MsgB] = channelA <-?- a // ask the actor
|
||||
a -?-> channelA // same thing as above
|
||||
|
||||
// ask the actor with multiple reply types
|
||||
// return type given in full for illustration
|
||||
val fM: Future[WrappedMessage[ //
|
||||
(MsgB, Nothing) :+: (MsgC, Nothing) :+: TNil, Msg]] = channelA2 <-?- a
|
||||
val fMunwrapped: Future[Msg] = fM.lub
|
||||
|
||||
channelA <-?- fA // eventually ask the actor, return the future
|
||||
fA -?-> channelA // same thing as above
|
||||
|
||||
// chaining works as well
|
||||
a -?-> channelA -?-> channelB -!-> channelC
|
||||
//#sending
|
||||
}
|
||||
|
||||
"demonstrate message forwarding" in {
|
||||
//#forwarding
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
|
||||
class Latch[T1: TypeTag, T2: TypeTag](target: ChannelRef[(T1, T2) :+: TNil])
|
||||
extends Actor with Channels[TNil, (Request, Reply) :+: (T1, T2) :+: TNil] {
|
||||
|
||||
implicit val timeout = Timeout(5.seconds)
|
||||
|
||||
//#become
|
||||
channel[Request] {
|
||||
|
||||
case (Command("close"), snd) =>
|
||||
channel[T1] { (t, s) => t -?-> target -!-> s }
|
||||
snd <-!- CommandSuccess
|
||||
|
||||
case (Command("open"), snd) =>
|
||||
channel[T1] { (_, _) => }
|
||||
snd <-!- CommandSuccess
|
||||
}
|
||||
|
||||
//#become
|
||||
channel[T1] { (t, snd) => t -?-> target -!-> snd }
|
||||
}
|
||||
//#forwarding
|
||||
|
||||
val probe = TestProbe()
|
||||
val _target = new ChannelRef[(String, Int) :+: TNil](probe.ref)
|
||||
val _self = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
|
||||
//#usage
|
||||
implicit val selfChannel: ChannelRef[(Any, Nothing) :+: TNil] = _self
|
||||
val target: ChannelRef[(String, Int) :+: TNil] = _target // some actor
|
||||
|
||||
// type given just for demonstration purposes
|
||||
val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] =
|
||||
ChannelExt(system).actorOf(new Latch(target), "latch")
|
||||
|
||||
"hello" -!-> latch
|
||||
//#processing
|
||||
probe.expectMsg("hello")
|
||||
probe.reply(5)
|
||||
//#processing
|
||||
expectMsg(5) // this is a TestKit-based example
|
||||
|
||||
Command("open") -!-> latch
|
||||
expectMsg(CommandSuccess)
|
||||
|
||||
"world" -!-> latch
|
||||
//#processing
|
||||
probe.expectNoMsg(500.millis)
|
||||
//#processing
|
||||
expectNoMsg(500.millis)
|
||||
//#usage
|
||||
}
|
||||
|
||||
"demonstrate child creation" in {
|
||||
implicit val selfChannel = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
|
||||
//#child
|
||||
//
|
||||
// then it is used somewhat like this:
|
||||
//
|
||||
|
||||
val parent = ChannelExt(system).actorOf(new Parent, "parent")
|
||||
parent <-!- GetChild
|
||||
val child = expectMsgType[ChildRef].child // this assumes TestKit context
|
||||
|
||||
child <-!- Command("hey there")
|
||||
expectMsg(CommandSuccess)
|
||||
//#child
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -5,7 +5,6 @@ Actors
|
|||
:maxdepth: 2
|
||||
|
||||
actors
|
||||
typed-channels
|
||||
typed-actors
|
||||
fault-tolerance
|
||||
dispatchers
|
||||
|
|
|
|||
|
|
@ -1,548 +0,0 @@
|
|||
.. _typed-channels:
|
||||
|
||||
#############################
|
||||
Typed Channels (EXPERIMENTAL)
|
||||
#############################
|
||||
|
||||
.. note::
|
||||
|
||||
*This is a preview of the upcoming Typed Channels support, its API may change
|
||||
during development up to the released version where the EXPERIMENTAL label is
|
||||
removed.*
|
||||
|
||||
Motivation
|
||||
==========
|
||||
|
||||
Actors derive great strength from their strong encapsulation, which enables
|
||||
internal restarts as well as changing behavior and also composition. The last
|
||||
one is enabled by being able to inject an actor into a message exchange
|
||||
transparently, because all either side ever sees is an :class:`ActorRef`. The
|
||||
straight-forward way to implement this encapsulation is to keep the actor
|
||||
references untyped, and before the advent of macros in Scala 2.10 this was the
|
||||
only tractable way.
|
||||
|
||||
As a motivation for change consider the following simple example:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation0
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation1
|
||||
|
||||
This is an error which is quite common, and the reason is that the compiler
|
||||
does not catch it and cannot warn about it. Now if there were some type
|
||||
restrictions on which messages the ``commandProcessor`` can process, that would
|
||||
be a different story:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation2
|
||||
|
||||
The :class:`ChannelRef` wraps a normal untyped :class:`ActorRef`, but it
|
||||
expresses a type constraint, namely that this channel accepts only messages of
|
||||
type :class:`Request`, to which it may reply with messages of type
|
||||
:class:`Reply`. The types do not express any guarantees on how many messages
|
||||
will be exchanged, whether they will be received or processed, or whether a
|
||||
reply will actually be sent. They only restrict those actions which are known
|
||||
to be doomed already at compile time. In this case the second line would flag
|
||||
an error, since the companion object ``Command`` is not an instance of type
|
||||
:class:`Request`.
|
||||
|
||||
While this example looks pretty simple, the implications are profound. In order
|
||||
to be useful, the system must be as reliable as you would expect a type system
|
||||
to be. This means that unless you step outside of it (i.e. doing the
|
||||
equivalent of ``.asInstanceOf[_]``) you shall be protected, failures shall be
|
||||
recognized and flagged. There are a number of challenges included in this
|
||||
requirement, which are discussed in `The Design Background`_ below.
|
||||
|
||||
Terminology
|
||||
===========
|
||||
|
||||
.. describe:: type Channel[I, O] = (I, O)
|
||||
|
||||
A Channel is a pair of an input type and and output type. The input type is
|
||||
the type of message accepted by the channel, the output type is the possible
|
||||
reply type and may be ``Nothing`` to signify that no reply is sent. The input
|
||||
type cannot be ``Nothing``.
|
||||
|
||||
.. describe:: type ChannelList
|
||||
|
||||
A ChannelList is an ordered collection of Channels, without further
|
||||
restriction on the input or output types of these. This means that a single
|
||||
input type may be associated with multiple output types within the same
|
||||
ChannelList.
|
||||
|
||||
.. describe:: type TNil <: ChannelList
|
||||
|
||||
The empty ChannelList.
|
||||
|
||||
.. describe:: type :+:[Channel, ChannelList] <: ChannelList
|
||||
|
||||
This binary type constructor is used to build up lists of Channels, for which
|
||||
infix notation will be most convenient:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation-types
|
||||
|
||||
.. describe:: class ChannelRef[T <: ChannelList]
|
||||
|
||||
A ChannelRef is what is referred to above as the channel reference, it bears
|
||||
the ChannelList which describes all input and output types and their relation
|
||||
for the referenced actor. It also contains the underlying :class:`ActorRef`.
|
||||
|
||||
.. describe:: trait Channels[P <: ChannelList, C <: ChannelList]
|
||||
|
||||
A mixin for the :class:`Actor` trait which is parameterized in the channel
|
||||
requirements this actor has for its parentChannel (P) and its selfChannel (C)
|
||||
(corresponding to ``context.parent`` and ``self`` for untyped Actors,
|
||||
respectively).
|
||||
|
||||
.. describe:: selfChannel
|
||||
|
||||
An ``Actor with Channels[P, C]`` has a ``selfChannel`` of type
|
||||
``ChannelRef[C]``. This is the same type of channel reference which is
|
||||
obtained by creating an instance of this actor.
|
||||
|
||||
.. describe:: parentChannel
|
||||
|
||||
An ``Actor with Channels[P, C]`` has a ``parentChannel`` of type
|
||||
``ChannelRef[P]``.
|
||||
|
||||
.. describe:: type ReplyChannels[T <: ChannelList] <: ChannelList
|
||||
|
||||
Within an ``Actor with Channels[_, _]`` which takes a fully generic channel,
|
||||
i.e. a type argument ``T <: ChannelList`` which is part of its selfChannel
|
||||
type, this channel’s reply types are not known. The definition of this
|
||||
channel uses the ReplyChannels type to abstractly refer to this unknown set
|
||||
of channels in order to forward a reply from a ``ChannelRef[T]`` back to the
|
||||
original sender. This operation’s type-safety is ensured at the sender’s site
|
||||
by way of the ping-pong analysis described below.
|
||||
|
||||
.. describe:: class WrappedMessage[T <: ChannelList, LUB]
|
||||
|
||||
Scala’s type system cannot directly express type unions. Asking an actor with
|
||||
a given input type may result in multiple possible reply types, hence the
|
||||
:class:`Future` holding this reply will contain the value wrapped inside a
|
||||
container which carries this type (only at compile-time). The type parameter
|
||||
LUB is the least upper bound of all input channels contained in the
|
||||
ChannelList T.
|
||||
|
||||
Sending Messages across Channels
|
||||
================================
|
||||
|
||||
Sending messages is best demonstrated in a quick overview of the basic operations:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#sending
|
||||
|
||||
The first line is included so that the code compiles, since all message sends
|
||||
including ``!`` will check the implicitly found selfChannel for compatibility
|
||||
with the target channel’s reply types. In this case we want to demonstrate just
|
||||
the syntax of sending, hence the dummy sender which accepts everything and
|
||||
replies never.
|
||||
|
||||
Presupposing three channel references of chainable types (and a fourth one for
|
||||
demonstrating multiple reply type), an input value ``a`` and a Future holding
|
||||
such a value, we demonstrate the two basic operations which are well known from
|
||||
untyped actors: tell/! and ask/?. The type of the Future returned by the ask
|
||||
operation on ``channelA2`` may seem surprising at first, but keeping track of
|
||||
all possible reply types is necessary to enable sending of replies to other
|
||||
actors which do support all possibilities. This is especially handy in
|
||||
situations like the one demonstrated on the last line. What the last line does
|
||||
is the following:
|
||||
|
||||
* it asks channelA, which returns a Future
|
||||
|
||||
* a callback is installed on the Future which will use the reply value of
|
||||
channelA and ask channelB with it, returning another Future
|
||||
|
||||
* a callback is installed on that Future to send the reply value of channelB to
|
||||
channelC, returning a Future with that previously sent value (using ``andThen``)
|
||||
|
||||
This example also motivates the introduction of the “turned-around” syntax
|
||||
where messages flow more naturally from left to right, instead of the standard
|
||||
object-oriented view of having the tell method operate on the ActorRef given to
|
||||
the left.
|
||||
|
||||
This example informally introduced what is more precisely specified in the
|
||||
following subsection.
|
||||
|
||||
The Rules
|
||||
---------
|
||||
|
||||
Operations on typed channels are composable and obey a few simple rules:
|
||||
|
||||
* the message to be sent can be one of three things:
|
||||
|
||||
* a :class:`Future[_]`, in which case the contained value will be sent once
|
||||
available; the value will be unwrapped if it is a :class:`WrappedMessage[_, _]`
|
||||
|
||||
* a :class:`WrappedMessage[_, _]`, which will be unwrapped (i.e. only the
|
||||
value is sent)
|
||||
|
||||
* everything else is sent as is
|
||||
|
||||
* the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same
|
||||
thing provided the arguments also switch places
|
||||
|
||||
* sending with ``-?->`` or ``<-?-`` returns a ``Future[WrappedMessage[_, _]]``
|
||||
representing all possible reply channels if there is more than one (use
|
||||
``.lub`` to get a :class:`Future[_]` with the most precise single type for
|
||||
the value)
|
||||
|
||||
* sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new
|
||||
:class:`Future[_]` which will be completed with the value after it has been
|
||||
sent; sending a strict value returns that value
|
||||
|
||||
Declaring an Actor with Channels
|
||||
================================
|
||||
|
||||
The declaration of an Actor with Channels is done like this:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#declaring-channels
|
||||
|
||||
It should be noted that it is impossible to declare channels which are not part
|
||||
of the channel list given as the second type argument to the :class:`Channels`
|
||||
trait. It is also checked—albeit at runtime—that when the actor’s construction
|
||||
is complete (i.e. its constructor and ``preStart`` hook have run) every channel
|
||||
listed in the selfChannel type parameter has been declared. This can in general
|
||||
not be done at compile time, both due to the possibility of overriding
|
||||
subclasses as well as the problem that the compiler cannot determine whether a
|
||||
``channel[]`` statement will be called in the course of execution due to
|
||||
external inputs (e.g. if conditionally executed).
|
||||
|
||||
It should also be noted that the type of ``req`` in this example is
|
||||
``Request``, hence it would be a compile-time error to try to match against the
|
||||
``Command`` companion object. The ``snd`` reference is the sender channel
|
||||
reference, which in this example is of type
|
||||
``ChannelRef[(Reply, UnknownDoNotWriteMeDown) :+: TNil]``, meaning that sending
|
||||
back a reply which is not of type ``Reply`` would be a compile-time error.
|
||||
|
||||
The last thing to note is that an actor is not obliged to reply to an incoming
|
||||
message, even if that was successfully delivered to it: it might not be
|
||||
appropriate, or it might be impossible, the actor might have failed before
|
||||
executing the replying message send, etc. And as always, the ``snd`` reference
|
||||
may be used more than once, and even stored away for later. It must not leave
|
||||
the actor within it was created, however, because that would defeat the
|
||||
ping-pong check; this is the reason for the curious name of the fabricated
|
||||
reply type ``UnknownDoNotWriteMeDown``; if you find yourself declaring that
|
||||
type as part of a message or similar you know that you are cheating.
|
||||
|
||||
Declaration of Subchannels
|
||||
--------------------------
|
||||
|
||||
It can be convenient to carve out subchannels for special treatment like so:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#declaring-subchannels
|
||||
|
||||
This means that all ``Command`` requests will be positively answered while all
|
||||
others may or may not be lucky. This dispatching between the two declarations
|
||||
does not depend on their order but is solely done based on which type is more
|
||||
specific—but see the restrictions imposed by JVM type erasure below.
|
||||
|
||||
Forwarding Messages
|
||||
-------------------
|
||||
|
||||
Forwarding messages has been hinted at in the last sample already, but here is
|
||||
a more complete sample actor:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#forwarding
|
||||
:exclude: become
|
||||
|
||||
This actor declares a single-Channel parametric type which it forwards to a
|
||||
target actor, handing replies back to the original sender using the ask/pipe
|
||||
pattern.
|
||||
|
||||
.. note::
|
||||
|
||||
It is important not to forget the ``TypeTag`` context bound for all type
|
||||
arguments which are used in channel declarations, otherwise the not very
|
||||
helpful error “Predef is not an enclosing class” will haunt you.
|
||||
|
||||
Changing Behavior at Runtime
|
||||
----------------------------
|
||||
|
||||
The actor from the previous example gets a lot more interesting when
|
||||
implementing its control channel:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#forwarding
|
||||
|
||||
This shows all elements of the toolkit in action: calling ``channel[T1]`` again
|
||||
during the lifetime of the actor will alter its behavior on that channel. In
|
||||
this case a latch or gate is modeled which when closed will permit the messages
|
||||
to flow through and when not will drop the messages to the floor.
|
||||
|
||||
Creating Actors with Channels
|
||||
-----------------------------
|
||||
|
||||
Creating top-level actors with channels is done using the ``ChannelExt`` extension:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala
|
||||
:include: usage
|
||||
:exclude: processing
|
||||
|
||||
Inside an actor with channels children are created using the ``createChild`` method:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#child
|
||||
|
||||
In this example we create a simple child actor which responds to requests, but
|
||||
also keeps its parent informed about what it is doing. The parent channel
|
||||
within the child is thus declared to accept :class:`Stats` messages, and the
|
||||
parent must consequently declare such a channel in order to be able to create
|
||||
such a child. The parent’s job then is to create the child, make it available
|
||||
to the outside via properly typed messages and collect the statistics coming in
|
||||
from the child.
|
||||
|
||||
Stepping Outside of Type-Safety
|
||||
-------------------------------
|
||||
|
||||
In much the same was as Scala’s type system can be circumvented by using
|
||||
``.asInstanceOf[_]`` typed channels can also be circumvented. Casting them to
|
||||
alter the type arguments would be an obvious way of doing that, but there are
|
||||
less obvious ways which are therefore enumerated here:
|
||||
|
||||
* explicitly constructing :class:`ChannelRef` instances by hand allows using
|
||||
arbitrary types as arguments
|
||||
|
||||
* sending to the ``actorRef`` member of the :class:`ChannelRef`; this is a
|
||||
normal untyped actor reference without any compile-time checks, which is the
|
||||
reason for choosing visibly different operator names for typed and untyped
|
||||
message send operations
|
||||
|
||||
* using the ``context.parent`` reference instead of ``parentChannel``
|
||||
|
||||
* using the untyped ``sender`` reference instead of the second argument to a
|
||||
channel’s behavior function
|
||||
|
||||
Sending unforeseen messages will be flagged as a type error as long as none of
|
||||
these techniques are used within an application.
|
||||
|
||||
Implementation Restrictions
|
||||
---------------------------
|
||||
|
||||
As described below, incoming messages are dispatched to declared channels based
|
||||
on their runtime class information. This erasure-based dispatch of messages
|
||||
requires all declared channels to have unique JVM type representations, i.e. it
|
||||
is not possible to have two channel declarations with types ``List[A]`` and
|
||||
``List[B]`` because both would at runtime only be known as ``List[_]``.
|
||||
|
||||
The specific dispatch mechanism also requires the declaration of all channels or
|
||||
subchannels during the actor’s construction, independent of whether they shall
|
||||
later change behavior or not. Changing behavior for a subchannel is only
|
||||
possible if that subchannel was declared up-front.
|
||||
|
||||
TypeTags are currently (Scala 2.10.0) not serializable, hence narrowing of
|
||||
:class:`ActorRef` does not work for remote references.
|
||||
|
||||
The Design Background
|
||||
=====================
|
||||
|
||||
This section outlines the most prominent challenges encountered during the
|
||||
development of Typed Channels and the rationale for their solutions. It is not
|
||||
necessary to understand this material in order to use Typed Channels, but it
|
||||
may be useful to explain why certain things are as they are.
|
||||
|
||||
The Type Pollution Problem
|
||||
--------------------------
|
||||
|
||||
What if an actor accepts two different types of messages? It might be a main
|
||||
communications channel which is forwarded to worker actors for performing some
|
||||
long-running and/or dangerous task, plus an administrative channel for the
|
||||
routing of requests. Or it might be a generic message throttler which accepts a
|
||||
generic channel for passing it through (which delay where appropriate) and a
|
||||
management channel for setting the throttling rate. In the second case it is
|
||||
especially easy to see that those two channels will probably not be related,
|
||||
their types will not be derived from a meaningful common supertype; instead the
|
||||
least upper bound will probably be :class:`AnyRef`. If a typed channel
|
||||
reference only had the capability to express a single type, this type would
|
||||
then be no restriction anymore. This loss of type safety caused by the need of
|
||||
handling multiple disjoint sets of types is called “type pollution”, the term
|
||||
was coined by Prof. Philip Wadler.
|
||||
|
||||
One solution to this is to never expose references describing more than one
|
||||
channel at a time. But where would these references come from? It would be very
|
||||
difficult to make this construction process type-safe, and it would also be an
|
||||
inconvenient restriction, since message ordering guarantees only apply for the
|
||||
same sender–receive pair: if there are relations between the messages sent
|
||||
on multiple channels then implementing this mixed-channel communication would
|
||||
incur programmatic and runtime overhead compared to just sending to the same
|
||||
untyped reference.
|
||||
|
||||
The other solution thus is to express multiple channel types by a single
|
||||
channel reference, which requires the implementation of type lists and
|
||||
computations on these. And as we will see below it also requires the
|
||||
specification of possibly multiple reply channels per input type, hence a type
|
||||
map. The implementation chosen uses type lists like this:
|
||||
|
||||
.. includecode:: code/docs/channels/ChannelDocSpec.scala#motivation-types
|
||||
|
||||
This type expresses two channels: type ``A`` may stimulate replies of type
|
||||
``B``, while type ``C`` may evoke replies of type ``D``. The type operator
|
||||
``:+:`` is a binary type constructor which forms a list of these channel
|
||||
definitions, and like every good list it ends with an empty tail ``TNil``.
|
||||
|
||||
The Reply Problem
|
||||
-----------------
|
||||
|
||||
Akka actors have the power to reply to any message they receive, which is also
|
||||
a message send and shall also be covered by typed channels. Since the sending
|
||||
actor is the one which will also receive the reply, this needs to be verified.
|
||||
The solution to this problem is that in addition to the ``self`` reference,
|
||||
which is implicitly picked up as the sender for untyped actor interactions,
|
||||
there is also a ``selfChannel`` which describes the typed channels handled by
|
||||
this actor. Thus at the call site of the message send it must be verified that
|
||||
this actor can actually handle the reply for that given message send.
|
||||
|
||||
The Sender Ping-Pong Problem
|
||||
----------------------------
|
||||
|
||||
After successfully sending a message to an actor over a typed channel, that
|
||||
actor will have a reference to the message’s sender, because normal Akka
|
||||
message processing rules apply. For this sender reference there must exist a
|
||||
typed channel reference which describes the possible reply types which are
|
||||
applicable for each of the incoming message channels. We will see below how
|
||||
this reference is provided in the code, the problem we want to highlight here
|
||||
is a different one: the nature of any sender reference is that it is highly
|
||||
dynamic, the compiler cannot possibly know who sent the message we are
|
||||
currently processing.
|
||||
|
||||
But this does not mean that all hope is lost: the solution is to do *all*
|
||||
type-checking at the call site of the message send. The receiving actor just
|
||||
needs to declare its channel descriptions in its own type, and channel
|
||||
references are derived at construction from this type (implying the existence
|
||||
of a typed ``actorOf``). Then the actor knows for each received message type
|
||||
which the allowed reply types are. The typed channel for the sender reference
|
||||
hence has the reply types for the current input channel as its own input types,
|
||||
but what should the reply types be? This is the ping-pong problem:
|
||||
|
||||
* ActorA sends MsgA to ActorB
|
||||
|
||||
* ActorB replies with MsgB
|
||||
|
||||
* ActorA replies with MsgC
|
||||
|
||||
Every “reply” uses the sender channel, which is dynamic and hence only known
|
||||
partially. But ActorB did not know who sent the message it just replied to and
|
||||
hence it cannot check that it can process the possible replies following that
|
||||
message send. Only ActorA could have known, because it knows its own channels
|
||||
as well as ActorB’s channels completely. The solution is thus to recursively
|
||||
verify the message send, following all reply channels until all possible
|
||||
message types to be sent have been verified. This sounds horribly complex, but
|
||||
the algorithm for doing so actually has a worst-case complexity of O(N) where N
|
||||
is the number of input channels of ActorA or ActorB, whoever has fewer.
|
||||
|
||||
The Parent Problem
|
||||
------------------
|
||||
|
||||
There is one other actor reference which is available to every actor: its
|
||||
parent. Since the child–parent relationship is established permanently when the
|
||||
child is created by the parent, this problem is easily solvable by encoding the
|
||||
requirements of the child for its parent channel in its type signature and
|
||||
having the typed variant of ``actorOf`` verify this against the
|
||||
``selfChannel``.
|
||||
|
||||
Anecdotally, since the guardian actor does not care at all about messages sent
|
||||
to it, top-level actors with typed channels must declare their parent channel
|
||||
to be empty.
|
||||
|
||||
The Exposure/Restriction Problem
|
||||
--------------------------------
|
||||
|
||||
An actor may provide more than one service, either itself or by proxy, each
|
||||
with their own set of channels. Only having references for the full set of
|
||||
channels leads to a too wide spread of capabilities: in the example of the
|
||||
message rate throttling actor its management channel is only meant to be used
|
||||
by the actor which inserted it, not by the two actors between it was inserted.
|
||||
Hence the manager will have to create a channel reference which excludes the
|
||||
management channels before handing out the reference to other actors.
|
||||
|
||||
Another variant of this problem is an actor which handles a channel whose input
|
||||
type is a supertype for a number of derived channels. It should be allowed to
|
||||
use the “superchannel” in place of any of the subchannels, but not the other
|
||||
way around. The intuitive approach would be to model this by making the channel
|
||||
reference contravariant in its channel types and define those channel types
|
||||
accordingly. This does not work nicely, however, because Scala’s type system is
|
||||
not well-suited to modeling such calculations on unordered type lists; it might
|
||||
be possible but its implementation would be forbiddingly complex.
|
||||
|
||||
Therefore this topic gained traction as macros became available: being able to
|
||||
write down type calculations using standard collections and their
|
||||
transformations reduces the implementation to a handful of lines. The “narrow”
|
||||
operation implemented this way allows narrowing of input channels and
|
||||
widening of output channels down to ``(Nothing, Any)`` (which is to say that
|
||||
channels may be narrowed or just plain removed from a channel list).
|
||||
|
||||
The Forwarding Problem
|
||||
----------------------
|
||||
|
||||
One important feature of actors mentioned above is their composability which is
|
||||
enabled by being able to forward or delegate messages. It is the nature of this
|
||||
process that the sending party is not aware of the true destination of the
|
||||
message, it only sees the façade in front of it. Above we have seen that the
|
||||
sender ping-pong problem requires all verification to be performed at the
|
||||
sender’s end, but if the sender does not know the final recipient, how can it
|
||||
check that the message exchange is type-safe?
|
||||
|
||||
The forwarding party—the middle-man—is also not in the position to make this
|
||||
call, since all it has is the incomplete sender channel which is lacking reply
|
||||
type information. The problem which arises lies precisely in these reply
|
||||
sequences: the ping-pong scheme was verified against the middle-man, and if the
|
||||
final recipient would reply to the forwarded request, that sender reference
|
||||
would belong to a different channel and there is no single location in the
|
||||
source code where all these pieces are known at compile time.
|
||||
|
||||
The solution to this problem is to not allow forwarding in the normal untyped
|
||||
:class:`ActorRef` sense. Replies must always be sent by the recipient of the
|
||||
original message in order for the type checks at the sender site to be
|
||||
effective. Since forwarding is an important communication pattern among actors,
|
||||
support for it is thus provided in the form of the :meth:`ask` pattern combined
|
||||
with the :meth:`pipe` pattern, which both are not add-ons but fully integrated
|
||||
operations among typed channels.
|
||||
|
||||
The JVM Erasure Problem
|
||||
-----------------------
|
||||
|
||||
When an actor with typed channels receives a message, this message needs to be
|
||||
dispatched internally to the right channel, so that the right sender channel
|
||||
can be presented and so on. This dispatch needs to work with the information
|
||||
contained in the message, which due to the erasure of generic type information
|
||||
is an incomplete image of the true channel types. Those full types exist only
|
||||
at compile-time and reifying them into TypeTags at runtime for every message
|
||||
send would be prohibitively expensive. This means that channels which erase to
|
||||
the same JVM type cannot coexist within the same actor, messages would not be
|
||||
routable reliably in that case.
|
||||
|
||||
The Actor Lookup Problem
|
||||
------------------------
|
||||
|
||||
Everything up to this point has assumed that channel references are passed from
|
||||
their point of creation to their point of use directly and in the regime of
|
||||
strong, unerased types. This can also happen between actors by embedding them
|
||||
in case classes with proper type information. But one particular useful feature
|
||||
of Akka actors is that they have a stable identity by which they can be found,
|
||||
a unique name. This name is represented as a :class:`String` and naturally does
|
||||
not bear any type information concerning the actor’s channels. Thus, when
|
||||
looking up an actor with ``system.actorSelection(...)`` followed by an ``Identify``
|
||||
request you will only get an untyped :class:`ActorRef` and not a channel reference.
|
||||
This :class:`ActorRef` can of course manually be wrapped in a channel reference
|
||||
bearing the desired channels, but this is not a type-safe operation.
|
||||
|
||||
The solution in this case must be a runtime check. There is an operation to
|
||||
“narrow” an :class:`ActorRef` to a channel reference of given type, which
|
||||
behind the scenes will send a message to the designated actor with a TypeTag
|
||||
representing the requested channels. The actor will check these against its own
|
||||
TypeTag and reply with the verification result. This check uses the same code
|
||||
as the compile-time “narrow” operation introduced above.
|
||||
|
||||
How to read The Types
|
||||
=====================
|
||||
|
||||
In case of errors in your code the compiler will try to inform you in the most
|
||||
precise way it can, and that will then contain types like this::
|
||||
|
||||
akka.channels.:+:[(com.example.Request, com.example.Reply),
|
||||
akka.channels.:+:[(com.example.Command, Nothing), TNil]]
|
||||
|
||||
These types look unwieldy because of two things: they use fully qualified names
|
||||
for all the types (thankfully using the ``()`` sugar for :class:`Tuple2`), and
|
||||
they do not employ infix notation. That same type there might look like this in
|
||||
your source code::
|
||||
|
||||
(Request, Reply) :+: (Command, Nothing) :+: TNil
|
||||
|
||||
As soon as someone finds the time, it would be nice if the IDEs learned to
|
||||
print types making use of the file’s import statements and infix notation.
|
||||
|
|
@ -52,12 +52,12 @@ object AkkaBuild extends Build {
|
|||
testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean,
|
||||
parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean,
|
||||
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
|
||||
unidocExclude := Seq(samples.id, channelsTests.id, remoteTests.id),
|
||||
unidocExclude := Seq(samples.id, remoteTests.id),
|
||||
sources in JavaDoc <<= junidocSources,
|
||||
javacOptions in JavaDoc := Seq(),
|
||||
artifactName in packageDoc in JavaDoc := ((sv, mod, art) => "" + mod.name + "_" + sv.binary + "-" + mod.revision + "-javadoc.jar"),
|
||||
packageDoc in Compile <<= packageDoc in JavaDoc,
|
||||
Dist.distExclude := Seq(actorTests.id, docs.id, samples.id, osgi.id, osgiAries.id, channelsTests.id),
|
||||
Dist.distExclude := Seq(actorTests.id, docs.id, samples.id, osgi.id, osgiAries.id),
|
||||
// generate online version of docs
|
||||
sphinxInputs in Sphinx <<= sphinxInputs in Sphinx in LocalProject(docs.id) map { inputs => inputs.copy(tags = inputs.tags :+ "online") },
|
||||
// don't regenerate the pdf, just reuse the akka-docs version
|
||||
|
|
@ -74,8 +74,7 @@ object AkkaBuild extends Build {
|
|||
|
||||
),
|
||||
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor,
|
||||
persistence, mailboxes, zeroMQ, kernel, osgi, osgiAries, docs, contrib, samples, channels, channelsTests,
|
||||
multiNodeTestkit)
|
||||
persistence, mailboxes, zeroMQ, kernel, osgi, osgiAries, docs, contrib, samples, multiNodeTestkit)
|
||||
)
|
||||
|
||||
lazy val akkaScalaNightly = Project(
|
||||
|
|
@ -83,8 +82,7 @@ object AkkaBuild extends Build {
|
|||
base = file("akka-scala-nightly"),
|
||||
// remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings)
|
||||
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j,
|
||||
persistence, mailboxes, kernel, osgi, osgiAries, contrib, samples, channels, channelsTests,
|
||||
multiNodeTestkit)
|
||||
persistence, mailboxes, kernel, osgi, osgiAries, contrib, samples, multiNodeTestkit)
|
||||
)
|
||||
|
||||
// this detached pseudo-project is used for running the tests against a different Scala version than the one used for compilation
|
||||
|
|
@ -602,7 +600,7 @@ object AkkaBuild extends Build {
|
|||
lazy val docs = Project(
|
||||
id = "akka-docs",
|
||||
base = file("akka-docs"),
|
||||
dependencies = Seq(actor, testkit % "test->test", channels,
|
||||
dependencies = Seq(actor, testkit % "test->test",
|
||||
remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, osgiAries,
|
||||
persistence % "compile;test->test"),
|
||||
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
|
||||
|
|
@ -652,32 +650,12 @@ object AkkaBuild extends Build {
|
|||
)
|
||||
) configs (MultiJvm)
|
||||
|
||||
lazy val channels = Project(
|
||||
id = "akka-channels-experimental",
|
||||
base = file("akka-channels"),
|
||||
dependencies = Seq(actor),
|
||||
settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ experimentalSettings ++ Seq(
|
||||
libraryDependencies +=("org.scala-lang" % "scala-reflect" % scalaVersion.value),
|
||||
reportBinaryIssues := () // disable bin comp check
|
||||
)
|
||||
)
|
||||
|
||||
// // this issue will be fixed in M8, for now we need to exclude M6, M7 modules used to compile the compiler
|
||||
def excludeOldModules(m: ModuleID) = List("M6", "M7").foldLeft(m) { (mID, mStone) =>
|
||||
val version = s"2.11.0-$mStone"
|
||||
mID.exclude("org.scala-lang.modules", s"scala-parser-combinators_$version").exclude("org.scala-lang.modules", s"scala-xml_$version")
|
||||
}
|
||||
|
||||
lazy val channelsTests = Project(
|
||||
id = "akka-channels-tests",
|
||||
base = file("akka-channels-tests"),
|
||||
dependencies = Seq(channels, testkit % "compile;test->test"),
|
||||
settings = defaultSettings ++ formatSettings ++ experimentalSettings ++ Seq(
|
||||
publishArtifact in Compile := false,
|
||||
libraryDependencies += excludeOldModules("org.scala-lang" % "scala-compiler" % scalaVersion.value),
|
||||
reportBinaryIssues := () // disable bin comp check
|
||||
)
|
||||
)
|
||||
|
||||
// Settings
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue