Remove actor dsl (#26784)

* Removals of actor dsl
* Mima for actor dsl removal
* Remove inbox doc test
* Keep main in echo server example
This commit is contained in:
Christopher Batey 2019-04-25 14:53:28 +01:00 committed by Patrik Nordwall
parent 8b3fbe8107
commit 39b344c508
15 changed files with 20 additions and 1649 deletions

View file

@ -1,28 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor;
import java.util.concurrent.TimeoutException;
import java.time.Duration;
import org.junit.ClassRule;
import org.junit.Test;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.scalatest.junit.JUnitSuite;
public class InboxJavaAPITest extends JUnitSuite {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("InboxJavaAPITest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
@Test(expected = TimeoutException.class)
public void mustBeAbleToThrowTimeoutException() throws TimeoutException {
Inbox inbox = Inbox.create(system);
inbox.receive(Duration.ofMillis(10));
}
}

View file

@ -1,272 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor.ActorDSL._
import akka.event.Logging.Warning
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
import akka.testkit.TimingTest
import com.github.ghik.silencer.silent
class ActorDSLDummy {
//#import
import akka.actor.ActorSystem
implicit val system = ActorSystem("demo")
//#import
}
@silent
class ActorDSLSpec extends AkkaSpec {
val echo = system.actorOf(Props(new Actor {
def receive = {
case x => sender() ! x
}
}))
"An Inbox" must {
"function as implicit sender" in {
//#inbox
import akka.actor.ActorDSL._
implicit val i = inbox()
echo ! "hello"
i.receive() should ===("hello")
//#inbox
}
"support watch" in {
//#watch
val target = // some actor
//#watch
actor(new Act {})
//#watch
val i = inbox()
i.watch(target)
//#watch
target ! PoisonPill
i.receive(1.second) should ===(Terminated(target)(true, false))
}
"support queueing multiple queries" in {
val i = inbox()
import system.dispatcher
val res =
Future.sequence(Seq(Future { i.receive() }.recover { case x => x }, Future {
Thread.sleep(100); i.select() { case "world" => 1 }
}.recover { case x => x }, Future { Thread.sleep(200); i.select() { case "hello" => 2 } }.recover {
case x => x
}))
Thread.sleep(1000)
res.isCompleted should ===(false)
i.receiver ! 42
i.receiver ! "hello"
i.receiver ! "world"
Await.result(res, 5 second) should ===(Seq(42, 1, 2))
}
"support selective receives" in {
val i = inbox()
i.receiver ! "hello"
i.receiver ! "world"
val result = i.select() {
case "world" => true
}
result should ===(true)
i.receive() should ===("hello")
}
"have a maximum queue size" taggedAs TimingTest in {
val i = inbox()
system.eventStream.subscribe(testActor, classOf[Warning])
try {
for (_ <- 1 to 1000) i.receiver ! 0
expectNoMessage(1 second)
EventFilter.warning(start = "dropping message", occurrences = 1).intercept {
i.receiver ! 42
}
expectMsgType[Warning]
i.receiver ! 42
expectNoMessage(1 second)
val gotit = for (_ <- 1 to 1000) yield i.receive()
gotit should ===((1 to 1000).map(_ => 0))
intercept[TimeoutException] {
i.receive(1 second)
}
} finally {
system.eventStream.unsubscribe(testActor, classOf[Warning])
}
}
"have a default and custom timeouts" taggedAs TimingTest in {
val i = inbox()
within(5 seconds, 6 seconds) {
intercept[TimeoutException](i.receive())
}
within(1 second) {
intercept[TimeoutException](i.receive(100 millis))
}
}
}
"A lightweight creator" must {
"support creating regular actors" in {
//#simple-actor
val a = actor(new Act {
become {
case "hello" => sender() ! "hi"
}
})
//#simple-actor
implicit val i = inbox()
a ! "hello"
i.receive() should ===("hi")
}
"support becomeStacked" in {
//#becomeStacked
val a = actor(new Act {
become { // this will replace the initial (empty) behavior
case "info" => sender() ! "A"
case "switch" =>
becomeStacked { // this will stack upon the "A" behavior
case "info" => sender() ! "B"
case "switch" => unbecome() // return to the "A" behavior
}
case "lobotomize" => unbecome() // OH NOES: Actor.emptyBehavior
}
})
//#becomeStacked
implicit val sender = testActor
a ! "info"
expectMsg("A")
a ! "switch"
a ! "info"
expectMsg("B")
a ! "switch"
a ! "info"
expectMsg("A")
}
"support setup/teardown" in {
//#simple-start-stop
val a = actor(new Act {
whenStarting { testActor ! "started" }
whenStopping { testActor ! "stopped" }
})
//#simple-start-stop
system.stop(a)
expectMsg("started")
expectMsg("stopped")
}
"support restart" in {
//#failing-actor
val a = actor(new Act {
become {
case "die" => throw new Exception
}
whenFailing { case m @ (_, _) => testActor ! m }
whenRestarted { cause =>
testActor ! cause
}
})
//#failing-actor
EventFilter[Exception](occurrences = 1).intercept {
a ! "die"
}
expectMsgPF() { case (_: Exception, Some("die")) => }
expectMsgPF() { case _: Exception => }
}
"support superviseWith" in {
val a = actor(new Act {
val system = null // shadow the implicit system
//#supervise-with
superviseWith(OneForOneStrategy() {
case e: Exception if e.getMessage == "hello" => Stop
case _: Exception => Resume
})
//#supervise-with
val child = actor("child")(new Act {
whenFailing { (_, _) =>
}
become {
case ref: ActorRef => whenStopping(ref ! "stopped")
case ex: Exception => throw ex
}
})
become {
case x => child ! x
}
})
a ! testActor
EventFilter.warning("hi", occurrences = 1).intercept {
a ! new Exception("hi")
}
expectNoMessage(1 second)
EventFilter[Exception]("hello", occurrences = 1).intercept {
a ! new Exception("hello")
}
expectMsg("stopped")
}
"supported nested declaration" in {
val system = this.system
//#nested-actor
// here we pass in the ActorRefFactory explicitly as an example
val a = actor(system, "fred")(new Act {
val b = actor("barney")(new Act {
whenStarting { context.parent ! ("hello from " + self.path) }
})
become {
case x => testActor ! x
}
})
//#nested-actor
expectMsg("hello from akka://ActorDSLSpec/user/fred/barney")
lastSender should ===(a)
}
"support Stash" in {
//#act-with-stash
val a = actor(new ActWithStash {
become {
case 1 => stash()
case 2 =>
testActor ! 2; unstashAll()
becomeStacked {
case 1 => testActor ! 1; unbecome()
}
}
})
//#act-with-stash
a ! 1
a ! 2
expectMsg(2)
expectMsg(1)
a ! 1
a ! 2
expectMsg(2)
expectMsg(1)
}
}
}

View file

@ -187,35 +187,4 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa
expectMsg("terminated")
}
}
"An ActWithStash" must {
"allow using whenRestarted" in {
import ActorDSL._
val a = actor(new ActWithStash {
become {
case "die" => throw new RuntimeException("dying")
}
whenRestarted { _ =>
testActor ! "restarted"
}
})
EventFilter[RuntimeException]("dying", occurrences = 1).intercept {
a ! "die"
}
expectMsg("restarted")
}
"allow using whenStopping" in {
import ActorDSL._
val a = actor(new ActWithStash {
whenStopping {
testActor ! "stopping"
}
})
a ! PoisonPill
expectMsg("stopping")
}
}
}

View file

@ -0,0 +1,6 @@
ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox$")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$*")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.dsl.*")

View file

@ -749,16 +749,6 @@ akka {
"akka.serialization.JavaSerializer" = 1
"akka.serialization.ByteArraySerializer" = 4
}
# Configuration items which are used by the akka.actor.ActorDSL._ methods
dsl {
# Maximum queue size of the actor created by newInbox(); this protects
# against faulty programs which use select() and consistently miss messages
inbox-size = 1000
# Default timeout to assume for operations like Inbox.receive et al
default-timeout = 5s
}
}
# Used to set the behavior of the scheduler.

View file

@ -1,171 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Await
import akka.util.Helpers.ConfigOps
import akka.util.JavaDurationConverters._
import com.github.ghik.silencer.silent
/**
* This object contains elements which make writing actors and related code
* more concise, e.g. when trying out actors in the REPL.
*
* For the communication of non-actor code with actors, you may use anonymous
* actors tailored to this job:
*
* {{{
* import ActorDSL._
* import scala.concurrent.util.duration._
*
* implicit val system: ActorSystem = ...
*
* implicit val i = inbox()
* someActor ! someMsg // replies will go to `i`
*
* val reply = i.receive()
* val transformedReply = i.select(5 seconds) {
* case x: Int => 2 * x
* }
* }}}
*
* The `receive` and `select` methods are synchronous, i.e. they block the
* calling thread until an answer from the actor is received or the timeout
* expires. The default timeout is taken from configuration item
* `akka.actor.dsl.default-timeout`.
*
* When defining actors in the REPL, say, you may want to have a look at the
* `Act` trait:
*
* {{{
* import ActorDSL._
*
* val system: ActorSystem = ...
*
* val a = actor(system, "fred")(new Act {
* val b = actor("barney")(new Act {
* ...
* })
*
* become {
* case msg => ...
* }
* })
* }}}
*
* Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]]
* as shown with `"barney"` (where the [[akka.actor.ActorContext]] serves this
* purpose), but since nested declarations share the same
* lexical context `"fred"`s ActorContext would be ambiguous
* if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also
* be circumvented by shadowing the name `system` within `"fred"`).
*
* <b>Note:</b> If you want to use an `Act with Stash`, you should use the
* `ActWithStash` trait in order to have the actor get the necessary deque-based
* mailbox setting.
*
* @deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead.
*/
@deprecated(
"deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead.",
since = "2.5.0")
object ActorDSL extends dsl.Inbox with dsl.Creators {
protected object Extension extends ExtensionId[Extension] with ExtensionIdProvider {
override def lookup = Extension
override def createExtension(system: ExtendedActorSystem): Extension = new Extension(system)
/**
* Java API: retrieve the ActorDSL extension for the given system.
*/
override def get(system: ActorSystem): Extension = super.get(system)
}
protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension {
private case class MkChild(props: Props, name: String) extends NoSerializationVerificationNeeded
private val boss = system
.systemActorOf(Props(new Actor {
def receive = {
case MkChild(props, name) => sender() ! context.actorOf(props, name)
case any => sender() ! any
}
}), "dsl")
.asInstanceOf[RepointableActorRef]
lazy val config = system.settings.config.getConfig("akka.actor.dsl")
val DSLDefaultTimeout = config.getMillisDuration("default-timeout")
def mkChild(p: Props, name: String): ActorRef =
if (boss.isStarted)
boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true)
else {
implicit val timeout = system.settings.CreationTimeout
Await.result(boss ? MkChild(p, name), timeout.duration).asInstanceOf[ActorRef]
}
}
}
/**
* An Inbox is an actor-like object which is interrogated from the outside.
* It contains an actor whose reference can be passed to other actors as
* usual and it can watch other actors lifecycle.
*/
abstract class Inbox {
/**
* Receive the next message from this Inbox. This call will return immediately
* if the internal actor previously received a message, or it will block for
* up to the specified duration to await reception of a message. If no message
* is received a [[java.util.concurrent.TimeoutException]] will be raised.
*/
@throws(classOf[java.util.concurrent.TimeoutException])
def receive(max: FiniteDuration): Any
/**
* Receive the next message from this Inbox. This call will return immediately
* if the internal actor previously received a message, or it will block for
* up to the specified duration to await reception of a message. If no message
* is received a [[java.util.concurrent.TimeoutException]] will be raised.
*/
@throws(classOf[java.util.concurrent.TimeoutException])
def receive(max: java.time.Duration): Any = receive(max.asScala)
/**
* Have the internal actor watch the target actor. When the target actor
* terminates a [[Terminated]] message will be received.
*/
def watch(target: ActorRef): Unit
/**
* Obtain a reference to the internal actor, which can then for example be
* registered with the event stream or whatever else you may want to do with
* an [[ActorRef]].
*/
def getRef(): ActorRef
/**
* Have the internal actor act as the sender of the given message which will
* be sent to the given target. This means that should the target actor reply
* then those replies will be received by this Inbox.
*/
def send(target: ActorRef, msg: AnyRef): Unit
}
object Inbox {
/**
* Create a new Inbox within the given system.
*/
@silent
def create(system: ActorSystem): Inbox = ActorDSL.inbox()(system)
}

View file

@ -1,213 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.dsl
import akka.actor._
import scala.reflect.ClassTag
trait Creators { this: ActorDSL.type =>
/**
* This trait provides a DSL for writing the inner workings of an actor, e.g.
* for quickly trying things out in the REPL. It makes the following keywords
* available:
*
* - `become` mapped to `context.become(_, discardOld = true)`
*
* - `becomeStacked` mapped to `context.become(_, discardOld = false)`
*
* - `unbecome` mapped to `context.unbecome`
*
* - `setup` for implementing `preStart()`
*
* - `whenFailing` for implementing `preRestart()`
*
* - `whenRestarted` for implementing `postRestart()`
*
* - `teardown` for implementing `postStop`
*
* Using the life-cycle keywords multiple times results in replacing the
* content of the respective hook.
*/
trait Act extends Actor {
private[this] var preStartFun: () => Unit = null
private[this] var postStopFun: () => Unit = null
private[this] var preRestartFun: (Throwable, Option[Any]) => Unit = null
private[this] var postRestartFun: Throwable => Unit = null
private[this] var strategy: SupervisorStrategy = null
/**
* @see [[akka.actor.OneForOneStrategy]]
*/
def OneForOneStrategy = akka.actor.OneForOneStrategy
/**
* @see [[akka.actor.AllForOneStrategy]]
*/
def AllForOneStrategy = akka.actor.AllForOneStrategy
/**
* @see [[akka.actor.SupervisorStrategy]]
*/
def Stop = SupervisorStrategy.Stop
/**
* @see [[akka.actor.SupervisorStrategy]]
*/
def Restart = SupervisorStrategy.Restart
/**
* @see [[akka.actor.SupervisorStrategy]]
*/
def Resume = SupervisorStrategy.Resume
/**
* @see [[akka.actor.SupervisorStrategy]]
*/
def Escalate = SupervisorStrategy.Escalate
/**
* Add the given behavior on top of the behavior stack for this actor. This
* stack is cleared upon restart. Use `unbecome()` to pop an element off
* this stack.
*/
def becomeStacked(r: Receive) = context.become(r, discardOld = false)
/**
* Replace the behavior at the top of the behavior stack for this actor. The
* stack is cleared upon restart. Use `unbecome()` to pop an element off
* this stack or `becomeStacked()` to push a new element on top of it.
*/
def become(r: Receive) = context.become(r, discardOld = true)
/**
* Pop the active behavior from the behavior stack of this actor. This stack
* is cleared upon restart.
*/
def unbecome(): Unit = context.unbecome()
/**
* Set the supervisor strategy of this actor, i.e. how it supervises its children.
*/
def superviseWith(s: SupervisorStrategy): Unit = strategy = s
/**
* Replace the `preStart` action with the supplied thunk. Default action
* is to call `super.preStart()`
*/
def whenStarting(body: => Unit): Unit = preStartFun = () => body
/**
* Replace the `preRestart` action with the supplied function. Default
* action is to call `super.preRestart()`, which will kill all children
* and invoke `postStop()`.
*/
def whenFailing(body: (Throwable, Option[Any]) => Unit): Unit = preRestartFun = body
/**
* Replace the `postRestart` action with the supplied function. Default
* action is to call `super.postRestart` which will call `preStart()`.
*/
def whenRestarted(body: Throwable => Unit): Unit = postRestartFun = body
/**
* Replace the `postStop` action with the supplied thunk. Default action
* is to call `super.postStop`.
*/
def whenStopping(body: => Unit): Unit = postStopFun = () => body
override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart()
override def preRestart(cause: Throwable, msg: Option[Any]): Unit =
if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg)
override def postRestart(cause: Throwable): Unit =
if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause)
override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop()
override def supervisorStrategy: SupervisorStrategy = if (strategy != null) strategy else super.supervisorStrategy
/**
* Default behavior of the actor is empty, use `become` to change this.
*/
override def receive: Receive = Actor.emptyBehavior
}
/**
* Use this trait when defining an [[akka.actor.Actor]] with [[akka.actor.Stash]],
* since just using `actor()(new Act with Stash{})` will not be able to see the
* Stash component due to type erasure.
*/
trait ActWithStash extends Act with Stash
private def mkProps(classOfActor: Class[_], ctor: () => Actor): Props =
Props(classOf[TypedCreatorFunctionConsumer], classOfActor, ctor)
/**
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
*
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
* factory; <b>do not make the generated object accessible to code
* outside and do not return the same object upon subsequent invocations.</b>
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
* where the latter is always implicitly available within an [[akka.actor.Actor]].
*/
def actor[T <: Actor: ClassTag](ctor: => T)(implicit factory: ActorRefFactory): ActorRef = {
// configure dispatcher/mailbox based on runtime class
val classOfActor = implicitly[ClassTag[T]].runtimeClass
val props = mkProps(classOfActor, () => ctor)
factory.actorOf(props)
}
/**
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
*
* @param name is the name, which must be unique within the context of its
* parent.
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
* factory; <b>do not make the generated object accessible to code
* outside and do not return the same object upon subsequent invocations.</b>
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
* where the latter is always implicitly available within an [[akka.actor.Actor]].
*/
def actor[T <: Actor: ClassTag](name: String)(ctor: => T)(implicit factory: ActorRefFactory): ActorRef = {
// configure dispatcher/mailbox based on runtime class
val classOfActor = implicitly[ClassTag[T]].runtimeClass
val props = mkProps(classOfActor, () => ctor)
if (name == null) factory.actorOf(props)
else factory.actorOf(props, name)
}
/**
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
*
* @param name is the name, which must be unique within the context of its
* parent; defaults to `null` which will assign a name automatically.
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
* factory; <b>do not make the generated object accessible to code
* outside and do not return the same object upon subsequent invocations.</b>
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
* where the latter is always implicitly available within an [[akka.actor.Actor]].
*/
def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: => T): ActorRef =
actor(name)(ctor)(implicitly[ClassTag[T]], factory)
/**
* Create an actor with an automatically generated name from the given thunk
* which must produce an [[akka.actor.Actor]].
*
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
* factory; <b>do not make the generated object accessible to code
* outside and do not return the same object upon subsequent invocations.</b>
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
* where the latter is always implicitly available within an [[akka.actor.Actor]].
*/
def actor[T <: Actor: ClassTag](factory: ActorRefFactory)(ctor: => T): ActorRef =
actor(null: String)(ctor)(implicitly[ClassTag[T]], factory)
}

View file

@ -1,232 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.dsl
import scala.concurrent.Await
import akka.actor.ActorLogging
import scala.collection.immutable.TreeSet
import scala.concurrent.duration._
import akka.actor.Cancellable
import akka.actor.Actor
import scala.collection.mutable.Queue
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.util.Timeout
import akka.actor.Status
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
import akka.actor.ActorDSL
import akka.actor.Props
import com.github.ghik.silencer.silent
/**
* INTERNAL API
*/
private[akka] object Inbox {
private sealed trait Query {
def deadline: Deadline
def withClient(c: ActorRef): Query
def client: ActorRef
}
private final case class Get(deadline: Deadline, client: ActorRef = null) extends Query {
def withClient(c: ActorRef) = copy(client = c)
}
private final case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null)
extends Query {
def withClient(c: ActorRef) = copy(client = c)
}
private final case class StartWatch(target: ActorRef)
private case object Kick
}
@silent
trait Inbox { this: ActorDSL.type =>
import Inbox._
protected trait InboxExtension { this: Extension =>
val DSLInboxQueueSize = config.getInt("inbox-size")
val inboxNr = new AtomicInteger
val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize)
def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet)
}
private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] {
def compare(left: Query, right: Query): Int = left.deadline.time.compare(right.deadline.time)
}
private class InboxActor(size: Int) extends Actor with ActorLogging {
var clients = Queue.empty[Query]
val messages = Queue.empty[Any]
var clientsByTimeout = TreeSet.empty[Query]
var printedWarning = false
def enqueueQuery(q: Query): Unit = {
val query = q.withClient(sender())
clients.enqueue(query)
clientsByTimeout += query
}
def enqueueMessage(msg: Any): Unit = {
if (messages.size < size) messages.enqueue(msg)
else {
if (!printedWarning) {
log.warning(
"dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size)
printedWarning = true
}
}
}
var currentMsg: Any = _
val clientPredicate: (Query) => Boolean = {
case _: Get => true
case Select(_, p, _) => p.isDefinedAt(currentMsg)
case _ => false
}
var currentSelect: Select = _
val messagePredicate: (Any => Boolean) = (msg) => currentSelect.predicate.isDefinedAt(msg)
var currentDeadline: Option[(Deadline, Cancellable)] = None
def receive =
({
case g: Get =>
if (messages.isEmpty) enqueueQuery(g)
else sender() ! messages.dequeue()
case s: Select =>
if (messages.isEmpty) enqueueQuery(s)
else {
currentSelect = s
messages.dequeueFirst(messagePredicate) match {
case Some(msg) => sender() ! msg
case None => enqueueQuery(s)
}
currentSelect = null
}
case StartWatch(target) => context.watch(target)
case Kick =>
val now = Deadline.now
val pred = (q: Query) => q.deadline.time < now.time
val overdue = clientsByTimeout.iterator.takeWhile(pred)
while (overdue.hasNext) {
val toKick = overdue.next()
toKick.client ! Status.Failure(new TimeoutException("deadline passed"))
}
clients = clients.filterNot(pred)
clientsByTimeout = clientsByTimeout.from(Get(now))
case msg =>
if (clients.isEmpty) enqueueMessage(msg)
else {
currentMsg = msg
clients.dequeueFirst(clientPredicate) match {
case Some(q) => { clientsByTimeout -= q; q.client ! msg }
case None => enqueueMessage(msg)
}
currentMsg = null
}
}: Receive).andThen { _ =>
if (clients.isEmpty) {
if (currentDeadline.isDefined) {
currentDeadline.get._2.cancel()
currentDeadline = None
}
} else {
val next = clientsByTimeout.head.deadline
import context.dispatcher
if (currentDeadline.isEmpty) {
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
} else {
// must not rely on the Scheduler to not fire early (for robustness)
currentDeadline.get._2.cancel()
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
}
}
}
}
/*
* make sure that AskTimeout does not accidentally mess up message reception
* by adding this extra time to the real timeout
*/
private val extraTime = 1.minute
/**
* Create a new actor which will internally queue up messages it gets so that
* they can be interrogated with the `akka.actor.dsl.Inbox!.Inbox!.receive`
* and `akka.actor.dsl.Inbox!.Inbox!.select` methods. It will be created as
* a system actor in the ActorSystem which is implicitly (or explicitly)
* supplied.
*/
def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system)
class Inbox(system: ActorSystem) extends akka.actor.Inbox {
val receiver: ActorRef = Extension(system).newReceiver
// Java API
def getRef: ActorRef = receiver
def send(target: ActorRef, msg: AnyRef): Unit = target.tell(msg, receiver)
private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout
/**
* Receive a single message from the internal `receiver` actor. The supplied
* timeout is used for cleanup purposes and its precision is subject to the
* resolution of the systems scheduler (usually 100ms, but configurable).
*
* <b>Warning:</b> This method blocks the current thread until a message is
* received, thus it can introduce dead-locks (directly as well as
* indirectly by causing starvation of the thread pool). <b>Do not use
* this method within an actor!</b>
*/
def receive(timeout: FiniteDuration = defaultTimeout): Any = {
implicit val t = Timeout(timeout + extraTime)
Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf)
}
/**
* Receive a single message for which the given partial function is defined
* and return the transformed result, using the internal `receiver` actor.
* The supplied timeout is used for cleanup purposes and its precision is
* subject to the resolution of the systems scheduler (usually 100ms, but
* configurable).
*
* <b>Warning:</b> This method blocks the current thread until a message is
* received, thus it can introduce dead-locks (directly as well as
* indirectly by causing starvation of the thread pool). <b>Do not use
* this method within an actor!</b>
*/
def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = {
implicit val t = Timeout(timeout + extraTime)
predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf))
}
/**
* Make the inboxs actor watch the target actor such that reception of the
* Terminated message can then be awaited.
*/
def watch(target: ActorRef): Unit = receiver ! StartWatch(target)
/**
* Overridden finalizer which will try to stop the actor once this Inbox
* is no longer referenced.
*/
override def finalize(): Unit = {
system.stop(receiver)
}
}
implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver
}

