Refactored declarative supervision, removed ScalaConfig and JavaConfig, moved things around
This commit is contained in:
parent
49774393c3
commit
e9c946dda4
41 changed files with 186 additions and 275 deletions
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
|
|||
|
||||
import se.scalablesolutions.akka.dispatch._
|
||||
import se.scalablesolutions.akka.config.Config._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently}
|
||||
import se.scalablesolutions.akka.util.{Logging, Duration}
|
||||
import se.scalablesolutions.akka.AkkaException
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@ package se.scalablesolutions.akka.actor
|
|||
|
||||
import se.scalablesolutions.akka.dispatch._
|
||||
import se.scalablesolutions.akka.config.Config._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.{NoFaultHandlingStrategy, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
import se.scalablesolutions.akka.config.{NoFaultHandlingStrategy, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy,
|
||||
LifeCycle, Temporary, Permanent, UndefinedLifeCycle}
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||
import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException }
|
||||
|
|
|
|||
|
|
@ -4,8 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.AkkaException
|
||||
import se.scalablesolutions.akka.util._
|
||||
import ReflectiveAccess._
|
||||
|
|
@ -13,6 +12,7 @@ import Actor._
|
|||
|
||||
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
|
||||
import java.net.InetSocketAddress
|
||||
import se.scalablesolutions.akka.config. {Supervision, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
|
||||
class SupervisorException private[akka](message: String) extends AkkaException(message)
|
||||
|
||||
|
|
@ -83,8 +83,8 @@ object SupervisorFactory {
|
|||
config match {
|
||||
case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
|
||||
scheme match {
|
||||
case AllForOne => AllForOneStrategy(trapExceptions,maxNrOfRetries, timeRange)
|
||||
case OneForOne => OneForOneStrategy(trapExceptions,maxNrOfRetries, timeRange)
|
||||
case a:AllForOne => AllForOneStrategy(trapExceptions.toList,maxNrOfRetries, timeRange)
|
||||
case o:OneForOne => OneForOneStrategy(trapExceptions.toList,maxNrOfRetries, timeRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor
|
|||
import se.scalablesolutions.akka.dispatch._
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
|
|
|
|||
|
|
@ -4,12 +4,12 @@
|
|||
|
||||
package se.scalablesolutions.akka.config
|
||||
|
||||
import ScalaConfig.{RestartStrategy, Component}
|
||||
import se.scalablesolutions.akka.config.Supervision. {SuperviseTypedActor, RestartStrategy}
|
||||
|
||||
private[akka] trait TypedActorConfiguratorBase {
|
||||
def getExternalDependency[T](clazz: Class[T]): T
|
||||
|
||||
def configure(restartStrategy: RestartStrategy, components: List[Component]): TypedActorConfiguratorBase
|
||||
def configure(restartStrategy: RestartStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase
|
||||
|
||||
def inject: TypedActorConfiguratorBase
|
||||
|
||||
|
|
|
|||
|
|
@ -47,17 +47,24 @@ case object NoFaultHandlingStrategy extends FaultHandlingStrategy {
|
|||
def trapExit: List[Class[_ <: Throwable]] = Nil
|
||||
}
|
||||
|
||||
sealed abstract class LifeCycle
|
||||
|
||||
case object Permanent extends LifeCycle
|
||||
case object Temporary extends LifeCycle
|
||||
case object UndefinedLifeCycle extends LifeCycle
|
||||
|
||||
case class RemoteAddress(val hostname: String, val port: Int)
|
||||
|
||||
/**
|
||||
* Configuration classes - not to be used as messages.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ScalaConfig {
|
||||
object Supervision {
|
||||
sealed abstract class ConfigElement
|
||||
|
||||
abstract class Server extends ConfigElement
|
||||
abstract class FailOverScheme extends ConfigElement
|
||||
abstract class LifeCycle extends ConfigElement
|
||||
|
||||
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server {
|
||||
//Java API
|
||||
|
|
@ -65,7 +72,7 @@ object ScalaConfig {
|
|||
}
|
||||
|
||||
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
|
||||
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
|
||||
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
|
||||
}
|
||||
|
||||
object Supervise {
|
||||
|
|
@ -74,22 +81,27 @@ object ScalaConfig {
|
|||
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
|
||||
}
|
||||
|
||||
case class RestartStrategy(
|
||||
scheme: FailOverScheme,
|
||||
maxNrOfRetries: Int,
|
||||
withinTimeRange: Int,
|
||||
trapExceptions: List[Class[_ <: Throwable]]) extends ConfigElement
|
||||
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int, trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement
|
||||
|
||||
case object AllForOne extends FailOverScheme
|
||||
case object OneForOne extends FailOverScheme
|
||||
object RestartStrategy {
|
||||
def apply(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int, trapExceptions: List[Class[_ <: Throwable]]) =
|
||||
new RestartStrategy(scheme,maxNrOfRetries,withinTimeRange,trapExceptions.toArray)
|
||||
}
|
||||
|
||||
case object Permanent extends LifeCycle
|
||||
case object Temporary extends LifeCycle
|
||||
case object UndefinedLifeCycle extends LifeCycle
|
||||
//Java API
|
||||
class AllForOne extends FailOverScheme
|
||||
class OneForOne extends FailOverScheme
|
||||
|
||||
case class RemoteAddress(val hostname: String, val port: Int) extends ConfigElement
|
||||
//Java API (& Scala if you fancy)
|
||||
def permanent() = se.scalablesolutions.akka.config.Permanent
|
||||
def temporary() = se.scalablesolutions.akka.config.Temporary
|
||||
def undefinedLifeCycle = se.scalablesolutions.akka.config.UndefinedLifeCycle
|
||||
|
||||
class Component(_intf: Class[_],
|
||||
//Scala API
|
||||
object AllForOne extends AllForOne { def apply() = this }
|
||||
object OneForOne extends OneForOne { def apply() = this }
|
||||
|
||||
case class SuperviseTypedActor(_intf: Class[_],
|
||||
val target: Class[_],
|
||||
val lifeCycle: LifeCycle,
|
||||
val timeout: Long,
|
||||
|
|
@ -97,165 +109,53 @@ object ScalaConfig {
|
|||
_dispatcher: MessageDispatcher, // optional
|
||||
_remoteAddress: RemoteAddress // optional
|
||||
) extends Server {
|
||||
val intf: Option[Class[_]] = if (_intf eq null) None else Some(_intf)
|
||||
val dispatcher: Option[MessageDispatcher] = if (_dispatcher eq null) None else Some(_dispatcher)
|
||||
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
|
||||
}
|
||||
object Component {
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
new Component(intf, target, lifeCycle, timeout, false, null, null)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
new Component(null, target, lifeCycle, timeout, false, null, null)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
|
||||
new Component(intf, target, lifeCycle, timeout, false, dispatcher, null)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
|
||||
new Component(null, target, lifeCycle, timeout, false, dispatcher, null)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
new Component(intf, target, lifeCycle, timeout, false, null, remoteAddress)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
new Component(null, target, lifeCycle, timeout, false, null, remoteAddress)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
new Component(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
new Component(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
|
||||
new Component(intf, target, lifeCycle, timeout, transactionRequired, null, null)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
|
||||
new Component(null, target, lifeCycle, timeout, transactionRequired, null, null)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
|
||||
new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
|
||||
new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
new Component(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
new Component(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
|
||||
|
||||
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
||||
|
||||
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object JavaConfig {
|
||||
import scala.reflect.BeanProperty
|
||||
|
||||
sealed abstract class ConfigElement
|
||||
|
||||
class RestartStrategy(
|
||||
@BeanProperty val scheme: FailOverScheme,
|
||||
@BeanProperty val maxNrOfRetries: Int,
|
||||
@BeanProperty val withinTimeRange: Int,
|
||||
@BeanProperty val trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement {
|
||||
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartStrategy(
|
||||
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
|
||||
}
|
||||
|
||||
abstract class LifeCycle extends ConfigElement {
|
||||
def transform: se.scalablesolutions.akka.config.ScalaConfig.LifeCycle
|
||||
}
|
||||
|
||||
class Permanent extends LifeCycle {
|
||||
override def transform = se.scalablesolutions.akka.config.ScalaConfig.Permanent
|
||||
}
|
||||
|
||||
class Temporary extends LifeCycle {
|
||||
override def transform = se.scalablesolutions.akka.config.ScalaConfig.Temporary
|
||||
}
|
||||
|
||||
class UndefinedLifeCycle extends LifeCycle {
|
||||
override def transform = se.scalablesolutions.akka.config.ScalaConfig.UndefinedLifeCycle
|
||||
}
|
||||
|
||||
abstract class FailOverScheme extends ConfigElement {
|
||||
def transform: se.scalablesolutions.akka.config.ScalaConfig.FailOverScheme
|
||||
}
|
||||
class AllForOne extends FailOverScheme {
|
||||
override def transform = se.scalablesolutions.akka.config.ScalaConfig.AllForOne
|
||||
}
|
||||
class OneForOne extends FailOverScheme {
|
||||
override def transform = se.scalablesolutions.akka.config.ScalaConfig.OneForOne
|
||||
}
|
||||
|
||||
class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int)
|
||||
|
||||
abstract class Server extends ConfigElement
|
||||
class Component(@BeanProperty val intf: Class[_],
|
||||
@BeanProperty val target: Class[_],
|
||||
@BeanProperty val lifeCycle: LifeCycle,
|
||||
@BeanProperty val timeout: Long,
|
||||
@BeanProperty val transactionRequired: Boolean, // optional
|
||||
@BeanProperty val dispatcher: MessageDispatcher, // optional
|
||||
@BeanProperty val remoteAddress: RemoteAddress // optional
|
||||
) extends Server {
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
this(intf, target, lifeCycle, timeout, false, null, null)
|
||||
val intf: Option[Class[_]] = Option(_intf)
|
||||
val dispatcher: Option[MessageDispatcher] = Option(_dispatcher)
|
||||
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
this(null, target, lifeCycle, timeout, false, null, null)
|
||||
this(null: Class[_], target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
this(intf, target, lifeCycle, timeout, false, null, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
this(null, target, lifeCycle, timeout, false, null, remoteAddress)
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
|
||||
this(intf, target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
|
||||
this(intf, target, lifeCycle, timeout, false, dispatcher, null)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
|
||||
this(null, target, lifeCycle, timeout, false, dispatcher, null)
|
||||
this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, null:RemoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
this(intf, target, lifeCycle, timeout, false, null: MessageDispatcher, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
|
||||
this(null: Class[_], target, lifeCycle, timeout, false, null, remoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
this(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
this(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
|
||||
this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, remoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, null, null)
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
|
||||
this(null, target, lifeCycle, timeout, transactionRequired, null, null)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
this(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
|
||||
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
|
||||
this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
|
||||
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress)
|
||||
|
||||
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
|
||||
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress)
|
||||
|
||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||
this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
||||
|
||||
def transform =
|
||||
se.scalablesolutions.akka.config.ScalaConfig.Component(
|
||||
intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
|
||||
if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
|
||||
|
||||
def newSupervised(actorRef: ActorRef) =
|
||||
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform)
|
||||
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package se.scalablesolutions.akka.config;
|
||||
|
||||
import se.scalablesolutions.akka.actor.ActorRef;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static se.scalablesolutions.akka.config.Supervision.*;
|
||||
|
||||
public class SupervisionConfig {
|
||||
/*Just some sample code to demonstrate the declarative supervision configuration for Java */
|
||||
public SupervisorConfig createSupervisorConfig(List<ActorRef> toSupervise) {
|
||||
ArrayList<Server> targets = new ArrayList<Server>(toSupervise.size());
|
||||
for(ActorRef ref : toSupervise) {
|
||||
targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",9999)));
|
||||
}
|
||||
|
||||
return new SupervisorConfig(new RestartStrategy(new OneForOne(),50,1000,new Class[]{ Exception.class}), targets.toArray(new Server[0]));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import org.junit.Test
|
|||
import Actor._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
|
||||
import se.scalablesolutions.akka.config.{Permanent, LifeCycle}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
class RestartStrategySpec extends JUnitSuite {
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ package se.scalablesolutions.akka.actor
|
|||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{RestartStrategy, SupervisorConfig, LifeCycle, Permanent, OneForOne, Supervise}
|
||||
import se.scalablesolutions.akka.config.Supervision.{RestartStrategy, SupervisorConfig, OneForOne, Supervise}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||
"A Supervisor" should {
|
||||
|
|
@ -57,7 +58,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
|||
|
||||
val sup = Supervisor(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
|
||||
RestartStrategy(new OneForOne, 3, 5000, List(classOf[Exception])),
|
||||
Supervise(actor1, Permanent) ::
|
||||
Supervise(actor2, Permanent) ::
|
||||
Supervise(actor3, Permanent) ::
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.{OneWay, Die, Ping}
|
||||
import Actor._
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
|
||||
import se.scalablesolutions.akka.config.{Temporary, Permanent, OneForOneStrategy}
|
||||
|
||||
object SupervisorSpec {
|
||||
var messageLog = new LinkedBlockingQueue[String]
|
||||
|
|
@ -78,7 +78,7 @@ object SupervisorSpec {
|
|||
|
||||
class TemporaryActor extends Actor {
|
||||
import self._
|
||||
lifeCycle = Temporary
|
||||
lifeCycle = se.scalablesolutions.akka.config.Temporary
|
||||
def receive = {
|
||||
case Ping =>
|
||||
messageLog.put("ping")
|
||||
|
|
|
|||
|
|
@ -3,9 +3,10 @@ package se.scalablesolutions.akka.actor
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import Actor._
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
class SchedulerSpec extends JUnitSuite {
|
||||
|
||||
|
|
@ -98,7 +99,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
val pingLatch = new CountDownLatch(6)
|
||||
|
||||
val actor = actorOf(new Actor {
|
||||
self.lifeCycle = Permanent
|
||||
self.lifeCycle = se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
def receive = {
|
||||
case Ping => pingLatch.countDown
|
||||
|
|
@ -107,6 +108,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
|
||||
override def postRestart(reason: Throwable) = restartLatch.open
|
||||
})
|
||||
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(AllForOne, 3, 1000,
|
||||
|
|
|
|||
|
|
@ -8,8 +8,7 @@ import java.util.{TimerTask, Timer}
|
|||
import java.io.IOException
|
||||
import com.rabbitmq.client._
|
||||
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{ Permanent, OneForOneStrategy }
|
||||
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
||||
|
||||
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.security
|
||||
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
|
||||
import org.scalatest.Suite
|
||||
|
|
|
|||
|
|
@ -11,10 +11,9 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy, Permanent}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import StorageObj._
|
||||
|
||||
|
|
|
|||
|
|
@ -7,10 +7,9 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import HbaseStorageBackend._
|
||||
|
|
|
|||
|
|
@ -7,10 +7,9 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import MongoStorageBackend._
|
||||
|
|
|
|||
|
|
@ -5,11 +5,10 @@ import sbinary.Operations._
|
|||
import sbinary.DefaultProtocol._
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy, Permanent}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentVector
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import java.util.{Calendar, Date}
|
||||
|
|
|
|||
|
|
@ -7,11 +7,10 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor}
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentVector
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import RedisStorageBackend._
|
||||
|
|
|
|||
|
|
@ -5,11 +5,12 @@
|
|||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRef, ActorRegistry}
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import scala.collection.immutable.{Map, HashMap}
|
||||
import se.scalablesolutions.akka.config. {Permanent, RemoteAddress}
|
||||
|
||||
/**
|
||||
* Interface for interacting with the Cluster Membership API.
|
||||
|
|
|
|||
|
|
@ -11,8 +11,7 @@ import se.scalablesolutions.akka.dispatch.MessageInvocation
|
|||
import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer}
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
|
||||
import ActorTypeProtocol._
|
||||
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy, Permanent, Temporary, UndefinedLifeCycle}
|
||||
import se.scalablesolutions.akka.actor.{uuidFrom,newUuid}
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
|
|
|
|||
|
|
@ -6,13 +6,14 @@ package se.scalablesolutions.akka.actor.remote
|
|||
|
||||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import se.scalablesolutions.akka.serialization.BinaryString
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
||||
import se.scalablesolutions.akka.OneWay
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.{Test, Before, After}
|
||||
import se.scalablesolutions.akka.actor.{SupervisorFactory, Supervisor, ActorRef, Actor}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
object Log {
|
||||
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
|
|
|
|||
|
|
@ -8,15 +8,13 @@ import org.scalatest.matchers.ShouldMatchers
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
||||
|
||||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll}
|
||||
import se.scalablesolutions.akka.config. {Permanent, Config, TypedActorConfigurator, RemoteAddress}
|
||||
|
||||
object RemoteTypedActorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
|
|
@ -50,18 +48,18 @@ class RemoteTypedActorSpec extends
|
|||
server.start("localhost", 9995)
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
new RestartStrategy(AllForOne(), 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[RemoteTypedActorOne],
|
||||
classOf[RemoteTypedActorOneImpl],
|
||||
new Permanent,
|
||||
Permanent,
|
||||
10000,
|
||||
new RemoteAddress("localhost", 9995)),
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[RemoteTypedActorTwo],
|
||||
classOf[RemoteTypedActorTwoImpl],
|
||||
new Permanent,
|
||||
Permanent,
|
||||
10000,
|
||||
new RemoteAddress("localhost", 9995))
|
||||
).toArray).supervise
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext
|
|||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{TypedActor, Supervisor}
|
||||
import se.scalablesolutions.akka.camel.CamelContextManager
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
|
|||
|
|
@ -11,8 +11,7 @@ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
|
|||
import se.scalablesolutions.akka.persistence.common.PersistentVector
|
||||
import se.scalablesolutions.akka.persistence.redis.RedisStorage
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy,Permanent}
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import Actor._
|
||||
|
||||
|
|
|
|||
|
|
@ -5,23 +5,23 @@
|
|||
package sample.rest.java;
|
||||
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator;
|
||||
import static se.scalablesolutions.akka.config.JavaConfig.*;
|
||||
import static se.scalablesolutions.akka.config.Supervision.*;
|
||||
|
||||
public class Boot {
|
||||
public final static TypedActorConfigurator configurator = new TypedActorConfigurator();
|
||||
static {
|
||||
configurator.configure(
|
||||
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
|
||||
new Component[] {
|
||||
new Component(
|
||||
new SuperviseTypedActor[] {
|
||||
new SuperviseTypedActor(
|
||||
SimpleService.class,
|
||||
SimpleServiceImpl.class,
|
||||
new Permanent(),
|
||||
permanent(),
|
||||
1000),
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
PersistentSimpleService.class,
|
||||
PersistentSimpleServiceImpl.class,
|
||||
new Permanent(),
|
||||
permanent(),
|
||||
1000)
|
||||
}).supervise();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
|
|||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter
|
||||
import scala.xml.NodeSeq
|
||||
|
|
@ -21,6 +21,7 @@ import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
|||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
import se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
|
|
|
|||
|
|
@ -6,11 +6,12 @@ package sample.security
|
|||
|
||||
import se.scalablesolutions.akka.actor.{SupervisorFactory, Transactor, Actor}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||
import se.scalablesolutions.akka.config.Permanent
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.springframework.beans.factory.config.AbstractFactoryBean
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import AkkaSpringConfigurationTags._
|
||||
import reflect.BeanProperty
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.spring
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import org.springframework.beans.factory.xml.{ParserContext, AbstractSingleBeanDefinitionParser}
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
|
||||
|
|
@ -47,7 +47,7 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
|
|||
}
|
||||
|
||||
private[akka] def parseRestartStrategy(element: Element, builder: BeanDefinitionBuilder) {
|
||||
val failover = if (mandatory(element, FAILOVER) == "AllForOne") new AllForOne() else new OneForOne()
|
||||
val failover = if (mandatory(element, FAILOVER) == "AllForOne") AllForOne else OneForOne
|
||||
val timeRange = mandatory(element, TIME_RANGE).toInt
|
||||
val retries = mandatory(element, RETRIES).toInt
|
||||
val trapExitsElement = mandatoryElement(element, TRAP_EXISTS_TAG)
|
||||
|
|
@ -71,7 +71,7 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
|
|||
private def parseTrapExits(element: Element): Array[Class[_ <: Throwable]] = {
|
||||
import StringReflect._
|
||||
val trapExits = DomUtils.getChildElementsByTagName(element, TRAP_EXIT_TAG).toArray.toList.asInstanceOf[List[Element]]
|
||||
trapExits.map(DomUtils.getTextValue(_).toClass).toArray
|
||||
trapExits.map(DomUtils.getTextValue(_).toClass.asInstanceOf[Class[_ <: Throwable]]).toArray
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -4,13 +4,11 @@
|
|||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.springframework.beans.factory.config.AbstractFactoryBean
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{Supervise, Server, SupervisorConfig, RemoteAddress => SRemoteAddress}
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor}
|
||||
import AkkaSpringConfigurationTags._
|
||||
import reflect.BeanProperty
|
||||
|
||||
import se.scalablesolutions.akka.config.{Temporary, Permanent, TypedActorConfigurator, RemoteAddress}
|
||||
|
||||
/**
|
||||
* Factory bean for supervisor configuration.
|
||||
|
|
@ -46,7 +44,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
|||
private def createInstanceForUntypedActors() : Supervisor = {
|
||||
val factory = new SupervisorFactory(
|
||||
new SupervisorConfig(
|
||||
restartStrategy.transform,
|
||||
restartStrategy,
|
||||
supervised.map(createSupervise(_))))
|
||||
factory.newInstance
|
||||
}
|
||||
|
|
@ -54,24 +52,24 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
|||
/**
|
||||
* Create configuration for TypedActor
|
||||
*/
|
||||
private[akka] def createComponent(props: ActorProperties): Component = {
|
||||
private[akka] def createComponent(props: ActorProperties): SuperviseTypedActor = {
|
||||
import StringReflect._
|
||||
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent()
|
||||
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent
|
||||
val isRemote = (props.host ne null) && (!props.host.isEmpty)
|
||||
val withInterface = (props.interface ne null) && (!props.interface.isEmpty)
|
||||
if (isRemote) {
|
||||
//val remote = new RemoteAddress(props.host, props.port)
|
||||
val remote = new RemoteAddress(props.host, props.port.toInt)
|
||||
if (withInterface) {
|
||||
new Component(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional, remote)
|
||||
new SuperviseTypedActor(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional, remote)
|
||||
} else {
|
||||
new Component(props.target.toClass, lifeCycle, props.timeout, props.transactional, remote)
|
||||
new SuperviseTypedActor(props.target.toClass, lifeCycle, props.timeout, props.transactional, remote)
|
||||
}
|
||||
} else {
|
||||
if (withInterface) {
|
||||
new Component(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional)
|
||||
new SuperviseTypedActor(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional)
|
||||
} else {
|
||||
new Component(props.target.toClass, lifeCycle, props.timeout, props.transactional)
|
||||
new SuperviseTypedActor(props.target.toClass, lifeCycle, props.timeout, props.transactional)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -81,7 +79,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
|||
*/
|
||||
private[akka] def createSupervise(props: ActorProperties): Server = {
|
||||
import StringReflect._
|
||||
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent()
|
||||
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent
|
||||
val isRemote = (props.host ne null) && (!props.host.isEmpty)
|
||||
val actorRef = Actor.actorOf(props.target.toClass)
|
||||
if (props.timeout > 0) {
|
||||
|
|
@ -92,10 +90,10 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
|||
}
|
||||
|
||||
val supervise = if (isRemote) {
|
||||
val remote = new SRemoteAddress(props.host, props.port.toInt)
|
||||
Supervise(actorRef, lifeCycle.transform, remote)
|
||||
val remote = new RemoteAddress(props.host, props.port.toInt)
|
||||
Supervise(actorRef, lifeCycle, remote)
|
||||
} else {
|
||||
Supervise(actorRef, lifeCycle.transform)
|
||||
Supervise(actorRef, lifeCycle)
|
||||
}
|
||||
supervise
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import org.scalatest.Spec
|
|||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.dispatch.MessageDispatcher
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -9,10 +9,9 @@ import org.scalatest.junit.JUnitRunner
|
|||
import org.junit.runner.RunWith
|
||||
import ScalaDom._
|
||||
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
|
||||
import org.w3c.dom.Element
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import se.scalablesolutions.akka.config.Supervision. {RestartStrategy, AllForOne}
|
||||
|
||||
/**
|
||||
* Test for SupervisionBeanDefinitionParser
|
||||
|
|
@ -39,7 +38,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[RestartStrategy]
|
||||
assert(strategy ne null)
|
||||
assert(strategy.scheme match {
|
||||
case x:AllForOne => true
|
||||
case AllForOne => true
|
||||
case _ => false })
|
||||
expect(3) { strategy.maxNrOfRetries }
|
||||
expect(1000) { strategy.withinTimeRange }
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import org.scalatest.Spec
|
|||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
|
||||
private[akka] class Foo
|
||||
|
|
@ -15,7 +15,7 @@ private[akka] class Foo
|
|||
@RunWith(classOf[JUnitRunner])
|
||||
class SupervisionFactoryBeanTest extends Spec with ShouldMatchers {
|
||||
|
||||
val restartStrategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Throwable]))
|
||||
val restartStrategy = new RestartStrategy(AllForOne(), 3, 1000, Array(classOf[Throwable]))
|
||||
val typedActors = List(createTypedActorProperties("se.scalablesolutions.akka.spring.Foo", "1000"))
|
||||
|
||||
private def createTypedActorProperties(target: String, timeout: String) : ActorProperties = {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor
|
|||
import Actor._
|
||||
import se.scalablesolutions.akka.config.FaultHandlingStrategy
|
||||
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture, Dispatchers}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import ReflectiveAccess._
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.config
|
||||
|
||||
import JavaConfig._
|
||||
import Supervision._
|
||||
|
||||
import java.util.{List => JList}
|
||||
import java.util.{ArrayList}
|
||||
|
|
@ -43,10 +43,10 @@ class TypedActorConfigurator {
|
|||
*/
|
||||
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head
|
||||
|
||||
def configure(restartStrategy: RestartStrategy, components: Array[Component]): TypedActorConfigurator = {
|
||||
def configure(restartStrategy: RestartStrategy, components: Array[SuperviseTypedActor]): TypedActorConfigurator = {
|
||||
INSTANCE.configure(
|
||||
restartStrategy.transform,
|
||||
components.toList.asInstanceOf[scala.List[Component]].map(_.transform))
|
||||
restartStrategy,
|
||||
components.toList.asInstanceOf[scala.List[SuperviseTypedActor]])
|
||||
this
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package se.scalablesolutions.akka.config
|
||||
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import ReflectiveAccess._
|
||||
|
||||
|
|
@ -28,11 +28,11 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
private var injector: Injector = _
|
||||
private var supervisor: Option[Supervisor] = None
|
||||
private var restartStrategy: RestartStrategy = _
|
||||
private var components: List[Component] = _
|
||||
private var components: List[SuperviseTypedActor] = _
|
||||
private var supervised: List[Supervise] = Nil
|
||||
private var bindings: List[DependencyBinding] = Nil
|
||||
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
|
||||
private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
|
||||
private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
|
||||
private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
|
||||
private var modules = new java.util.ArrayList[Module]
|
||||
private var methodToUriRegistry = new HashMap[Method, String]
|
||||
|
||||
|
|
@ -68,10 +68,10 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
else c.target
|
||||
}
|
||||
|
||||
override def configure(restartStrategy: RestartStrategy, components: List[Component]):
|
||||
override def configure(restartStrategy: RestartStrategy, components: List[SuperviseTypedActor]):
|
||||
TypedActorConfiguratorBase = synchronized {
|
||||
this.restartStrategy = restartStrategy
|
||||
this.components = components.toArray.toList.asInstanceOf[List[Component]]
|
||||
this.components = components.toArray.toList.asInstanceOf[List[SuperviseTypedActor]]
|
||||
bindings = for (component <- this.components) yield {
|
||||
newDelegatingProxy(component)
|
||||
// if (component.intf.isDefined) newDelegatingProxy(component)
|
||||
|
|
@ -84,7 +84,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
}
|
||||
|
||||
/*
|
||||
private def newSubclassingProxy(component: Component): DependencyBinding = {
|
||||
private def newSubclassingProxy(component: SuperviseTypedActor): DependencyBinding = {
|
||||
val targetClass =
|
||||
if (component.target.isInstanceOf[Class[_ <: TypedActor]]) component.target.asInstanceOf[Class[_ <: TypedActor]]
|
||||
else throw new IllegalArgumentException("TypedActor [" + component.target.getName + "] must be a subclass of TypedActor")
|
||||
|
|
@ -101,7 +101,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
new DependencyBinding(targetClass, proxy)
|
||||
}
|
||||
*/
|
||||
private def newDelegatingProxy(component: Component): DependencyBinding = {
|
||||
private def newDelegatingProxy(component: SuperviseTypedActor): DependencyBinding = {
|
||||
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
|
||||
val interfaceClass = if (component.intf.isDefined) component.intf.get
|
||||
else throw new IllegalActorStateException("No interface for TypedActor specified")
|
||||
|
|
@ -169,8 +169,8 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
|
||||
def reset = synchronized {
|
||||
modules = new java.util.ArrayList[Module]
|
||||
configRegistry = new HashMap[Class[_], Component]
|
||||
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
|
||||
configRegistry = new HashMap[Class[_], SuperviseTypedActor]
|
||||
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
|
||||
methodToUriRegistry = new HashMap[Method, String]
|
||||
injector = null
|
||||
restartStrategy = null
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import org.junit.runner.RunWith
|
|||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -11,11 +11,9 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.config. {Temporary, Config, TypedActorConfigurator}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RestartTransactionalTypedActorSpec extends
|
||||
|
|
@ -29,15 +27,15 @@ class RestartTransactionalTypedActorSpec extends
|
|||
def before {
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
new RestartStrategy(AllForOne, 3, 5000, Array(classOf[Exception])),
|
||||
List(
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[TransactionalTypedActor],
|
||||
new Temporary,
|
||||
Temporary,
|
||||
10000),
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[TypedActorFailer],
|
||||
new Temporary,
|
||||
Temporary,
|
||||
10000)
|
||||
).toArray).supervise
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,11 +14,10 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config.TypedActorConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
import se.scalablesolutions.akka.dispatch._
|
||||
import se.scalablesolutions.akka.dispatch.FutureTimeoutException
|
||||
import se.scalablesolutions.akka.config. {Permanent, Config, TypedActorConfigurator}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class TypedActorGuiceConfiguratorSpec extends
|
||||
|
|
@ -36,18 +35,18 @@ class TypedActorGuiceConfiguratorSpec extends
|
|||
conf.addExternalGuiceModule(new AbstractModule {
|
||||
def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)
|
||||
}).configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
new RestartStrategy(AllForOne(), 3, 5000, Array(classOf[Exception])),
|
||||
List(
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[Foo],
|
||||
classOf[FooImpl],
|
||||
new Permanent,
|
||||
Permanent,
|
||||
1000,
|
||||
dispatcher),
|
||||
new Component(
|
||||
new SuperviseTypedActor(
|
||||
classOf[Bar],
|
||||
classOf[BarImpl],
|
||||
new Permanent,
|
||||
Permanent,
|
||||
1000,
|
||||
dispatcher)
|
||||
).toArray).inject.supervise
|
||||
|
|
|
|||
|
|
@ -7,10 +7,10 @@ import org.scalatest.matchers.ShouldMatchers
|
|||
|
||||
import se.scalablesolutions.akka.actor.TypedActor._
|
||||
|
||||
import se.scalablesolutions.akka.config.{OneForOneStrategy, TypedActorConfigurator}
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.config.Supervision._
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import se.scalablesolutions.akka.config. {OneForOneStrategy, TypedActorConfigurator}
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -21,9 +21,9 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
var conf2: TypedActorConfigurator = _
|
||||
|
||||
override protected def beforeAll() = {
|
||||
val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
|
||||
val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Permanent(), 1000)
|
||||
val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Temporary(), 1000)
|
||||
val strategy = new RestartStrategy(AllForOne(), 3, 1000, Array(classOf[Exception]))
|
||||
val comp3 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], permanent(), 1000)
|
||||
val comp4 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], temporary(), 1000)
|
||||
conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
|
||||
conf2 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue