Removing the old Supervision-DSL and replacing it with a temporary one
This commit is contained in:
parent
d9cc9e3105
commit
69768dbc96
30 changed files with 69 additions and 483 deletions
|
|
@ -1,23 +0,0 @@
|
|||
package akka.config;
|
||||
|
||||
import akka.actor.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static akka.config.Supervision.*;
|
||||
|
||||
public class SupervisionConfig {
|
||||
/*Just some sample code to demonstrate the declarative supervision configuration for Java */
|
||||
@SuppressWarnings("unchecked")
|
||||
public SupervisorConfig createSupervisorConfig(List<ActorRef> toSupervise) {
|
||||
ArrayList<Server> targets = new ArrayList<Server>(toSupervise.size());
|
||||
for(ActorRef ref : toSupervise) {
|
||||
targets.add(new Supervise(ref, permanent(), true));
|
||||
}
|
||||
|
||||
|
||||
return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()]));
|
||||
}
|
||||
}
|
||||
|
|
@ -12,8 +12,7 @@ import akka.testkit._
|
|||
import akka.testkit.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
|
||||
import Actor._
|
||||
import akka.config.Supervision._
|
||||
import akka.actor.Actor._
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
object ActorFireForgetRequestReplySpec {
|
||||
|
|
@ -84,7 +83,7 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be
|
|||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
filterEvents(EventFilter[Exception]("Expected")) {
|
||||
val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val supervisor = Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0)))
|
||||
val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor))
|
||||
actor.isRunning must be(true)
|
||||
actor ! "Die"
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import org.scalatest.matchers.MustMatchers
|
|||
import akka.testkit._
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision._
|
||||
import akka.event.EventHandler
|
||||
|
||||
import FSM._
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import akka.event.EventHandler
|
|||
import Actor._
|
||||
import akka.util.duration._
|
||||
import akka.config.Config.config
|
||||
import akka.config.Supervision._
|
||||
|
||||
object LoggingReceiveSpec {
|
||||
class TestLogActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import akka.event.EventHandler
|
|||
import akka.testkit.TestEvent._
|
||||
import akka.testkit.EventFilter
|
||||
import Actor._
|
||||
import akka.config.Supervision._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.junit.{ Test, Before, After }
|
||||
import java.util.concurrent.{ ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.actor
|
|||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.config.Supervision.{ SupervisorConfig, Supervise, Permanent }
|
||||
import akka.testkit.{ filterEvents, EventFilter }
|
||||
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
|
|
|
|||
|
|
@ -11,9 +11,8 @@ import org.scalatest.BeforeAndAfterAll
|
|||
|
||||
import akka.testkit.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision._
|
||||
import akka.{ Die, Ping }
|
||||
import Actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.testkit.EventFilter
|
||||
|
|
@ -70,120 +69,53 @@ object SupervisorSpec {
|
|||
// =====================================================
|
||||
|
||||
def temporaryActorAllForOne = {
|
||||
val temporaryActor = actorOf(Props[PingPongActor])
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), Some(0)),
|
||||
Supervise(
|
||||
temporaryActor,
|
||||
Temporary)
|
||||
:: Nil))
|
||||
val supervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0)))
|
||||
val temporaryActor = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
|
||||
(temporaryActor, supervisor)
|
||||
}
|
||||
|
||||
def singleActorAllForOne = {
|
||||
val pingpong = actorOf[PingPongActor]
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
val supervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))
|
||||
val pingpong = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def singleActorOneForOne = {
|
||||
val pingpong = actorOf[PingPongActor]
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
val supervisor = Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))
|
||||
val pingpong = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsAllForOne = {
|
||||
val pingpong1 = actorOf[PingPongActor]
|
||||
val pingpong2 = actorOf[PingPongActor]
|
||||
val pingpong3 = actorOf[PingPongActor]
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
val supervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))
|
||||
val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsOneForOne = {
|
||||
val pingpong1 = actorOf[PingPongActor]
|
||||
val pingpong2 = actorOf[PingPongActor]
|
||||
val pingpong3 = actorOf[PingPongActor]
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
val supervisor = Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))
|
||||
val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(supervisor))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def nestedSupervisorsAllForOne = {
|
||||
val pingpong1 = actorOf[PingPongActor]
|
||||
val pingpong2 = actorOf[PingPongActor]
|
||||
val pingpong3 = actorOf[PingPongActor]
|
||||
val topSupervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))
|
||||
val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(topSupervisor))
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(Nil, 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil)
|
||||
:: Nil))
|
||||
val middleSupervisor = Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis), topSupervisor)
|
||||
val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(middleSupervisor))
|
||||
val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(middleSupervisor))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -359,6 +291,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
|
||||
"must attempt restart when exception during restart" in {
|
||||
val inits = new AtomicInteger(0)
|
||||
val supervisor = Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))
|
||||
|
||||
val dyingActor = actorOf(Props(new Actor {
|
||||
inits.incrementAndGet
|
||||
|
|
@ -369,13 +302,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
case Ping ⇒ tryReply(PongMessage)
|
||||
case Die ⇒ throw new RuntimeException("Expected")
|
||||
}
|
||||
}))
|
||||
|
||||
val supervisor =
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000),
|
||||
Supervise(dyingActor, Permanent) :: Nil))
|
||||
}).withSupervisor(supervisor))
|
||||
|
||||
intercept[RuntimeException] {
|
||||
(dyingActor.?(Die, TimeoutMillis)).get
|
||||
|
|
@ -388,7 +315,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
|
||||
inits.get must be(3)
|
||||
|
||||
supervisor.shutdown()
|
||||
supervisor.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,8 +10,7 @@ import akka.util.duration._
|
|||
import akka.testkit.Testing.sleepFor
|
||||
import akka.testkit.{ EventFilter, filterEvents, filterException }
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.config.Supervision.{ SupervisorConfig, Supervise, Permanent }
|
||||
import Actor._
|
||||
import akka.actor.Actor._
|
||||
|
||||
class SupervisorTreeSpec extends WordSpec with MustMatchers {
|
||||
|
||||
|
|
|
|||
|
|
@ -6,12 +6,11 @@ package akka.actor
|
|||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
|
||||
import akka.actor._
|
||||
import akka.config.Supervision._
|
||||
import akka.testkit.{ filterEvents, EventFilter }
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit.{ TestKit, filterEvents, EventFilter }
|
||||
|
||||
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll with TestKit {
|
||||
import Ticket669Spec._
|
||||
|
||||
override def beforeAll = Thread.interrupted() //remove interrupted status.
|
||||
|
|
@ -24,42 +23,29 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
|||
"A supervised actor with lifecycle PERMANENT" should {
|
||||
"be able to reply on failure during preRestart" in {
|
||||
filterEvents(EventFilter[Exception]("test")) {
|
||||
val latch = new CountDownLatch(1)
|
||||
val sender = Actor.actorOf(new Sender(latch))
|
||||
val supervisor = Actor.actorOf(Props(context ⇒ { case _ ⇒ }).
|
||||
withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))
|
||||
val supervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000))
|
||||
val supervised = Actor.actorOf(Props[Supervised].withSupervisor(supervisor))
|
||||
|
||||
supervised.!("test")(Some(sender))
|
||||
latch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
supervised.!("test")(Some(testActor))
|
||||
expectMsg("failure1")
|
||||
supervisor.stop()
|
||||
}
|
||||
}
|
||||
|
||||
"be able to reply on failure during postStop" in {
|
||||
filterEvents(EventFilter[Exception]("test")) {
|
||||
val latch = new CountDownLatch(1)
|
||||
val sender = Actor.actorOf(new Sender(latch))
|
||||
|
||||
val supervisor = Actor.actorOf(Props(context ⇒ { case _ ⇒ }).
|
||||
withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))
|
||||
val supervisor = Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None))
|
||||
val supervised = Actor.actorOf(Props[Supervised].withSupervisor(supervisor))
|
||||
|
||||
supervised.!("test")(Some(sender))
|
||||
latch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
supervised.!("test")(Some(testActor))
|
||||
expectMsg("failure2")
|
||||
supervisor.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object Ticket669Spec {
|
||||
class Sender(latch: CountDownLatch) extends Actor {
|
||||
def receive = {
|
||||
case "failure1" ⇒ latch.countDown()
|
||||
case "failure2" ⇒ latch.countDown()
|
||||
case _ ⇒ {}
|
||||
}
|
||||
}
|
||||
|
||||
class Supervised extends Actor {
|
||||
def receive = {
|
||||
case msg ⇒ throw new Exception("test")
|
||||
|
|
|
|||
|
|
@ -361,7 +361,6 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"provide default supervision of pooled actors" in {
|
||||
filterException[RuntimeException] {
|
||||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
val keepDying = new AtomicBoolean(false)
|
||||
|
|
@ -507,7 +506,6 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"support customizable supervision config of pooled actors" in {
|
||||
filterEvents(EventFilter[IllegalStateException], EventFilter[RuntimeException]) {
|
||||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
var keepDying = new AtomicBoolean(false)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
package akka.actor
|
||||
|
||||
import akka.dispatch._
|
||||
import akka.config.Supervision._
|
||||
import akka.util._
|
||||
import akka.serialization.{ Serializer, Serialization }
|
||||
import ReflectiveAccess._
|
||||
|
|
|
|||
|
|
@ -4,180 +4,17 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.util._
|
||||
import ReflectiveAccess._
|
||||
import Actor._
|
||||
import java.util.concurrent.{ CopyOnWriteArrayList }
|
||||
import akka.config.Supervision._
|
||||
import collection.mutable.ListBuffer
|
||||
|
||||
class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class.
|
||||
* These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor'
|
||||
* factory object.
|
||||
* <p/>
|
||||
*
|
||||
* Here is a sample on how to use it:
|
||||
* <pre>
|
||||
* val supervisor = Supervisor(
|
||||
* SupervisorConfig(
|
||||
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
|
||||
* Supervise(
|
||||
* myFirstActor,
|
||||
* Permanent) ::
|
||||
* Supervise(
|
||||
* mySecondActor,
|
||||
* Permanent) ::
|
||||
* Nil))
|
||||
* </pre>
|
||||
*
|
||||
* You dynamically link and unlink child children using the 'link' and 'unlink' methods.
|
||||
* <pre>
|
||||
* supervisor.link(child)
|
||||
* supervisor.unlink(child)
|
||||
* </pre>
|
||||
*
|
||||
* If you are using it from Java you have to use <code>Supervisor.apply(..)</code> like in:
|
||||
* <pre>
|
||||
* Supervisor supervisor = Supervisor.apply(
|
||||
* SupervisorConfig(
|
||||
* ..
|
||||
* ))
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Supervisor {
|
||||
def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start()
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this factory instead of the Supervisor factory object if you want to control
|
||||
* instantiation and starting of the Supervisor, if not then it is easier and better
|
||||
* to use the Supervisor factory object.
|
||||
* <p>
|
||||
* Example usage:
|
||||
* <pre>
|
||||
* val factory = SupervisorFactory(
|
||||
* SupervisorConfig(
|
||||
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
|
||||
* Supervise(
|
||||
* myFirstActor,
|
||||
* Permanent) ::
|
||||
* Supervise(
|
||||
* mySecondActor,
|
||||
* Permanent) ::
|
||||
* Nil))
|
||||
* </pre>
|
||||
*
|
||||
* Then create a new Supervisor tree with the concrete Services we have defined.
|
||||
*
|
||||
* <pre>
|
||||
* val supervisor = factory.newInstance
|
||||
* supervisor.start() // start up all managed servers
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
case class SupervisorFactory(val config: SupervisorConfig) {
|
||||
|
||||
def newInstance: Supervisor = newInstanceFor(config)
|
||||
|
||||
def newInstanceFor(config: SupervisorConfig): Supervisor = {
|
||||
val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler)
|
||||
supervisor.configure(config)
|
||||
supervisor.start()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <b>NOTE:</b>
|
||||
* <p/>
|
||||
* The supervisor class is only used for the configuration system when configuring supervisor
|
||||
* hierarchies declaratively. Should not be used as part of the regular programming API. Instead
|
||||
* wire the children together using 'link', 'startLink' etc.
|
||||
* <p/>
|
||||
* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit) {
|
||||
import Supervisor._
|
||||
|
||||
private val _childActors = new CopyOnWriteArrayList[ActorRef]
|
||||
private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
|
||||
|
||||
private[akka] val supervisor = actorOf(Props(new SupervisorActor(maxRestartsHandler)).withFaultHandler(handler))
|
||||
|
||||
def uuid = supervisor.uuid
|
||||
|
||||
def start(): Supervisor = {
|
||||
this
|
||||
}
|
||||
|
||||
def shutdown(): Unit = supervisor.stop()
|
||||
|
||||
def link(child: ActorRef) = supervisor.link(child)
|
||||
|
||||
def unlink(child: ActorRef) = supervisor.unlink(child)
|
||||
|
||||
def children: List[ActorRef] = {
|
||||
val buf = new ListBuffer[ActorRef]
|
||||
val i = _childActors.iterator()
|
||||
while (i.hasNext) buf += i.next()
|
||||
buf.toList
|
||||
}
|
||||
|
||||
def childSupervisors: List[Supervisor] = {
|
||||
val buf = new ListBuffer[Supervisor]
|
||||
val i = _childSupervisors.iterator()
|
||||
while (i.hasNext) buf += i.next()
|
||||
buf.toList
|
||||
}
|
||||
|
||||
def configure(config: SupervisorConfig): Unit = config match {
|
||||
case SupervisorConfig(_, servers, _) ⇒
|
||||
servers foreach {
|
||||
case Supervise(actorRef, lifeCycle, registerAsRemoteService) ⇒
|
||||
// actorRef.lifeCycle = lifeCycle THIS IS NOT COOL, BUT WAITING FOR https://www.assembla.com/spaces/akka/tickets/1124-supervisor-dsl-doesn-t-make-much-sense-after-the-introduction-of-props
|
||||
supervisor.link(actorRef)
|
||||
|
||||
_childActors.add(actorRef) //TODO Why do we keep this here, mem leak?
|
||||
|
||||
if (ClusterModule.isEnabled && registerAsRemoteService)
|
||||
Actor.remote.register(actorRef)
|
||||
case supervisorConfig @ SupervisorConfig(_, _, _) ⇒ // recursive supervisor configuration
|
||||
val childSupervisor = Supervisor(supervisorConfig)
|
||||
supervisor.link(childSupervisor.supervisor)
|
||||
_childSupervisors.add(childSupervisor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit) extends Actor {
|
||||
|
||||
override def postStop() {
|
||||
val i = linkedActors.iterator
|
||||
while (i.hasNext) {
|
||||
val ref = i.next
|
||||
ref.stop()
|
||||
self.unlink(ref)
|
||||
}
|
||||
}
|
||||
|
||||
class Supervisor(terminationHandling: (ActorContext, Terminated) ⇒ Unit) extends Actor {
|
||||
def receive = {
|
||||
case termination: Terminated ⇒ maxRestartsHandler(self, termination)
|
||||
case unknown ⇒ throw new SupervisorException(
|
||||
"SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
|
||||
case t: Terminated ⇒ terminationHandling(context, t)
|
||||
}
|
||||
}
|
||||
|
||||
private val doNothing: (ActorContext, Terminated) ⇒ Unit = (_, _) ⇒ ()
|
||||
|
||||
def apply(faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, supervisor: ActorRef = null,
|
||||
terminationHandling: (ActorContext, Terminated) ⇒ Unit = doNothing): ActorRef =
|
||||
Actor.actorOf(Props(new Supervisor(terminationHandling)).withSupervisor(supervisor).withFaultHandler(faultHandler))
|
||||
}
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
||||
import akka.actor.FaultHandlingStrategy
|
||||
import akka.config.Supervision.SuperviseTypedActor
|
||||
|
||||
private[akka] trait TypedActorConfiguratorBase {
|
||||
def getExternalDependency[T](clazz: Class[T]): T
|
||||
|
||||
def configure(restartStrategy: FaultHandlingStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase
|
||||
|
||||
def inject: TypedActorConfiguratorBase
|
||||
|
||||
def supervise: TypedActorConfiguratorBase
|
||||
|
||||
def reset
|
||||
|
||||
def stop
|
||||
}
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
||||
import akka.actor.FaultHandlingStrategy
|
||||
import akka.dispatch.MessageDispatcher
|
||||
import akka.actor.{ Terminated, ActorRef }
|
||||
import akka.japi.{ Procedure2 }
|
||||
|
||||
case class RemoteAddress(val hostname: String, val port: Int)
|
||||
|
||||
/**
|
||||
* Configuration classes - not to be used as messages.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Supervision {
|
||||
sealed abstract class ConfigElement
|
||||
|
||||
abstract class Server extends ConfigElement
|
||||
sealed abstract class LifeCycle extends ConfigElement
|
||||
|
||||
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server {
|
||||
//Java API
|
||||
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy, worker.toList)
|
||||
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler: Procedure2[ActorRef, Terminated]) = this(restartStrategy, worker.toList, { (aRef, max) ⇒ restartHandler.apply(aRef, max) })
|
||||
}
|
||||
|
||||
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server {
|
||||
//Java API
|
||||
def this(actorRef: ActorRef, lifeCycle: LifeCycle) =
|
||||
this(actorRef, lifeCycle, false)
|
||||
}
|
||||
|
||||
object Supervise {
|
||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, registerAsRemoteService: Boolean = false) = new Supervise(actorRef, lifeCycle, registerAsRemoteService)
|
||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, false)
|
||||
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.registerAsRemoteService))
|
||||
}
|
||||
|
||||
//Scala API
|
||||
case object Permanent extends LifeCycle
|
||||
case object Temporary extends LifeCycle
|
||||
|
||||
//Java API (& Scala if you fancy)
|
||||
def permanent(): LifeCycle = Permanent
|
||||
def temporary(): LifeCycle = Temporary
|
||||
|
||||
case class SuperviseTypedActor(_intf: Class[_],
|
||||
val target: Class[_],
|
||||
val lifeCycle: LifeCycle,
|
||||
val timeout: Long,
|
||||
_dispatcher: MessageDispatcher // optional
|
||||
) extends Server {
|
||||
val intf: Option[Class[_]] = Option(_intf)
|
||||
val dispatcher: Option[MessageDispatcher] = Option(_dispatcher)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
this(intf, target, lifeCycle, timeout, null: MessageDispatcher)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
|
||||
this(null: Class[_], target, lifeCycle, timeout, dispatcher)
|
||||
}
|
||||
}
|
||||
|
|
@ -5,7 +5,6 @@
|
|||
package akka.routing
|
||||
|
||||
import akka.dispatch.{ Promise }
|
||||
import akka.config.Supervision._
|
||||
import akka.actor._
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import org.scalatest.matchers.MustMatchers
|
|||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor._
|
||||
import akka.config.Supervision._
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
|
|||
|
|
@ -34,9 +34,8 @@ import DeploymentConfig._
|
|||
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
|
||||
import akka.config.{ Config, Supervision }
|
||||
import Supervision._
|
||||
import Config._
|
||||
import akka.config.Config
|
||||
import akka.config.Config._
|
||||
|
||||
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
||||
import ActorSerialization._
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
|||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.Supervision.Temporary
|
||||
import akka.dispatch.MessageDispatcher
|
||||
|
||||
object DurableMailboxSpecActorFactory {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
|||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.Supervision.Temporary
|
||||
import akka.dispatch.MessageDispatcher
|
||||
|
||||
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoNaiveDurableMailboxStorage) {
|
||||
|
|
|
|||
|
|
@ -5,19 +5,18 @@
|
|||
package akka.remote
|
||||
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
|
||||
import akka.config.{ Config, Supervision }
|
||||
import Supervision._
|
||||
import Status._
|
||||
import Config._
|
||||
import akka.config.Config
|
||||
import akka.config.Config._
|
||||
import akka.actor.Status._
|
||||
import akka.util._
|
||||
import duration._
|
||||
import Helpers._
|
||||
import DeploymentConfig._
|
||||
import akka.util.duration._
|
||||
import akka.util.Helpers._
|
||||
import akka.actor.DeploymentConfig._
|
||||
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
||||
import ActorSerialization._
|
||||
import akka.serialization.ActorSerialization._
|
||||
import Compression.LZF
|
||||
import RemoteProtocol._
|
||||
import RemoteDaemonMessageType._
|
||||
|
|
@ -41,18 +40,14 @@ object Remote extends RemoteService {
|
|||
// FIXME configure computeGridDispatcher to what?
|
||||
val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
|
||||
|
||||
private[remote] lazy val remoteDaemon = new LocalActorRef(
|
||||
Props(new RemoteDaemon).copy(dispatcher = new PinnedDispatcher()),
|
||||
Remote.remoteDaemonServiceName,
|
||||
systemService = true)
|
||||
|
||||
private[remote] lazy val remoteDaemonSupervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
|
||||
Supervise(
|
||||
remoteDaemon,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
OneForOneStrategy(List(classOf[Exception]), None, None)) // is infinite restart what we want?
|
||||
|
||||
private[remote] lazy val remoteDaemon =
|
||||
new LocalActorRef(
|
||||
props = Props(new RemoteDaemon).withDispatcher(new PinnedDispatcher()).withSupervisor(remoteDaemonSupervisor),
|
||||
address = Remote.remoteDaemonServiceName,
|
||||
systemService = true)
|
||||
|
||||
private[remote] lazy val remoteClientLifeCycleHandler = actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -4,10 +4,8 @@
|
|||
|
||||
package akka.serialization
|
||||
|
||||
import akka.config.Supervision._
|
||||
import akka.actor.{ uuidFrom, newUuid }
|
||||
import akka.actor._
|
||||
import DeploymentConfig._
|
||||
import akka.actor.DeploymentConfig._
|
||||
import akka.dispatch.Envelope
|
||||
import akka.util.{ ReflectiveAccess, Duration }
|
||||
import akka.event.EventHandler
|
||||
|
|
@ -192,14 +190,6 @@ object ActorSerialization {
|
|||
case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]()
|
||||
}
|
||||
|
||||
val storedLifeCycle =
|
||||
if (protocol.hasLifeCycle) {
|
||||
protocol.getLifeCycle.getLifeCycle match {
|
||||
case LifeCycleType.PERMANENT ⇒ Permanent
|
||||
case LifeCycleType.TEMPORARY ⇒ Temporary
|
||||
}
|
||||
} else LifeCycleType.PERMANENT
|
||||
|
||||
val storedSupervisor =
|
||||
if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import akka.actor.Actor._
|
|||
import akka.actor.Props
|
||||
import akka.actor.TypedActor
|
||||
import akka.camel.CamelContextManager
|
||||
import akka.config.Supervision._
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -24,13 +23,6 @@ class Boot {
|
|||
actorOf[Consumer1]
|
||||
actorOf[Consumer2]
|
||||
|
||||
// Alternatively, use a supervisor for these actors
|
||||
//val supervisor = Supervisor(
|
||||
// SupervisorConfig(
|
||||
// RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
|
||||
// Supervise(actorOf[Consumer1], Permanent) ::
|
||||
// Supervise(actorOf[Consumer2], Permanent) :: Nil))
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Custom Camel route example
|
||||
// -----------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -8,8 +8,7 @@
|
|||
|
||||
import akka.actor.{Actor, ActorRef, Props}
|
||||
import akka.stm._
|
||||
import akka.config.Supervision.{OneForOneStrategy,Permanent}
|
||||
import Actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.event.EventHandler
|
||||
|
||||
/******************************************************************************
|
||||
|
|
|
|||
|
|
@ -6,15 +6,9 @@ package sample.hello
|
|||
|
||||
import akka.actor._
|
||||
import akka.http._
|
||||
import akka.config.Supervision._
|
||||
|
||||
class Boot {
|
||||
val factory =
|
||||
SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(Actor.actorOf[RootEndpoint], Permanent) ::
|
||||
Supervise(Actor.actorOf[HelloEndpoint], Permanent) :: Nil))
|
||||
|
||||
factory.newInstance.start()
|
||||
val supervisor = Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 100))
|
||||
Actor.actorOf(Props[RootEndpoint].withSupervisor(supervisor))
|
||||
Actor.actorOf(Props[HelloEndpoint].withSupervisor(supervisor))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
package akka.spring
|
||||
|
||||
import org.springframework.beans.factory.config.AbstractFactoryBean
|
||||
import akka.config.Supervision._
|
||||
import AkkaSpringConfigurationTags._
|
||||
import reflect.BeanProperty
|
||||
import akka.actor.ActorRef
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.spring
|
|||
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import org.springframework.beans.factory.xml.{ ParserContext, AbstractSingleBeanDefinitionParser }
|
||||
import akka.config.Supervision._
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
import org.w3c.dom.Element
|
||||
|
|
|
|||
|
|
@ -4,8 +4,7 @@
|
|||
package akka.spring
|
||||
|
||||
import org.springframework.beans.factory.config.AbstractFactoryBean
|
||||
import akka.config.Supervision._
|
||||
import akka.actor.{ Supervisor, SupervisorFactory, Actor, ActorRegistry }
|
||||
import akka.actor.{ Supervisor, Actor, ActorRegistry }
|
||||
import AkkaSpringConfigurationTags._
|
||||
import reflect.BeanProperty
|
||||
import akka.config.{ TypedActorConfigurator, RemoteAddress }
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import org.scalatest.Spec
|
|||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import akka.config.Supervision._
|
||||
import akka.dispatch.MessageDispatcher
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import ScalaDom._
|
|||
|
||||
import org.w3c.dom.Element
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import akka.config.Supervision.{ FaultHandlingStrategy, AllForOneStrategy }
|
||||
|
||||
/**
|
||||
* Test for SupervisionBeanDefinitionParser
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import org.scalatest.Spec
|
|||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import akka.config.Supervision._
|
||||
import akka.config.TypedActorConfigurator
|
||||
|
||||
private[akka] class Foo
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue