Moved Faulthandling into Supvervision

This commit is contained in:
Viktor Klang 2010-10-19 10:44:27 +02:00
parent e9c946dda4
commit 7b1d23428b
29 changed files with 73 additions and 82 deletions

View file

@ -6,8 +6,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.{NoFaultHandlingStrategy, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy,
LifeCycle, Temporary, Permanent, UndefinedLifeCycle}
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException }

View file

@ -12,7 +12,7 @@ import Actor._
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.net.InetSocketAddress
import se.scalablesolutions.akka.config. {Supervision, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.Supervision._
class SupervisorException private[akka](message: String) extends AkkaException(message)

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.Supervision._
import java.net.InetSocketAddress

View file

@ -7,52 +7,6 @@ package se.scalablesolutions.akka.config
import se.scalablesolutions.akka.actor.{ActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]]
}
object AllForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
object OneForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case object NoFaultHandlingStrategy extends FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]] = Nil
}
sealed abstract class LifeCycle
case object Permanent extends LifeCycle
case object Temporary extends LifeCycle
case object UndefinedLifeCycle extends LifeCycle
case class RemoteAddress(val hostname: String, val port: Int)
/**
@ -65,6 +19,7 @@ object Supervision {
abstract class Server extends ConfigElement
abstract class FailOverScheme extends ConfigElement
sealed abstract class LifeCycle extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server {
//Java API
@ -92,10 +47,15 @@ object Supervision {
class AllForOne extends FailOverScheme
class OneForOne extends FailOverScheme
//Scala API
case object Permanent extends LifeCycle
case object Temporary extends LifeCycle
case object UndefinedLifeCycle extends LifeCycle
//Java API (& Scala if you fancy)
def permanent() = se.scalablesolutions.akka.config.Permanent
def temporary() = se.scalablesolutions.akka.config.Temporary
def undefinedLifeCycle = se.scalablesolutions.akka.config.UndefinedLifeCycle
def permanent() = Permanent
def temporary() = Temporary
def undefinedLifeCycle = UndefinedLifeCycle
//Scala API
object AllForOne extends AllForOne { def apply() = this }
@ -158,4 +118,44 @@ object Supervision {
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
}
sealed abstract class FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]]
}
object AllForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
object OneForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case object NoFaultHandlingStrategy extends FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]] = Nil
}
}

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
import se.scalablesolutions.akka.config._
import se.scalablesolutions.akka.config.Supervision._
import org.scalatest.junit.JUnitSuite
import org.junit.Test

View file

@ -10,9 +10,8 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.config.{Permanent, LifeCycle}
import se.scalablesolutions.akka.config.Supervision.{Permanent, LifeCycle, OneForOneStrategy}
import org.multiverse.api.latches.StandardLatch
class RestartStrategySpec extends JUnitSuite {

View file

@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.config.Supervision.OneForOneStrategy
import java.util.concurrent.{TimeUnit, CountDownLatch}

View file

@ -6,9 +6,8 @@ package se.scalablesolutions.akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.config.Supervision.{RestartStrategy, SupervisorConfig, OneForOne, Supervise}
import se.scalablesolutions.akka.config.Supervision.{RestartStrategy, SupervisorConfig, OneForOne, Supervise, Permanent}
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.config.Permanent
class SupervisorMiscSpec extends WordSpec with MustMatchers {
"A Supervisor" should {

View file

@ -11,7 +11,6 @@ import Actor._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import se.scalablesolutions.akka.config.{Temporary, Permanent, OneForOneStrategy}
object SupervisorSpec {
var messageLog = new LinkedBlockingQueue[String]
@ -78,7 +77,7 @@ object SupervisorSpec {
class TemporaryActor extends Actor {
import self._
lifeCycle = se.scalablesolutions.akka.config.Temporary
lifeCycle = Temporary
def receive = {
case Ping =>
messageLog.put("ping")

View file

@ -6,7 +6,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.config.Supervision._
import org.multiverse.api.latches.StandardLatch
import org.junit.Test
import se.scalablesolutions.akka.config.Permanent
class SchedulerSpec extends JUnitSuite {
@ -99,7 +98,7 @@ class SchedulerSpec extends JUnitSuite {
val pingLatch = new CountDownLatch(6)
val actor = actorOf(new Actor {
self.lifeCycle = se.scalablesolutions.akka.config.Permanent
self.lifeCycle = Permanent
def receive = {
case Ping => pingLatch.countDown

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.config.Supervision.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import ConnectionFactory._
import com.rabbitmq.client.AMQP.BasicProperties

View file

@ -8,7 +8,7 @@ import java.util.{TimerTask, Timer}
import java.io.IOException
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import se.scalablesolutions.akka.config.{ Permanent, OneForOneStrategy }
import se.scalablesolutions.akka.config.Supervision.{ Permanent, OneForOneStrategy }
import se.scalablesolutions.akka.actor.{Exit, Actor}
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor {

View file

@ -11,7 +11,7 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.{OneForOneStrategy, Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy, Permanent}
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.util.Logging

View file

@ -7,7 +7,7 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent}
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.util.Logging

View file

@ -7,7 +7,7 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent}
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.util.Logging

View file

@ -5,7 +5,7 @@ import sbinary.Operations._
import sbinary.DefaultProtocol._
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.{OneForOneStrategy, Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy, Permanent}
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.global._

View file

@ -7,7 +7,7 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor}
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent}
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.global._

View file

@ -10,7 +10,8 @@ import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRef, ActorRegistry}
import se.scalablesolutions.akka.util.Logging
import scala.collection.immutable.{Map, HashMap}
import se.scalablesolutions.akka.config. {Permanent, RemoteAddress}
import se.scalablesolutions.akka.config.Supervision.{Permanent}
import se.scalablesolutions.akka.config.{RemoteAddress}
/**
* Interface for interacting with the Cluster Membership API.

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.dispatch.MessageInvocation
import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import ActorTypeProtocol._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy, Permanent, Temporary, UndefinedLifeCycle}
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.actor.{uuidFrom,newUuid}
import se.scalablesolutions.akka.actor._

View file

@ -13,7 +13,6 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Supervisor, ActorRef, Actor}
import Actor._
import se.scalablesolutions.akka.config.Permanent
object Log {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]

View file

@ -14,7 +14,7 @@ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll}
import se.scalablesolutions.akka.config. {Permanent, Config, TypedActorConfigurator, RemoteAddress}
import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator, RemoteAddress}
object RemoteTypedActorSpec {
val HOSTNAME = "localhost"

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent}
import se.scalablesolutions.akka.util.Logging
import Actor._

View file

@ -21,7 +21,6 @@ import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
import org.atmosphere.jersey.Broadcastable
import se.scalablesolutions.akka.config.Permanent
class Boot {
val factory = SupervisorFactory(

View file

@ -11,7 +11,6 @@ import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
import se.scalablesolutions.akka.stm.TransactionalMap
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
import se.scalablesolutions.akka.config.Permanent
class Boot {
val factory = SupervisorFactory(

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor}
import AkkaSpringConfigurationTags._
import reflect.BeanProperty
import se.scalablesolutions.akka.config.{Temporary, Permanent, TypedActorConfigurator, RemoteAddress}
import se.scalablesolutions.akka.config.{TypedActorConfigurator, RemoteAddress}
/**
* Factory bean for supervisor configuration.

View file

@ -5,7 +5,6 @@
package se.scalablesolutions.akka.actor
import Actor._
import se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture, Dispatchers}
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.util._

View file

@ -13,7 +13,7 @@ import org.junit.runner.RunWith
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.config. {Temporary, Config, TypedActorConfigurator}
import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator}
@RunWith(classOf[JUnitRunner])
class RestartTransactionalTypedActorSpec extends

View file

@ -17,7 +17,7 @@ import org.junit.runner.RunWith
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.dispatch.FutureTimeoutException
import se.scalablesolutions.akka.config. {Permanent, Config, TypedActorConfigurator}
import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator}
@RunWith(classOf[JUnitRunner])
class TypedActorGuiceConfiguratorSpec extends

View file

@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.TypedActor._
import se.scalablesolutions.akka.config.Supervision._
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.config. {OneForOneStrategy, TypedActorConfigurator}
import se.scalablesolutions.akka.config.TypedActorConfigurator
/**
* @author Martin Krasser