From 39b344c50871a8013808ee41159d1aaf51f3f315 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 25 Apr 2019 14:53:28 +0100 Subject: [PATCH] Remove actor dsl (#26784) * Removals of actor dsl * Mima for actor dsl removal * Remove inbox doc test * Keep main in echo server example --- .../java/akka/actor/InboxJavaAPITest.java | 28 - .../test/scala/akka/actor/ActorDSLSpec.scala | 272 --------- .../scala/akka/actor/ActorWithStashSpec.scala | 31 - .../mima-filters/2.5.x.backwards.excludes | 6 + akka-actor/src/main/resources/reference.conf | 10 - .../src/main/scala/akka/actor/ActorDSL.scala | 171 ------ .../main/scala/akka/actor/dsl/Creators.scala | 213 ------- .../src/main/scala/akka/actor/dsl/Inbox.scala | 232 ------- .../CircuitBreakerProxySpec.scala | 565 ------------------ akka-docs/src/main/paradox/actors.md | 36 -- .../project/migration-guide-2.5.x-2.6.x.md | 6 + .../test/java/jdocs/actor/InboxDocTest.java | 64 -- .../test/scala/docs/actor/ActorDocSpec.scala | 8 - .../src/test/scala/docs/io/EchoServer.scala | 26 +- project/AkkaBuild.scala | 1 - 15 files changed, 20 insertions(+), 1649 deletions(-) delete mode 100644 akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java delete mode 100644 akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala create mode 100644 akka-actor/src/main/mima-filters/2.5.x.backwards.excludes delete mode 100644 akka-actor/src/main/scala/akka/actor/ActorDSL.scala delete mode 100644 akka-actor/src/main/scala/akka/actor/dsl/Creators.scala delete mode 100644 akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerProxySpec.scala delete mode 100644 akka-docs/src/test/java/jdocs/actor/InboxDocTest.java diff --git a/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java b/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java deleted file mode 100644 index 023caadf97..0000000000 --- a/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.actor; - -import java.util.concurrent.TimeoutException; -import java.time.Duration; -import org.junit.ClassRule; -import org.junit.Test; -import akka.testkit.AkkaJUnitActorSystemResource; -import akka.testkit.AkkaSpec; -import org.scalatest.junit.JUnitSuite; - -public class InboxJavaAPITest extends JUnitSuite { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("InboxJavaAPITest", AkkaSpec.testConf()); - - private final ActorSystem system = actorSystemResource.getSystem(); - - @Test(expected = TimeoutException.class) - public void mustBeAbleToThrowTimeoutException() throws TimeoutException { - Inbox inbox = Inbox.create(system); - inbox.receive(Duration.ofMillis(10)); - } -} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala deleted file mode 100644 index e7e7218521..0000000000 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.actor - -import language.postfixOps -import akka.testkit.{ AkkaSpec, EventFilter } -import akka.actor.ActorDSL._ -import akka.event.Logging.Warning - -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ -import java.util.concurrent.TimeoutException - -import akka.testkit.TimingTest -import com.github.ghik.silencer.silent - -class ActorDSLDummy { - //#import - import akka.actor.ActorSystem - - implicit val system = ActorSystem("demo") - //#import -} - -@silent -class ActorDSLSpec extends AkkaSpec { - - val echo = system.actorOf(Props(new Actor { - def receive = { - case x => sender() ! x - } - })) - - "An Inbox" must { - - "function as implicit sender" in { - //#inbox - import akka.actor.ActorDSL._ - - implicit val i = inbox() - echo ! "hello" - i.receive() should ===("hello") - //#inbox - } - - "support watch" in { - //#watch - val target = // some actor - //#watch - actor(new Act {}) - //#watch - val i = inbox() - i.watch(target) - //#watch - target ! PoisonPill - i.receive(1.second) should ===(Terminated(target)(true, false)) - } - - "support queueing multiple queries" in { - val i = inbox() - import system.dispatcher - val res = - Future.sequence(Seq(Future { i.receive() }.recover { case x => x }, Future { - Thread.sleep(100); i.select() { case "world" => 1 } - }.recover { case x => x }, Future { Thread.sleep(200); i.select() { case "hello" => 2 } }.recover { - case x => x - })) - Thread.sleep(1000) - res.isCompleted should ===(false) - i.receiver ! 42 - i.receiver ! "hello" - i.receiver ! "world" - Await.result(res, 5 second) should ===(Seq(42, 1, 2)) - } - - "support selective receives" in { - val i = inbox() - i.receiver ! "hello" - i.receiver ! "world" - val result = i.select() { - case "world" => true - } - result should ===(true) - i.receive() should ===("hello") - } - - "have a maximum queue size" taggedAs TimingTest in { - val i = inbox() - system.eventStream.subscribe(testActor, classOf[Warning]) - try { - for (_ <- 1 to 1000) i.receiver ! 0 - expectNoMessage(1 second) - EventFilter.warning(start = "dropping message", occurrences = 1).intercept { - i.receiver ! 42 - } - expectMsgType[Warning] - i.receiver ! 42 - expectNoMessage(1 second) - val gotit = for (_ <- 1 to 1000) yield i.receive() - gotit should ===((1 to 1000).map(_ => 0)) - intercept[TimeoutException] { - i.receive(1 second) - } - } finally { - system.eventStream.unsubscribe(testActor, classOf[Warning]) - } - } - - "have a default and custom timeouts" taggedAs TimingTest in { - val i = inbox() - within(5 seconds, 6 seconds) { - intercept[TimeoutException](i.receive()) - } - within(1 second) { - intercept[TimeoutException](i.receive(100 millis)) - } - } - - } - - "A lightweight creator" must { - - "support creating regular actors" in { - //#simple-actor - val a = actor(new Act { - become { - case "hello" => sender() ! "hi" - } - }) - //#simple-actor - - implicit val i = inbox() - a ! "hello" - i.receive() should ===("hi") - } - - "support becomeStacked" in { - //#becomeStacked - val a = actor(new Act { - become { // this will replace the initial (empty) behavior - case "info" => sender() ! "A" - case "switch" => - becomeStacked { // this will stack upon the "A" behavior - case "info" => sender() ! "B" - case "switch" => unbecome() // return to the "A" behavior - } - case "lobotomize" => unbecome() // OH NOES: Actor.emptyBehavior - } - }) - //#becomeStacked - - implicit val sender = testActor - a ! "info" - expectMsg("A") - a ! "switch" - a ! "info" - expectMsg("B") - a ! "switch" - a ! "info" - expectMsg("A") - } - - "support setup/teardown" in { - //#simple-start-stop - val a = actor(new Act { - whenStarting { testActor ! "started" } - whenStopping { testActor ! "stopped" } - }) - //#simple-start-stop - - system.stop(a) - expectMsg("started") - expectMsg("stopped") - } - - "support restart" in { - //#failing-actor - val a = actor(new Act { - become { - case "die" => throw new Exception - } - whenFailing { case m @ (_, _) => testActor ! m } - whenRestarted { cause => - testActor ! cause - } - }) - //#failing-actor - - EventFilter[Exception](occurrences = 1).intercept { - a ! "die" - } - expectMsgPF() { case (_: Exception, Some("die")) => } - expectMsgPF() { case _: Exception => } - } - - "support superviseWith" in { - val a = actor(new Act { - val system = null // shadow the implicit system - //#supervise-with - superviseWith(OneForOneStrategy() { - case e: Exception if e.getMessage == "hello" => Stop - case _: Exception => Resume - }) - //#supervise-with - val child = actor("child")(new Act { - whenFailing { (_, _) => - } - become { - case ref: ActorRef => whenStopping(ref ! "stopped") - case ex: Exception => throw ex - } - }) - become { - case x => child ! x - } - }) - a ! testActor - EventFilter.warning("hi", occurrences = 1).intercept { - a ! new Exception("hi") - } - expectNoMessage(1 second) - EventFilter[Exception]("hello", occurrences = 1).intercept { - a ! new Exception("hello") - } - expectMsg("stopped") - } - - "supported nested declaration" in { - val system = this.system - //#nested-actor - // here we pass in the ActorRefFactory explicitly as an example - val a = actor(system, "fred")(new Act { - val b = actor("barney")(new Act { - whenStarting { context.parent ! ("hello from " + self.path) } - }) - become { - case x => testActor ! x - } - }) - //#nested-actor - expectMsg("hello from akka://ActorDSLSpec/user/fred/barney") - lastSender should ===(a) - } - - "support Stash" in { - //#act-with-stash - val a = actor(new ActWithStash { - become { - case 1 => stash() - case 2 => - testActor ! 2; unstashAll() - becomeStacked { - case 1 => testActor ! 1; unbecome() - } - } - }) - //#act-with-stash - - a ! 1 - a ! 2 - expectMsg(2) - expectMsg(1) - a ! 1 - a ! 2 - expectMsg(2) - expectMsg(1) - } - - } -} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala index 02260c5e60..412ffd042c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala @@ -187,35 +187,4 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa expectMsg("terminated") } } - - "An ActWithStash" must { - - "allow using whenRestarted" in { - import ActorDSL._ - val a = actor(new ActWithStash { - become { - case "die" => throw new RuntimeException("dying") - } - whenRestarted { _ => - testActor ! "restarted" - } - }) - EventFilter[RuntimeException]("dying", occurrences = 1).intercept { - a ! "die" - } - expectMsg("restarted") - } - - "allow using whenStopping" in { - import ActorDSL._ - val a = actor(new ActWithStash { - whenStopping { - testActor ! "stopping" - } - }) - a ! PoisonPill - expectMsg("stopping") - } - - } } diff --git a/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes new file mode 100644 index 0000000000..27f84ce4bb --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes @@ -0,0 +1,6 @@ +ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox$") +ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox") +ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$") +ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL") +ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$*") +ProblemFilters.exclude[MissingClassProblem]("akka.actor.dsl.*") diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 45f83b9f59..b7cf98381c 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -749,16 +749,6 @@ akka { "akka.serialization.JavaSerializer" = 1 "akka.serialization.ByteArraySerializer" = 4 } - - # Configuration items which are used by the akka.actor.ActorDSL._ methods - dsl { - # Maximum queue size of the actor created by newInbox(); this protects - # against faulty programs which use select() and consistently miss messages - inbox-size = 1000 - - # Default timeout to assume for operations like Inbox.receive et al - default-timeout = 5s - } } # Used to set the behavior of the scheduler. diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala deleted file mode 100644 index 787d240808..0000000000 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.actor - -import scala.concurrent.duration._ -import akka.pattern.ask - -import scala.concurrent.Await -import akka.util.Helpers.ConfigOps -import akka.util.JavaDurationConverters._ -import com.github.ghik.silencer.silent - -/** - * This object contains elements which make writing actors and related code - * more concise, e.g. when trying out actors in the REPL. - * - * For the communication of non-actor code with actors, you may use anonymous - * actors tailored to this job: - * - * {{{ - * import ActorDSL._ - * import scala.concurrent.util.duration._ - * - * implicit val system: ActorSystem = ... - * - * implicit val i = inbox() - * someActor ! someMsg // replies will go to `i` - * - * val reply = i.receive() - * val transformedReply = i.select(5 seconds) { - * case x: Int => 2 * x - * } - * }}} - * - * The `receive` and `select` methods are synchronous, i.e. they block the - * calling thread until an answer from the actor is received or the timeout - * expires. The default timeout is taken from configuration item - * `akka.actor.dsl.default-timeout`. - * - * When defining actors in the REPL, say, you may want to have a look at the - * `Act` trait: - * - * {{{ - * import ActorDSL._ - * - * val system: ActorSystem = ... - * - * val a = actor(system, "fred")(new Act { - * val b = actor("barney")(new Act { - * ... - * }) - * - * become { - * case msg => ... - * } - * }) - * }}} - * - * Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]] - * as shown with `"barney"` (where the [[akka.actor.ActorContext]] serves this - * purpose), but since nested declarations share the same - * lexical context `"fred"`’s ActorContext would be ambiguous - * if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also - * be circumvented by shadowing the name `system` within `"fred"`). - * - * Note: If you want to use an `Act with Stash`, you should use the - * `ActWithStash` trait in order to have the actor get the necessary deque-based - * mailbox setting. - * - * @deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead. - */ -@deprecated( - "deprecated Use the normal `actorOf` methods defined on `ActorSystem` and `ActorContext` to create Actors instead.", - since = "2.5.0") -object ActorDSL extends dsl.Inbox with dsl.Creators { - - protected object Extension extends ExtensionId[Extension] with ExtensionIdProvider { - - override def lookup = Extension - - override def createExtension(system: ExtendedActorSystem): Extension = new Extension(system) - - /** - * Java API: retrieve the ActorDSL extension for the given system. - */ - override def get(system: ActorSystem): Extension = super.get(system) - } - - protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { - - private case class MkChild(props: Props, name: String) extends NoSerializationVerificationNeeded - private val boss = system - .systemActorOf(Props(new Actor { - def receive = { - case MkChild(props, name) => sender() ! context.actorOf(props, name) - case any => sender() ! any - } - }), "dsl") - .asInstanceOf[RepointableActorRef] - - lazy val config = system.settings.config.getConfig("akka.actor.dsl") - - val DSLDefaultTimeout = config.getMillisDuration("default-timeout") - - def mkChild(p: Props, name: String): ActorRef = - if (boss.isStarted) - boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true) - else { - implicit val timeout = system.settings.CreationTimeout - Await.result(boss ? MkChild(p, name), timeout.duration).asInstanceOf[ActorRef] - } - } - -} - -/** - * An Inbox is an actor-like object which is interrogated from the outside. - * It contains an actor whose reference can be passed to other actors as - * usual and it can watch other actors’ lifecycle. - */ -abstract class Inbox { - - /** - * Receive the next message from this Inbox. This call will return immediately - * if the internal actor previously received a message, or it will block for - * up to the specified duration to await reception of a message. If no message - * is received a [[java.util.concurrent.TimeoutException]] will be raised. - */ - @throws(classOf[java.util.concurrent.TimeoutException]) - def receive(max: FiniteDuration): Any - - /** - * Receive the next message from this Inbox. This call will return immediately - * if the internal actor previously received a message, or it will block for - * up to the specified duration to await reception of a message. If no message - * is received a [[java.util.concurrent.TimeoutException]] will be raised. - */ - @throws(classOf[java.util.concurrent.TimeoutException]) - def receive(max: java.time.Duration): Any = receive(max.asScala) - - /** - * Have the internal actor watch the target actor. When the target actor - * terminates a [[Terminated]] message will be received. - */ - def watch(target: ActorRef): Unit - - /** - * Obtain a reference to the internal actor, which can then for example be - * registered with the event stream or whatever else you may want to do with - * an [[ActorRef]]. - */ - def getRef(): ActorRef - - /** - * Have the internal actor act as the sender of the given message which will - * be sent to the given target. This means that should the target actor reply - * then those replies will be received by this Inbox. - */ - def send(target: ActorRef, msg: AnyRef): Unit -} - -object Inbox { - - /** - * Create a new Inbox within the given system. - */ - @silent - def create(system: ActorSystem): Inbox = ActorDSL.inbox()(system) -} diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala deleted file mode 100644 index 2f4d95d308..0000000000 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.actor.dsl - -import akka.actor._ -import scala.reflect.ClassTag - -trait Creators { this: ActorDSL.type => - - /** - * This trait provides a DSL for writing the inner workings of an actor, e.g. - * for quickly trying things out in the REPL. It makes the following keywords - * available: - * - * - `become` mapped to `context.become(_, discardOld = true)` - * - * - `becomeStacked` mapped to `context.become(_, discardOld = false)` - * - * - `unbecome` mapped to `context.unbecome` - * - * - `setup` for implementing `preStart()` - * - * - `whenFailing` for implementing `preRestart()` - * - * - `whenRestarted` for implementing `postRestart()` - * - * - `teardown` for implementing `postStop` - * - * Using the life-cycle keywords multiple times results in replacing the - * content of the respective hook. - */ - trait Act extends Actor { - - private[this] var preStartFun: () => Unit = null - private[this] var postStopFun: () => Unit = null - private[this] var preRestartFun: (Throwable, Option[Any]) => Unit = null - private[this] var postRestartFun: Throwable => Unit = null - private[this] var strategy: SupervisorStrategy = null - - /** - * @see [[akka.actor.OneForOneStrategy]] - */ - def OneForOneStrategy = akka.actor.OneForOneStrategy - - /** - * @see [[akka.actor.AllForOneStrategy]] - */ - def AllForOneStrategy = akka.actor.AllForOneStrategy - - /** - * @see [[akka.actor.SupervisorStrategy]] - */ - def Stop = SupervisorStrategy.Stop - - /** - * @see [[akka.actor.SupervisorStrategy]] - */ - def Restart = SupervisorStrategy.Restart - - /** - * @see [[akka.actor.SupervisorStrategy]] - */ - def Resume = SupervisorStrategy.Resume - - /** - * @see [[akka.actor.SupervisorStrategy]] - */ - def Escalate = SupervisorStrategy.Escalate - - /** - * Add the given behavior on top of the behavior stack for this actor. This - * stack is cleared upon restart. Use `unbecome()` to pop an element off - * this stack. - */ - def becomeStacked(r: Receive) = context.become(r, discardOld = false) - - /** - * Replace the behavior at the top of the behavior stack for this actor. The - * stack is cleared upon restart. Use `unbecome()` to pop an element off - * this stack or `becomeStacked()` to push a new element on top of it. - */ - def become(r: Receive) = context.become(r, discardOld = true) - - /** - * Pop the active behavior from the behavior stack of this actor. This stack - * is cleared upon restart. - */ - def unbecome(): Unit = context.unbecome() - - /** - * Set the supervisor strategy of this actor, i.e. how it supervises its children. - */ - def superviseWith(s: SupervisorStrategy): Unit = strategy = s - - /** - * Replace the `preStart` action with the supplied thunk. Default action - * is to call `super.preStart()` - */ - def whenStarting(body: => Unit): Unit = preStartFun = () => body - - /** - * Replace the `preRestart` action with the supplied function. Default - * action is to call `super.preRestart()`, which will kill all children - * and invoke `postStop()`. - */ - def whenFailing(body: (Throwable, Option[Any]) => Unit): Unit = preRestartFun = body - - /** - * Replace the `postRestart` action with the supplied function. Default - * action is to call `super.postRestart` which will call `preStart()`. - */ - def whenRestarted(body: Throwable => Unit): Unit = postRestartFun = body - - /** - * Replace the `postStop` action with the supplied thunk. Default action - * is to call `super.postStop`. - */ - def whenStopping(body: => Unit): Unit = postStopFun = () => body - - override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart() - override def preRestart(cause: Throwable, msg: Option[Any]): Unit = - if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg) - override def postRestart(cause: Throwable): Unit = - if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause) - override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop() - override def supervisorStrategy: SupervisorStrategy = if (strategy != null) strategy else super.supervisorStrategy - - /** - * Default behavior of the actor is empty, use `become` to change this. - */ - override def receive: Receive = Actor.emptyBehavior - } - - /** - * Use this trait when defining an [[akka.actor.Actor]] with [[akka.actor.Stash]], - * since just using `actor()(new Act with Stash{})` will not be able to see the - * Stash component due to type erasure. - */ - trait ActWithStash extends Act with Stash - - private def mkProps(classOfActor: Class[_], ctor: () => Actor): Props = - Props(classOf[TypedCreatorFunctionConsumer], classOfActor, ctor) - - /** - * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. - * - * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] - * factory; do not make the generated object accessible to code - * outside and do not return the same object upon subsequent invocations. - * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can - * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], - * where the latter is always implicitly available within an [[akka.actor.Actor]]. - */ - def actor[T <: Actor: ClassTag](ctor: => T)(implicit factory: ActorRefFactory): ActorRef = { - // configure dispatcher/mailbox based on runtime class - val classOfActor = implicitly[ClassTag[T]].runtimeClass - val props = mkProps(classOfActor, () => ctor) - factory.actorOf(props) - } - - /** - * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. - * - * @param name is the name, which must be unique within the context of its - * parent. - * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] - * factory; do not make the generated object accessible to code - * outside and do not return the same object upon subsequent invocations. - * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can - * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], - * where the latter is always implicitly available within an [[akka.actor.Actor]]. - */ - def actor[T <: Actor: ClassTag](name: String)(ctor: => T)(implicit factory: ActorRefFactory): ActorRef = { - // configure dispatcher/mailbox based on runtime class - val classOfActor = implicitly[ClassTag[T]].runtimeClass - val props = mkProps(classOfActor, () => ctor) - - if (name == null) factory.actorOf(props) - else factory.actorOf(props, name) - } - - /** - * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. - * - * @param name is the name, which must be unique within the context of its - * parent; defaults to `null` which will assign a name automatically. - * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] - * factory; do not make the generated object accessible to code - * outside and do not return the same object upon subsequent invocations. - * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can - * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], - * where the latter is always implicitly available within an [[akka.actor.Actor]]. - */ - def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: => T): ActorRef = - actor(name)(ctor)(implicitly[ClassTag[T]], factory) - - /** - * Create an actor with an automatically generated name from the given thunk - * which must produce an [[akka.actor.Actor]]. - * - * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] - * factory; do not make the generated object accessible to code - * outside and do not return the same object upon subsequent invocations. - * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can - * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], - * where the latter is always implicitly available within an [[akka.actor.Actor]]. - */ - def actor[T <: Actor: ClassTag](factory: ActorRefFactory)(ctor: => T): ActorRef = - actor(null: String)(ctor)(implicitly[ClassTag[T]], factory) - -} diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala deleted file mode 100644 index c707bee1a1..0000000000 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.actor.dsl - -import scala.concurrent.Await -import akka.actor.ActorLogging - -import scala.collection.immutable.TreeSet -import scala.concurrent.duration._ -import akka.actor.Cancellable -import akka.actor.Actor - -import scala.collection.mutable.Queue -import akka.actor.ActorSystem -import akka.actor.ActorRef -import akka.util.Timeout -import akka.actor.Status -import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicInteger - -import akka.pattern.ask -import akka.actor.ActorDSL -import akka.actor.Props -import com.github.ghik.silencer.silent - -/** - * INTERNAL API - */ -private[akka] object Inbox { - - private sealed trait Query { - def deadline: Deadline - def withClient(c: ActorRef): Query - def client: ActorRef - } - private final case class Get(deadline: Deadline, client: ActorRef = null) extends Query { - def withClient(c: ActorRef) = copy(client = c) - } - private final case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) - extends Query { - def withClient(c: ActorRef) = copy(client = c) - } - private final case class StartWatch(target: ActorRef) - private case object Kick - -} - -@silent -trait Inbox { this: ActorDSL.type => - - import Inbox._ - - protected trait InboxExtension { this: Extension => - val DSLInboxQueueSize = config.getInt("inbox-size") - - val inboxNr = new AtomicInteger - val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize) - - def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet) - } - - private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { - def compare(left: Query, right: Query): Int = left.deadline.time.compare(right.deadline.time) - } - - private class InboxActor(size: Int) extends Actor with ActorLogging { - var clients = Queue.empty[Query] - val messages = Queue.empty[Any] - var clientsByTimeout = TreeSet.empty[Query] - var printedWarning = false - - def enqueueQuery(q: Query): Unit = { - val query = q.withClient(sender()) - clients.enqueue(query) - clientsByTimeout += query - } - - def enqueueMessage(msg: Any): Unit = { - if (messages.size < size) messages.enqueue(msg) - else { - if (!printedWarning) { - log.warning( - "dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size) - printedWarning = true - } - } - } - - var currentMsg: Any = _ - val clientPredicate: (Query) => Boolean = { - case _: Get => true - case Select(_, p, _) => p.isDefinedAt(currentMsg) - case _ => false - } - - var currentSelect: Select = _ - val messagePredicate: (Any => Boolean) = (msg) => currentSelect.predicate.isDefinedAt(msg) - - var currentDeadline: Option[(Deadline, Cancellable)] = None - - def receive = - ({ - case g: Get => - if (messages.isEmpty) enqueueQuery(g) - else sender() ! messages.dequeue() - case s: Select => - if (messages.isEmpty) enqueueQuery(s) - else { - currentSelect = s - messages.dequeueFirst(messagePredicate) match { - case Some(msg) => sender() ! msg - case None => enqueueQuery(s) - } - currentSelect = null - } - case StartWatch(target) => context.watch(target) - case Kick => - val now = Deadline.now - val pred = (q: Query) => q.deadline.time < now.time - val overdue = clientsByTimeout.iterator.takeWhile(pred) - while (overdue.hasNext) { - val toKick = overdue.next() - toKick.client ! Status.Failure(new TimeoutException("deadline passed")) - } - clients = clients.filterNot(pred) - clientsByTimeout = clientsByTimeout.from(Get(now)) - case msg => - if (clients.isEmpty) enqueueMessage(msg) - else { - currentMsg = msg - clients.dequeueFirst(clientPredicate) match { - case Some(q) => { clientsByTimeout -= q; q.client ! msg } - case None => enqueueMessage(msg) - } - currentMsg = null - } - }: Receive).andThen { _ => - if (clients.isEmpty) { - if (currentDeadline.isDefined) { - currentDeadline.get._2.cancel() - currentDeadline = None - } - } else { - val next = clientsByTimeout.head.deadline - import context.dispatcher - if (currentDeadline.isEmpty) { - currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) - } else { - // must not rely on the Scheduler to not fire early (for robustness) - currentDeadline.get._2.cancel() - currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) - } - } - } - } - - /* - * make sure that AskTimeout does not accidentally mess up message reception - * by adding this extra time to the real timeout - */ - private val extraTime = 1.minute - - /** - * Create a new actor which will internally queue up messages it gets so that - * they can be interrogated with the `akka.actor.dsl.Inbox!.Inbox!.receive` - * and `akka.actor.dsl.Inbox!.Inbox!.select` methods. It will be created as - * a system actor in the ActorSystem which is implicitly (or explicitly) - * supplied. - */ - def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system) - - class Inbox(system: ActorSystem) extends akka.actor.Inbox { - - val receiver: ActorRef = Extension(system).newReceiver - - // Java API - def getRef: ActorRef = receiver - def send(target: ActorRef, msg: AnyRef): Unit = target.tell(msg, receiver) - - private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout - - /** - * Receive a single message from the internal `receiver` actor. The supplied - * timeout is used for cleanup purposes and its precision is subject to the - * resolution of the system’s scheduler (usually 100ms, but configurable). - * - * Warning: This method blocks the current thread until a message is - * received, thus it can introduce dead-locks (directly as well as - * indirectly by causing starvation of the thread pool). Do not use - * this method within an actor! - */ - def receive(timeout: FiniteDuration = defaultTimeout): Any = { - implicit val t = Timeout(timeout + extraTime) - Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) - } - - /** - * Receive a single message for which the given partial function is defined - * and return the transformed result, using the internal `receiver` actor. - * The supplied timeout is used for cleanup purposes and its precision is - * subject to the resolution of the system’s scheduler (usually 100ms, but - * configurable). - * - * Warning: This method blocks the current thread until a message is - * received, thus it can introduce dead-locks (directly as well as - * indirectly by causing starvation of the thread pool). Do not use - * this method within an actor! - */ - def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { - implicit val t = Timeout(timeout + extraTime) - predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) - } - - /** - * Make the inbox’s actor watch the target actor such that reception of the - * Terminated message can then be awaited. - */ - def watch(target: ActorRef): Unit = receiver ! StartWatch(target) - - /** - * Overridden finalizer which will try to stop the actor once this Inbox - * is no longer referenced. - */ - override def finalize(): Unit = { - system.stop(receiver) - } - } - - implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver -} diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerProxySpec.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerProxySpec.scala deleted file mode 100644 index eda603ef46..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerProxySpec.scala +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.contrib.circuitbreaker - -import akka.actor.{ ActorRef, PoisonPill } -import akka.contrib.circuitbreaker.CircuitBreakerProxy._ -import akka.testkit.{ AkkaSpec, TestProbe } -import akka.util.Timeout -import org.scalatest.GivenWhenThen - -import scala.concurrent.duration._ -import scala.language.postfixOps - -class CircuitBreakerProxySpec extends AkkaSpec() with GivenWhenThen { - - val baseCircuitBreakerPropsBuilder = - CircuitBreakerPropsBuilder(maxFailures = 2, callTimeout = 200 millis, resetTimeout = 1 second, failureDetector = { - _ == "FAILURE" - }) - - trait CircuitBreakerScenario { - val sender = TestProbe() - val eventListener = TestProbe() - val receiver = TestProbe() - - def circuitBreaker: ActorRef - - def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - def receiverRespondsWithFailureToRequest(request: Any) = { - sender.send(circuitBreaker, request) - receiver.expectMsg(request) - receiver.reply("FAILURE") - sender.expectMsg("FAILURE") - } - - def receiverRespondsToRequestWith(request: Any, reply: Any) = { - sender.send(circuitBreaker, request) - receiver.expectMsg(request) - receiver.reply(reply) - sender.expectMsg(reply) - } - - def circuitBreakerReceivesSelfNotificationMessage() = - receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration / 4) - - def resetTimeoutExpires() = - receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration + 100.millis) - - def callTimeoutExpiresWithoutResponse() = - sender.expectNoMsg(baseCircuitBreakerPropsBuilder.callTimeout.duration + 100.millis) - - def messageIsRejectedWithOpenCircuitNotification(message: Any) = { - sender.send(circuitBreaker, message) - sender.expectMsg(CircuitOpenFailure(message)) - } - - } - - "CircuitBreakerActor" should { - - "act as a transparent proxy in case of successful requests-replies - forward to target" in { - Given("A circuit breaker proxy pointing to a target actor") - val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - When("A message is sent to the proxy actor") - TestProbe().send(circuitBreaker, "test message") - - Then("The target actor receives the message") - receiver.expectMsg("test message") - } - - "act as a transparent proxy in case of successful requests-replies - full cycle" in { - Given("A circuit breaker proxy pointing to a target actor") - val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - When("A sender sends a message to the target actor via the proxy actor") - val sender = TestProbe() - sender.send(circuitBreaker, "test message") - - receiver.expectMsg("test message") - - And("The target actor replies to the message") - receiver.reply("response") - - Then("The reply is sent to the sender") - sender.expectMsg("response") - } - - "forward further messages before receiving the response of the first one" in { - Given("A circuit breaker proxy pointing to a target actor") - val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - When("A batch of messages is sent to the target actor via the proxy") - val sender = TestProbe() - sender.send(circuitBreaker, "test message1") - sender.send(circuitBreaker, "test message2") - sender.send(circuitBreaker, "test message3") - - And("The receiver doesn't reply to any of those messages") - - Then("All the messages in the batch are sent") - receiver.expectMsg("test message1") - receiver.expectMsg("test message2") - receiver.expectMsg("test message3") - } - - "send responses to the right sender" in { - Given("A circuit breaker proxy pointing to a target actor") - val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - And("Two different senders actors") - val sender1 = TestProbe() - val sender2 = TestProbe() - - When("The two actors are sending messages to the target actor through the proxy") - sender1.send(circuitBreaker, "test message1") - sender2.send(circuitBreaker, "test message2") - - And("The target actor replies to those messages") - receiver.expectMsg("test message1") - receiver.reply("response1") - - receiver.expectMsg("test message2") - receiver.reply("response2") - - Then("The replies are forwarded to the correct sender") - sender1.expectMsg("response1") - sender2.expectMsg("response2") - } - - "return failed responses too" in { - Given("A circuit breaker proxy pointing to a target actor") - val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) - - When("A sender sends a request to the target actor through the proxy") - val sender = TestProbe() - sender.send(circuitBreaker, "request") - - And("The target actor replies with a failure response") - receiver.expectMsg("request") - receiver.reply("FAILURE") - - Then("The failure response is returned ") - sender.expectMsg("FAILURE") - } - - "enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario { - Given("A circuit breaker proxy pointing to a target actor") - val circuitBreaker = defaultCircuitBreaker - - When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing") - (1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index => - receiverRespondsWithFailureToRequest(s"request$index") - } - - circuitBreakerReceivesSelfNotificationMessage() - - Then("The circuit is in Open stage: If a further message is sent it is not forwarded") - sender.send(circuitBreaker, "request in open state") - receiver.expectNoMsg - } - - "respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario { - Given("A circuit breaker proxy pointing to a target actor") - val circuitBreaker = defaultCircuitBreaker - - When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing") - (1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index => - receiverRespondsWithFailureToRequest(s"request$index") - } - - circuitBreakerReceivesSelfNotificationMessage() - - Then("The circuit is in Open stage: any further request is replied-to with a CircuitOpenFailure response") - sender.send(circuitBreaker, "request in open state") - sender.expectMsg(CircuitOpenFailure("request in open state")) - } - - "respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario { - Given( - "A circuit breaker proxy pointing to a target actor built with a function to convert CircuitOpenFailure response into a String response") - val circuitBreaker = system.actorOf( - baseCircuitBreakerPropsBuilder - .copy(openCircuitFailureConverter = { failureMsg => - s"NOT SENT: ${failureMsg.failedMsg}" - }) - .props(receiver.ref)) - - When("A number of consecutive request equal to the maxFailures configuration of the circuit breaker is failing") - (1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index => - receiverRespondsWithFailureToRequest(s"request$index") - } - - circuitBreakerReceivesSelfNotificationMessage() - - Then("Any further request receives instead of the CircuitOpenFailure response the converted one") - sender.send(circuitBreaker, "request in open state") - sender.expectMsg("NOT SENT: request in open state") - } - - "enter open state after reaching the threshold of timed-out responses" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When( - "A number of request equal to the timed-out responses threashold is done without receiving response within the configured timeout") - sender.send(circuitBreaker, "request1") - sender.send(circuitBreaker, "request2") - - callTimeoutExpiresWithoutResponse() - - receiver.expectMsg("request1") - receiver.reply("this should be timed out 1") - - receiver.expectMsg("request2") - receiver.reply("this should be timed out 2") - - circuitBreakerReceivesSelfNotificationMessage() - - Then("The circuit is in Open stage: any further request is replied-to with a CircuitOpenFailure response") - sender.send(circuitBreaker, "request in open state") - receiver.expectNoMsg - } - - "enter HALF OPEN state after the given state timeout, sending the first message only" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("ENTERING OPEN STATE") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - circuitBreakerReceivesSelfNotificationMessage() - - Then("Messages are ignored") - messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1") - messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2") - - When("ENTERING HALF OPEN STATE") - resetTimeoutExpires() - - Then("First message should be forwarded, following ones ignored if the failure persist") - sender.send(circuitBreaker, "First message in half-open state, should be forwarded") - sender.send(circuitBreaker, "Second message in half-open state, should be ignored") - - receiver.expectMsg("First message in half-open state, should be forwarded") - receiver.expectNoMsg() - - sender.expectMsg(CircuitOpenFailure("Second message in half-open state, should be ignored")) - - } - - "return to CLOSED state from HALF-OPEN if a successful message response notification is received" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Entering HALF OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - resetTimeoutExpires() - - And("Receiving a successful response") - receiverRespondsToRequestWith( - "First message in half-open state, should be forwarded", - "This should close the circuit") - - circuitBreakerReceivesSelfNotificationMessage() - - Then("circuit is re-closed") - sender.send(circuitBreaker, "request1") - receiver.expectMsg("request1") - - sender.send(circuitBreaker, "request2") - receiver.expectMsg("request2") - - } - - "return to OPEN state from HALF-OPEN if a FAILURE message response is received" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Entering HALF OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - resetTimeoutExpires() - - And("Receiving a failure response") - receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded") - - circuitBreakerReceivesSelfNotificationMessage() - - Then("circuit is opened again") - sender.send(circuitBreaker, "this should be ignored") - receiver.expectNoMsg() - sender.expectMsg(CircuitOpenFailure("this should be ignored")) - - } - - "notify an event status change listener when changing state" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - override val circuitBreaker = system.actorOf( - baseCircuitBreakerPropsBuilder - .copy(circuitEventListener = Some(eventListener.ref)) - .props(target = receiver.ref)) - - When("Entering OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - circuitBreakerReceivesSelfNotificationMessage() - - Then("An event is sent") - eventListener.expectMsg(CircuitOpen(circuitBreaker)) - - When("Entering HALF OPEN state") - resetTimeoutExpires() - - Then("An event is sent") - eventListener.expectMsg(CircuitHalfOpen(circuitBreaker)) - - When("Entering CLOSED state") - receiverRespondsToRequestWith( - "First message in half-open state, should be forwarded", - "This should close the circuit") - Then("An event is sent") - eventListener.expectMsg(CircuitClosed(circuitBreaker)) - - } - - "stop if the target actor terminates itself" in new CircuitBreakerScenario { - Given("An actor that will terminate when receiving a message") - import akka.actor.ActorDSL._ - val suicidalActor = actor(new Act { - become { - case anyMessage => - sender() ! "dying now" - context.stop(self) - } - }) - - And("A circuit breaker actor proxying another actor") - val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = suicidalActor)) - - val suicidalActorWatch = TestProbe() - suicidalActorWatch.watch(suicidalActor) - - val circuitBreakerWatch = TestProbe() - circuitBreakerWatch.watch(circuitBreaker) - - When("The target actor stops") - sender.send(circuitBreaker, "this message will kill the target") - sender.expectMsg("dying now") - suicidalActorWatch.expectTerminated(suicidalActor) - - Then("The circuit breaker proxy actor is terminated too") - circuitBreakerWatch.expectTerminated(circuitBreaker) - } - - "stop if the target actor is stopped" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - val receiverActorWatch = TestProbe() - receiverActorWatch.watch(receiver.ref) - - val circuitBreakerWatch = TestProbe() - circuitBreakerWatch.watch(circuitBreaker) - - When("The target actor stops") - sender.send(circuitBreaker, Passthrough(PoisonPill)) - receiverActorWatch.expectTerminated(receiver.ref) - - Then("The circuit breaker proxy actor is terminated too") - circuitBreakerWatch.expectTerminated(circuitBreaker) - } - - "send a any message enveloped into a TellOnly case class without expecting a response in closed state" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When( - "A number of request equal to the timed-out responses wrapped in a TellOnly threashold is done without receiving response within the configured timeout") - sender.send(circuitBreaker, TellOnly("Fire and forget 1")) - sender.send(circuitBreaker, TellOnly("Fire and forget 2")) - receiver.expectMsg("Fire and forget 1") - receiver.expectMsg("Fire and forget 2") - - And("No response is received") - callTimeoutExpiresWithoutResponse() - - Then("The circuit is still closed") - sender.send(circuitBreaker, "This should be received too") - receiver.expectMsg("This should be received too") - } - - "block messages wrapped in TellOnly when in open state" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Circuit enters OPEN state") - (1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index => - receiverRespondsWithFailureToRequest(s"request$index") - } - - circuitBreakerReceivesSelfNotificationMessage() - - Then("A TellOnly wrapped message is not sent") - sender.send(circuitBreaker, TellOnly("This should NOT be received")) - receiver.expectNoMsg() - } - - "send a any message enveloped into a Passthrough case class without expecting a response even in closed state" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Circuit enters OPEN state") - (1 to baseCircuitBreakerPropsBuilder.maxFailures).foreach { index => - receiverRespondsWithFailureToRequest(s"request$index") - } - - circuitBreakerReceivesSelfNotificationMessage() - - Then("A Passthrough wrapped message is sent") - sender.send(circuitBreaker, Passthrough("This should be received")) - receiver.expectMsg("This should be received") - - And("The circuit is still closed for ordinary messages") - sender.send(circuitBreaker, "This should NOT be received") - receiver.expectNoMsg() - } - } - - "Ask Extension" should { - import Implicits.askWithCircuitBreaker - - import scala.concurrent.ExecutionContext.Implicits.global - implicit val timeout: Timeout = 2.seconds - - "work as a ASK pattern if circuit is closed" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Doing a askWithCircuitBreaker request") - val responseFuture = circuitBreaker.askWithCircuitBreaker("request") - - Then("The message is sent to the target actor") - receiver.expectMsg("request") - - When("Then target actor replies") - receiver.reply("response") - - Then("The response is available as result of the future returned by the askWithCircuitBreaker method") - whenReady(responseFuture) { response => - response should be("response") - } - } - - "transform the response into a failure with CircuitOpenException cause if circuit is open" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("The circuit breaker proxy enters OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - circuitBreakerReceivesSelfNotificationMessage() - - And("Doing a askWithCircuitBreaker request") - val responseFuture = circuitBreaker.askWithCircuitBreaker("request") - - Then("The message is NOT sent to the target actor") - receiver.expectNoMsg() - - And("The response is converted into a failure") - whenReady(responseFuture.failed) { failure => - failure shouldBe a[OpenCircuitException] - } - } - } - - "Future Extension" should { - import Implicits.futureExtensions - import akka.pattern.ask - - import scala.concurrent.ExecutionContext.Implicits.global - implicit val timeout: Timeout = 2.seconds - - "work as a ASK pattern if circuit is closed" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("Doing a askWithCircuitBreaker request") - val responseFuture = (circuitBreaker ? "request").failForOpenCircuit - - Then("The message is sent to the target actor") - receiver.expectMsg("request") - - When("Then target actor replies") - receiver.reply("response") - - Then("The response is available as result of the future returned by the askWithCircuitBreaker method") - whenReady(responseFuture) { response => - response should be("response") - } - } - - "transform the response into a failure with CircuitOpenException cause if circuit is open" in new CircuitBreakerScenario { - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("The circuit breaker proxy enters OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - circuitBreakerReceivesSelfNotificationMessage() - - And("Doing a askWithCircuitBreaker request") - val responseFuture = (circuitBreaker ? "request").failForOpenCircuit - - Then("The message is NOT sent to the target actor") - receiver.expectNoMsg() - - And("The response is converted into a failure") - whenReady(responseFuture.failed) { failure => - failure shouldBe a[OpenCircuitException] - } - } - - "transform the response into a failure with the given exception as cause if circuit is open" in new CircuitBreakerScenario { - class MyException(message: String) extends Exception(message) - - Given("A circuit breaker actor proxying a test probe") - val circuitBreaker = defaultCircuitBreaker - - When("The circuit breaker proxy enters OPEN state") - receiverRespondsWithFailureToRequest("request1") - receiverRespondsWithFailureToRequest("request2") - - circuitBreakerReceivesSelfNotificationMessage() - - And("Doing a askWithCircuitBreaker request") - val responseFuture = (circuitBreaker ? "request").failForOpenCircuitWith(new MyException("Circuit is open")) - - Then("The message is NOT sent to the target actor") - receiver.expectNoMsg() - - And("The response is converted into a failure") - whenReady(responseFuture.failed) { failure => - failure shouldBe a[MyException] - failure.getMessage() should be("Circuit is open") - } - } - } - -} diff --git a/akka-docs/src/main/paradox/actors.md b/akka-docs/src/main/paradox/actors.md index a21fd0c401..fe86592d6e 100644 --- a/akka-docs/src/main/paradox/actors.md +++ b/akka-docs/src/main/paradox/actors.md @@ -292,42 +292,6 @@ are described in more depth in the [Using Akka with Dependency Injection](http://letitcrash.com/post/55958814293/akka-dependency-injection) guideline and the [Akka Java Spring](https://github.com/typesafehub/activator-akka-java-spring) tutorial. -### The Inbox - -When writing code outside of actors which shall communicate with actors, the -`ask` pattern can be a solution (see below), but there are two things it -cannot do: receiving multiple replies (e.g. by subscribing an `ActorRef` -to a notification service) and watching other actors’ lifecycle. For these -purposes there is the `Inbox` class: - -Scala -: @@snip [ActorDSLSpec.scala](/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala) { #inbox } - -Java -: @@snip [InboxDocTest.java](/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java) { #inbox } - - -@@@ div { .group-scala } - -There is an implicit conversion from inbox to actor reference which means that -in this example the sender reference will be that of the actor hidden away -within the inbox. This allows the reply to be received on the last line. -Watching an actor is quite simple as well: - -@@snip [ActorDSLSpec.scala](/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala) { #watch } - -@@@ - -@@@ div { .group-java } - -The `send` method wraps a normal `tell` and supplies the internal -actor’s reference as the sender. This allows the reply to be received on the -last line. Watching an actor is quite simple as well: - -@@snip [InboxDocTest.java](/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java) { #watch } - -@@@ - ## Actor API @scala[The `Actor` trait defines only one abstract method, the above mentioned diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 63b93392f6..1d171089cd 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -3,3 +3,9 @@ ## Scala 2.11 no longer supported If you are still using Scala 2.11 then you must upgrade to 2.12 or 2.13 + +### Actor DSL removal + +Actor DSL is a rarely used feature and has been deprecated since `2.5.0`. +Use plain `system.actorOf` instead of the DSL to create Actors if you have been using it. + diff --git a/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java b/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java deleted file mode 100644 index 9e78e1437a..0000000000 --- a/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package jdocs.actor; - -import akka.testkit.AkkaJUnitActorSystemResource; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.ClassRule; -import org.junit.Test; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Inbox; -import akka.actor.PoisonPill; -import akka.actor.Terminated; -import akka.testkit.AkkaSpec; - -import java.time.Duration; - -public class InboxDocTest extends AbstractJavaTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("InboxDocTest", AkkaSpec.testConf()); - - private final ActorSystem system = actorSystemResource.getSystem(); - - @Test - public void demonstrateInbox() { - final TestKit probe = new TestKit(system); - final ActorRef target = probe.getRef(); - // #inbox - final Inbox inbox = Inbox.create(system); - inbox.send(target, "hello"); - // #inbox - probe.expectMsgEquals("hello"); - probe.send(probe.getLastSender(), "world"); - // #inbox - try { - assert inbox.receive(Duration.ofSeconds(1)).equals("world"); - } catch (java.util.concurrent.TimeoutException e) { - // timeout - } - // #inbox - } - - @Test - public void demonstrateWatch() { - final TestKit probe = new TestKit(system); - final ActorRef target = probe.getRef(); - // #watch - final Inbox inbox = Inbox.create(system); - inbox.watch(target); - target.tell(PoisonPill.getInstance(), ActorRef.noSender()); - try { - assert inbox.receive(Duration.ofSeconds(1)) instanceof Terminated; - } catch (java.util.concurrent.TimeoutException e) { - // timeout - } - // #watch - } -} diff --git a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala index 75c5486ebe..910b4854d2 100644 --- a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala @@ -724,14 +724,6 @@ class ActorDocSpec extends AkkaSpec(""" lastSender.path.toStringWithoutAddress should be("/user") } - "using ActorDSL outside of akka.actor package" in { - import akka.actor.ActorDSL._ - actor(new Act { - superviseWith(OneForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) - superviseWith(AllForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) - }) - } - "using CoordinatedShutdown" in { val someActor = system.actorOf(Props(classOf[Replier], this)) //#coordinated-shutdown-addTask diff --git a/akka-docs/src/test/scala/docs/io/EchoServer.scala b/akka-docs/src/test/scala/docs/io/EchoServer.scala index c452e003ea..8ca58da0a0 100644 --- a/akka-docs/src/test/scala/docs/io/EchoServer.scala +++ b/akka-docs/src/test/scala/docs/io/EchoServer.scala @@ -6,34 +6,24 @@ package docs.io import java.net.InetSocketAddress -import scala.concurrent.duration.DurationInt - import com.typesafe.config.ConfigFactory - -import akka.actor.{ Actor, ActorDSL, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy } -import akka.actor.ActorDSL.inbox +import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy } import akka.io.{ IO, Tcp } import akka.util.ByteString +import scala.io.StdIn + object EchoServer extends App { val config = ConfigFactory.parseString("akka.loglevel = DEBUG") implicit val system = ActorSystem("EchoServer", config) - // make sure to stop the system so that the application stops - try run() - finally system.terminate() - - def run(): Unit = { - import ActorDSL._ - - // create two EchoManager and stop the application once one dies - val watcher = inbox() - watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo")) - watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple")) - watcher.receive(10.minutes) - } + system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo") + system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple") + println("Press enter to exit...") + StdIn.readLine() + system.terminate() } class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging { diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 246cad4b17..d26210e846 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -133,7 +133,6 @@ object AkkaBuild { initialCommands := """|import language.postfixOps |import akka.actor._ - |import ActorDSL._ |import scala.concurrent._ |import com.typesafe.config.ConfigFactory |import scala.concurrent.duration._