View file

@ -1,565 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.circuitbreaker
import akka.actor.{ ActorRef, PoisonPill }
import akka.contrib.circuitbreaker.CircuitBreakerProxy._
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.Timeout
import org.scalatest.GivenWhenThen
import scala.concurrent.duration._
import scala.language.postfixOps
class CircuitBreakerProxySpec extends AkkaSpec() with GivenWhenThen {
val baseCircuitBreakerPropsBuilder =
CircuitBreakerPropsBuilder(maxFailures = 2, callTimeout = 200 millis, resetTimeout = 1 second, failureDetector = {
_ == "FAILURE"
})
trait CircuitBreakerScenario {
val sender = TestProbe()
val eventListener = TestProbe()
val receiver = TestProbe()
def circuitBreaker: ActorRef
def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
def receiverRespondsWithFailureToRequest(request: Any) = {
sender.send(circuitBreaker, request)
receiver.expectMsg(request)
receiver.reply("FAILURE")
sender.expectMsg("FAILURE")
}
def receiverRespondsToRequestWith(request: Any, reply: Any) = {
sender.send(circuitBreaker, request)
receiver.expectMsg(request)
receiver.reply(reply)
sender.expectMsg(reply)
}
def circuitBreakerReceivesSelfNotificationMessage() =
receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration / 4)
def resetTimeoutExpires() =
receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration + 100.millis)
def callTimeoutExpiresWithoutResponse() =
sender.expectNoMsg(baseCircuitBreakerPropsBuilder.callTimeout.duration + 100.millis)
def messageIsRejectedWithOpenCircuitNotification(message: Any) = {
sender.send(circuitBreaker, message)
sender.expectMsg(CircuitOpenFailure(message))
}
}
"CircuitBreakerActor" should {
"act as a transparent proxy in case of successful requests-replies - forward to target" in {
Given("A circuit breaker proxy pointing to a target actor")
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
When("A message is sent to the proxy actor")
TestProbe().send(circuitBreaker, "test message")
Then("The target actor receives the message")
receiver.expectMsg("test message")
}
"act as a transparent proxy in case of successful requests-replies - full cycle" in {
Given("A circuit breaker proxy pointing to a target actor")
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
When("A sender sends a message to the target actor via the proxy actor")
val sender = TestProbe()
sender.send(circuitBreaker, "test message")
receiver.expectMsg("test message")
And("The target actor replies to the message")
receiver.reply("response")
Then("The reply is sent to the sender")
sender.expectMsg("response")
}
"forward further messages before receiving the response of the first one" in {
Given("A circuit breaker proxy pointing to a target actor")
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
When("A batch of messages is sent to the target actor via the proxy")
val sender = TestProbe()
sender.send(circuitBreaker, "test message1")
sender.send(circuitBreaker, "test message2")
sender.send(circuitBreaker, "test message3")
And("The receiver doesn't reply to any of those messages")
Then("All the messages in the batch are sent")
receiver.expectMsg("test message1")
receiver.expectMsg("test message2")
receiver.expectMsg("test message3")
}
"send responses to the right sender" in {
Given("A circuit breaker proxy pointing to a target actor")
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
And("Two different senders actors")
val sender1 = TestProbe()
val sender2 = TestProbe()
When("The two actors are sending messages to the target actor through the proxy")
sender1.send(circuitBreaker, "test message1")
sender2.send(circuitBreaker, "test message2")
And("The target actor replies to those messages")
receiver.expectMsg("test message1")
receiver.reply("response1")
receiver.expectMsg("test message2")
receiver.reply("response2")
Then("The replies are forwarded to the correct sender")
sender1.expectMsg("response1")
sender2.expectMsg("response2")
}
"return failed responses too" in {
Given("A circuit breaker proxy pointing to a target actor")
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
When("A sender sends a request to the target actor through the proxy")
val sender = TestProbe()
sender.send(circuitBreaker, "request")
And("The target actor replies with a failure response")
receiver.expectMsg("request")
receiver.reply("FAILURE")
Then("The failure response is returned ")
sender.expectMsg("FAILURE")
}
"enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario {
Given("A circuit breaker proxy pointing to a target actor")
val circuitBreaker = defaultCircuitBreaker
When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing")
(1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index =>
receiverRespondsWithFailureToRequest(s"request$index")
}
circuitBreakerReceivesSelfNotificationMessage()
Then("The circuit is in Open stage: If a further message is sent it is not forwarded")
sender.send(circuitBreaker, "request in open state")
receiver.expectNoMsg
}
"respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario {
Given("A circuit breaker proxy pointing to a target actor")
val circuitBreaker = defaultCircuitBreaker
When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing")
(1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index =>
receiverRespondsWithFailureToRequest(s"request$index")
}
circuitBreakerReceivesSelfNotificationMessage()
Then("The circuit is in Open stage: any further request is replied-to with a CircuitOpenFailure response")
sender.send(circuitBreaker, "request in open state")
sender.expectMsg(CircuitOpenFailure("request in open state"))
}
"respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario {
Given(
"A circuit breaker proxy pointing to a target actor built with a function to convert CircuitOpenFailure response into a String response")
val circuitBreaker = system.actorOf(
baseCircuitBreakerPropsBuilder
.copy(openCircuitFailureConverter = { failureMsg =>
s"NOT SENT: ${failureMsg.failedMsg}"
})
.props(receiver.ref))
When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing")
(1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index =>
receiverRespondsWithFailureToRequest(s"request$index")
}
circuitBreakerReceivesSelfNotificationMessage()
Then("Any further request receives instead of the CircuitOpenFailure response the converted one")
sender.send(circuitBreaker, "request in open state")
sender.expectMsg("NOT SENT: request in open state")
}
"enter open state after reaching the threshold of timed-out responses" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When(
"A number of request equal to the timed-out responses threashold is done without receiving response within the configured timeout")
sender.send(circuitBreaker, "request1")
sender.send(circuitBreaker, "request2")
callTimeoutExpiresWithoutResponse()
receiver.expectMsg("request1")
receiver.reply("this should be timed out 1")
receiver.expectMsg("request2")
receiver.reply("this should be timed out 2")
circuitBreakerReceivesSelfNotificationMessage()
Then("The circuit is in Open stage: any further request is replied-to with a CircuitOpenFailure response")
sender.send(circuitBreaker, "request in open state")
receiver.expectNoMsg
}
"enter HALF OPEN state after the given state timeout, sending the first message only" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("ENTERING OPEN STATE")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
circuitBreakerReceivesSelfNotificationMessage()
Then("Messages are ignored")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2")
When("ENTERING HALF OPEN STATE")
resetTimeoutExpires()
Then("First message should be forwarded, following ones ignored if the failure persist")
sender.send(circuitBreaker, "First message in half-open state, should be forwarded")
sender.send(circuitBreaker, "Second message in half-open state, should be ignored")
receiver.expectMsg("First message in half-open state, should be forwarded")
receiver.expectNoMsg()
sender.expectMsg(CircuitOpenFailure("Second message in half-open state, should be ignored"))
}
"return to CLOSED state from HALF-OPEN if a successful message response notification is received" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Entering HALF OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
resetTimeoutExpires()
And("Receiving a successful response")
receiverRespondsToRequestWith(
"First message in half-open state, should be forwarded",
"This should close the circuit")
circuitBreakerReceivesSelfNotificationMessage()
Then("circuit is re-closed")
sender.send(circuitBreaker, "request1")
receiver.expectMsg("request1")
sender.send(circuitBreaker, "request2")
receiver.expectMsg("request2")
}
"return to OPEN state from HALF-OPEN if a FAILURE message response is received" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Entering HALF OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
resetTimeoutExpires()
And("Receiving a failure response")
receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded")
circuitBreakerReceivesSelfNotificationMessage()
Then("circuit is opened again")
sender.send(circuitBreaker, "this should be ignored")
receiver.expectNoMsg()
sender.expectMsg(CircuitOpenFailure("this should be ignored"))
}
"notify an event status change listener when changing state" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
override val circuitBreaker = system.actorOf(
baseCircuitBreakerPropsBuilder
.copy(circuitEventListener = Some(eventListener.ref))
.props(target = receiver.ref))
When("Entering OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
circuitBreakerReceivesSelfNotificationMessage()
Then("An event is sent")
eventListener.expectMsg(CircuitOpen(circuitBreaker))
When("Entering HALF OPEN state")
resetTimeoutExpires()
Then("An event is sent")
eventListener.expectMsg(CircuitHalfOpen(circuitBreaker))
When("Entering CLOSED state")
receiverRespondsToRequestWith(
"First message in half-open state, should be forwarded",
"This should close the circuit")
Then("An event is sent")
eventListener.expectMsg(CircuitClosed(circuitBreaker))
}
"stop if the target actor terminates itself" in new CircuitBreakerScenario {
Given("An actor that will terminate when receiving a message")
import akka.actor.ActorDSL._
val suicidalActor = actor(new Act {
become {
case anyMessage =>
sender() ! "dying now"
context.stop(self)
}
})
And("A circuit breaker actor proxying another actor")
val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = suicidalActor))
val suicidalActorWatch = TestProbe()
suicidalActorWatch.watch(suicidalActor)
val circuitBreakerWatch = TestProbe()
circuitBreakerWatch.watch(circuitBreaker)
When("The target actor stops")
sender.send(circuitBreaker, "this message will kill the target")
sender.expectMsg("dying now")
suicidalActorWatch.expectTerminated(suicidalActor)
Then("The circuit breaker proxy actor is terminated too")
circuitBreakerWatch.expectTerminated(circuitBreaker)
}
"stop if the target actor is stopped" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
val receiverActorWatch = TestProbe()
receiverActorWatch.watch(receiver.ref)
val circuitBreakerWatch = TestProbe()
circuitBreakerWatch.watch(circuitBreaker)
When("The target actor stops")
sender.send(circuitBreaker, Passthrough(PoisonPill))
receiverActorWatch.expectTerminated(receiver.ref)
Then("The circuit breaker proxy actor is terminated too")
circuitBreakerWatch.expectTerminated(circuitBreaker)
}
"send a any message enveloped into a TellOnly case class without expecting a response in closed state" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When(
"A number of request equal to the timed-out responses wrapped in a TellOnly threashold is done without receiving response within the configured timeout")
sender.send(circuitBreaker, TellOnly("Fire and forget 1"))
sender.send(circuitBreaker, TellOnly("Fire and forget 2"))
receiver.expectMsg("Fire and forget 1")
receiver.expectMsg("Fire and forget 2")
And("No response is received")
callTimeoutExpiresWithoutResponse()
Then("The circuit is still closed")
sender.send(circuitBreaker, "This should be received too")
receiver.expectMsg("This should be received too")
}
"block messages wrapped in TellOnly when in open state" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Circuit enters OPEN state")
(1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index =>
receiverRespondsWithFailureToRequest(s"request$index")
}
circuitBreakerReceivesSelfNotificationMessage()
Then("A TellOnly wrapped message is not sent")
sender.send(circuitBreaker, TellOnly("This should NOT be received"))
receiver.expectNoMsg()
}
"send a any message enveloped into a Passthrough case class without expecting a response even in closed state" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Circuit enters OPEN state")
(1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index =>
receiverRespondsWithFailureToRequest(s"request$index")
}
circuitBreakerReceivesSelfNotificationMessage()
Then("A Passthrough wrapped message is sent")
sender.send(circuitBreaker, Passthrough("This should be received"))
receiver.expectMsg("This should be received")
And("The circuit is still closed for ordinary messages")
sender.send(circuitBreaker, "This should NOT be received")
receiver.expectNoMsg()
}
}
"Ask Extension" should {
import Implicits.askWithCircuitBreaker
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout: Timeout = 2.seconds
"work as a ASK pattern if circuit is closed" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Doing a askWithCircuitBreaker request")
val responseFuture = circuitBreaker.askWithCircuitBreaker("request")
Then("The message is sent to the target actor")
receiver.expectMsg("request")
When("Then target actor replies")
receiver.reply("response")
Then("The response is available as result of the future returned by the askWithCircuitBreaker method")
whenReady(responseFuture) { response =>
response should be("response")
}
}
"transform the response into a failure with CircuitOpenException cause if circuit is open" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("The circuit breaker proxy enters OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
circuitBreakerReceivesSelfNotificationMessage()
And("Doing a askWithCircuitBreaker request")
val responseFuture = circuitBreaker.askWithCircuitBreaker("request")
Then("The message is NOT sent to the target actor")
receiver.expectNoMsg()
And("The response is converted into a failure")
whenReady(responseFuture.failed) { failure =>
failure shouldBe a[OpenCircuitException]
}
}
}
"Future Extension" should {
import Implicits.futureExtensions
import akka.pattern.ask
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout: Timeout = 2.seconds
"work as a ASK pattern if circuit is closed" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Doing a askWithCircuitBreaker request")
val responseFuture = (circuitBreaker ? "request").failForOpenCircuit
Then("The message is sent to the target actor")
receiver.expectMsg("request")
When("Then target actor replies")
receiver.reply("response")
Then("The response is available as result of the future returned by the askWithCircuitBreaker method")
whenReady(responseFuture) { response =>
response should be("response")
}
}
"transform the response into a failure with CircuitOpenException cause if circuit is open" in new CircuitBreakerScenario {
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("The circuit breaker proxy enters OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
circuitBreakerReceivesSelfNotificationMessage()
And("Doing a askWithCircuitBreaker request")
val responseFuture = (circuitBreaker ? "request").failForOpenCircuit
Then("The message is NOT sent to the target actor")
receiver.expectNoMsg()
And("The response is converted into a failure")
whenReady(responseFuture.failed) { failure =>
failure shouldBe a[OpenCircuitException]
}
}
"transform the response into a failure with the given exception as cause if circuit is open" in new CircuitBreakerScenario {
class MyException(message: String) extends Exception(message)
Given("A circuit breaker actor proxying a test probe")
val circuitBreaker = defaultCircuitBreaker
When("The circuit breaker proxy enters OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
circuitBreakerReceivesSelfNotificationMessage()
And("Doing a askWithCircuitBreaker request")
val responseFuture = (circuitBreaker ? "request").failForOpenCircuitWith(new MyException("Circuit is open"))
Then("The message is NOT sent to the target actor")
receiver.expectNoMsg()
And("The response is converted into a failure")
whenReady(responseFuture.failed) { failure =>
failure shouldBe a[MyException]
failure.getMessage() should be("Circuit is open")
}
}
}
}

