pekko/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala

215 lines
No EOL
5.6 KiB
Scala
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.channels
import akka.testkit.AkkaSpec
import akka.channels._
import akka.actor.Actor
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.util.Timeout
import akka.testkit.TestProbe
object ChannelDocSpec {
//#motivation0
trait Request
case class Command(msg: String) extends Request
trait Reply
case object CommandSuccess extends Reply
case class CommandFailure(msg: String) extends Reply
//#motivation0
//#child
case class Stats(b: Request)
case object GetChild
case class ChildRef(child: ChannelRef[(Request, Reply) :+: TNil])
class Child extends Actor
with Channels[(Stats, Nothing) :+: TNil, (Request, Reply) :+: TNil] {
channel[Request] { (x, snd)
parentChannel <-!- Stats(x)
snd <-!- CommandSuccess
}
}
class Parent extends Actor
with Channels[TNil, (Stats, Nothing) :+: (GetChild.type, ChildRef) :+: TNil] {
val child = createChild(new Child)
channel[GetChild.type] { (_, snd) ChildRef(child) -!-> snd }
channel[Stats] { (x, _)
// collect some stats
}
}
//#child
}
class ChannelDocSpec extends AkkaSpec {
import ChannelDocSpec._
class A
class B
class C
class D
"demonstrate why Typed Channels" in {
def someActor = testActor
//#motivation1
val requestProcessor = someActor
requestProcessor ! Command
//#motivation1
expectMsg(Command)
/*
//#motivation2
val requestProcessor = new ChannelRef[(Request, Reply) :+: TNil](someActor)
requestProcessor <-!- Command // this does not compile
//#motivation2
*/
type Example = //
//#motivation-types
(A, B) :+: (C, D) :+: TNil
//#motivation-types
}
// only compile test
"demonstrate channels creation" ignore {
//#declaring-channels
class AC extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
channel[Request] { (req, snd)
req match {
case Command("ping") snd <-!- CommandSuccess
case _
}
}
}
//#declaring-channels
//#declaring-subchannels
class ACSub extends Actor with Channels[TNil, (Request, Reply) :+: TNil] {
channel[Command] { (cmd, snd) snd <-!- CommandSuccess }
channel[Request] { (req, snd)
if (ThreadLocalRandom.current.nextBoolean) snd <-!- CommandSuccess
else snd <-!- CommandFailure("no luck")
}
}
//#declaring-subchannels
}
// only compile test
"demonstrating message sending" ignore {
import scala.concurrent.ExecutionContext.Implicits.global
//#sending
implicit val dummySender: ChannelRef[(Any, Nothing) :+: TNil] = ???
implicit val timeout: Timeout = ??? // for the ask operations
val channelA: ChannelRef[(A, B) :+: TNil] = ???
val channelB: ChannelRef[(B, C) :+: TNil] = ???
val channelC: ChannelRef[(C, D) :+: TNil] = ???
val a = new A
val fA = Future { new A }
channelA <-!- a // send a to channelA
a -!-> channelA // same thing as above
channelA <-!- fA // eventually send the futures value to channelA
fA -!-> channelA // same thing as above
// ask the actor; return type given in full for illustration
val fB: Future[WrappedMessage[(B, Nothing) :+: TNil, B]] = channelA <-?- a
val fBunwrapped: Future[B] = fB.lub
a -?-> channelA // same thing as above
channelA <-?- fA // eventually ask the actor, return the future
fA -?-> channelA // same thing as above
// chaining works as well
a -?-> channelA -?-> channelB -!-> channelC
//#sending
}
"demonstrate message forwarding" in {
//#forwarding
import scala.reflect.runtime.universe.TypeTag
class Latch[T1: TypeTag, T2: TypeTag](target: ChannelRef[(T1, T2) :+: TNil])
extends Actor with Channels[TNil, (Request, Reply) :+: (T1, T2) :+: TNil] {
implicit val timeout = Timeout(5.seconds)
//#become
channel[Request] {
case (Command("close"), snd)
channel[T1] { (t, s) t -?-> target -!-> s }
snd <-!- CommandSuccess
case (Command("open"), snd)
channel[T1] { (_, _) }
snd <-!- CommandSuccess
}
//#become
channel[T1] { (t, snd) t -?-> target -!-> snd }
}
//#forwarding
val probe = TestProbe()
val _target = new ChannelRef[(String, Int) :+: TNil](probe.ref)
val _self = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
//#usage
implicit val selfChannel: ChannelRef[(Any, Nothing) :+: TNil] = _self
val target: ChannelRef[(String, Int) :+: TNil] = _target // some actor
// type given just for demonstration purposes
val latch: ChannelRef[(Request, Reply) :+: (String, Int) :+: TNil] =
ChannelExt(system).actorOf(new Latch(target))
"hello" -!-> latch
//#processing
probe.expectMsg("hello")
probe.reply(5)
//#processing
expectMsg(5) // this is a TestKit-based example
Command("open") -!-> latch
expectMsg(CommandSuccess)
"world" -!-> latch
//#processing
probe.expectNoMsg(500.millis)
//#processing
expectNoMsg(500.millis)
//#usage
}
"demonstrate child creation" in {
implicit val selfChannel = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
//#child
//
// then it is used somewhat like this:
//
val parent = ChannelExt(system).actorOf(new Parent)
parent <-!- GetChild
val child = expectMsgType[ChildRef].child // this assumes TestKit context
child <-!- Command("hey there")
expectMsg(CommandSuccess)
//#child
}
}