View file

@ -292,42 +292,6 @@ are described in more depth in the
[Using Akka with Dependency Injection](http://letitcrash.com/post/55958814293/akka-dependency-injection)
guideline and the [Akka Java Spring](https://github.com/typesafehub/activator-akka-java-spring) tutorial.
### The Inbox
When writing code outside of actors which shall communicate with actors, the
`ask` pattern can be a solution (see below), but there are two things it
cannot do: receiving multiple replies (e.g. by subscribing an `ActorRef`
to a notification service) and watching other actors lifecycle. For these
purposes there is the `Inbox` class:
Scala
: @@snip [ActorDSLSpec.scala](/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala) { #inbox }
Java
: @@snip [InboxDocTest.java](/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java) { #inbox }
@@@ div { .group-scala }
There is an implicit conversion from inbox to actor reference which means that
in this example the sender reference will be that of the actor hidden away
within the inbox. This allows the reply to be received on the last line.
Watching an actor is quite simple as well:
@@snip [ActorDSLSpec.scala](/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala) { #watch }
@@@
@@@ div { .group-java }
The `send` method wraps a normal `tell` and supplies the internal
actors reference as the sender. This allows the reply to be received on the
last line. Watching an actor is quite simple as well:
@@snip [InboxDocTest.java](/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java) { #watch }
@@@
## Actor API
@scala[The `Actor` trait defines only one abstract method, the above mentioned

View file

@ -3,3 +3,9 @@
## Scala 2.11 no longer supported
If you are still using Scala 2.11 then you must upgrade to 2.12 or 2.13
### Actor DSL removal
Actor DSL is a rarely used feature and has been deprecated since `2.5.0`.
Use plain `system.actorOf` instead of the DSL to create Actors if you have been using it.

View file

@ -1,64 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.actor;
import akka.testkit.AkkaJUnitActorSystemResource;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Inbox;
import akka.actor.PoisonPill;
import akka.actor.Terminated;
import akka.testkit.AkkaSpec;
import java.time.Duration;
public class InboxDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("InboxDocTest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
@Test
public void demonstrateInbox() {
final TestKit probe = new TestKit(system);
final ActorRef target = probe.getRef();
// #inbox
final Inbox inbox = Inbox.create(system);
inbox.send(target, "hello");
// #inbox
probe.expectMsgEquals("hello");
probe.send(probe.getLastSender(), "world");
// #inbox
try {
assert inbox.receive(Duration.ofSeconds(1)).equals("world");
} catch (java.util.concurrent.TimeoutException e) {
// timeout
}
// #inbox
}
@Test
public void demonstrateWatch() {
final TestKit probe = new TestKit(system);
final ActorRef target = probe.getRef();
// #watch
final Inbox inbox = Inbox.create(system);
inbox.watch(target);
target.tell(PoisonPill.getInstance(), ActorRef.noSender());
try {
assert inbox.receive(Duration.ofSeconds(1)) instanceof Terminated;
} catch (java.util.concurrent.TimeoutException e) {
// timeout
}
// #watch
}
}

View file

@ -724,14 +724,6 @@ class ActorDocSpec extends AkkaSpec("""
lastSender.path.toStringWithoutAddress should be("/user")
}
"using ActorDSL outside of akka.actor package" in {
import akka.actor.ActorDSL._
actor(new Act {
superviseWith(OneForOneStrategy() { case _ => Stop; Restart; Resume; Escalate })
superviseWith(AllForOneStrategy() { case _ => Stop; Restart; Resume; Escalate })
})
}
"using CoordinatedShutdown" in {
val someActor = system.actorOf(Props(classOf[Replier], this))
//#coordinated-shutdown-addTask

View file

@ -6,34 +6,24 @@ package docs.io
import java.net.InetSocketAddress
import scala.concurrent.duration.DurationInt
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorDSL, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy }
import akka.actor.ActorDSL.inbox
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy }
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import scala.io.StdIn
object EchoServer extends App {
val config = ConfigFactory.parseString("akka.loglevel = DEBUG")
implicit val system = ActorSystem("EchoServer", config)
// make sure to stop the system so that the application stops
try run()
finally system.terminate()
def run(): Unit = {
import ActorDSL._
// create two EchoManager and stop the application once one dies
val watcher = inbox()
watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo"))
watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple"))
watcher.receive(10.minutes)
}
system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo")
system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple")
println("Press enter to exit...")
StdIn.readLine()
system.terminate()
}
class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging {

View file

@ -133,7 +133,6 @@ object AkkaBuild {
initialCommands :=
"""|import language.postfixOps
|import akka.actor._
|import ActorDSL._
|import scala.concurrent._
|import com.typesafe.config.ConfigFactory
|import scala.concurrent.duration._