Merged with master

This commit is contained in:
Jonas Bonér 2011-07-02 20:53:27 +02:00
commit f48d91ccd7
41 changed files with 839 additions and 301 deletions

View file

@ -259,18 +259,16 @@ abstract class ActorModelSpec extends JUnitSuite {
val counter = new CountDownLatch(200)
a.start()
def start = spawn { for (i 1 to 20) { a ! WaitAck(1, counter) } }
for (i 1 to 10) { start }
for (i 1 to 10) { spawn { for (i 1 to 20) { a ! WaitAck(1, counter) } } }
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop()
}
def spawn(f: Unit) = {
val thread = new Thread { override def run { f } }
def spawn(f: Unit) {
val thread = new Thread { override def run { try { f } catch { case e e.printStackTrace } } }
thread.start()
thread
}
@Test
@ -369,3 +367,8 @@ class BalancingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher =
new BalancingDispatcher("foo") with MessageDispatcherInterceptor
}
class FJDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher =
new FJDispatcher("foo") with MessageDispatcherInterceptor
}

View file

@ -48,6 +48,8 @@ trait BootableActorLoaderService extends Bootable {
abstract override def onLoad = {
super.onLoad
applicationLoader foreach Thread.currentThread.setContextClassLoader
for (loader applicationLoader; clazz BOOT_CLASSES) {
loader.loadClass(clazz).newInstance
}

View file

@ -12,30 +12,229 @@ import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
* A TypedActor in Akka is an implementation of the Active Objects Pattern, i.e. an object with asynchronous method dispatch
*
* It consists of 2 parts:
* The Interface
* The Implementation
*
* Given a combination of Interface and Implementation, a JDK Dynamic Proxy object with the Interface will be returned
*
 * The semantics are as follows:
* any methods in the Interface that returns Unit/void will use fire-and-forget semantics (same as Actor !)
* any methods in the Interface that returns Option/JOption will use ask + block-with-timeout-return-none-if-timeout semantics
* any methods in the Interface that returns anything else will use ask + block-with-timeout-throw-if-timeout semantics
*
* TypedActors needs, just like Actors, to be Stopped when they are no longer needed, use TypedActor.stop(proxy)
*/
object TypedActor {
private val selfReference = new ThreadLocal[AnyRef]
/**
* Returns the reference to the proxy when called inside a method call in a TypedActor
*
* Example:
* <p/>
* class FooImpl extends Foo {
* def doFoo {
* val myself = self[Foo]
* }
* }
*
* Useful when you want to send a reference to this TypedActor to someone else.
*
* NEVER EXPOSE "this" to someone else, always use "self[TypeOfInterface(s)]"
*
* @throws IllegalStateException if called outside of the scope of a method on this TypedActor
* @throws ClassCastException if the supplied type T isn't the type of the proxy associated with this TypedActor
*/
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match {
case null throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!")
case some some
}
@deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!")
object Configuration { //TODO: Replace this with the new ActorConfiguration when it exists
val defaultTimeout = Duration(Actor.TIMEOUT, "millis")
val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher)
def apply(): Configuration = defaultConfiguration
}
@deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!")
case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
/**
* This class represents a Method call, and has a reference to the Method to be called and the parameters to supply
* It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized
*/
case class MethodCall(method: Method, parameters: Array[AnyRef]) {
def isOneWay = method.getReturnType == java.lang.Void.TYPE
def returnsFuture_? = classOf[Future[_]].isAssignableFrom(method.getReturnType)
def returnsJOption_? = classOf[akka.japi.Option[_]].isAssignableFrom(method.getReturnType)
def returnsOption_? = classOf[scala.Option[_]].isAssignableFrom(method.getReturnType)
/**
* Invokes the Method on the supplied instance
*
* @throws the underlying exception if there's an InvocationTargetException thrown on the invocation
*/
def apply(instance: AnyRef): AnyRef = try {
parameters match { //TODO: We do not yet obey Actor.SERIALIZE_MESSAGES
case null method.invoke(instance)
case args if args.length == 0 method.invoke(instance)
case args method.invoke(instance, args: _*)
}
} catch { case i: InvocationTargetException throw i.getTargetException }
private def writeReplace(): AnyRef = new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters)
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
}
/**
* Creates a new TypedActor proxy using the supplied configuration,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.newInstance, config, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.create, config, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(impl, impl.newInstance, config, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
}
/**
* Stops the underlying ActorRef for the supplied TypedActor proxy, if any, returns whether it could stop it or not
*/
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null false
case ref ref.stop; true
}
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match {
case null null
case handler handler.actor
}
/**
 * Returns whether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](constructor: Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R =
createProxy(interfaces, (ref: AtomVar[R]) constructor.create, config, loader)
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, config: Configuration, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomVar[R]) constructor, config, loader)
/* Internal API */
private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler
case _ null
}
}
else null
private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomVar[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), config, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: Actor, config: Configuration, loader: ClassLoader): T = {
val ref = actorOf(actor)
ref.timeout = config.timeout.toMillis
ref.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: T) extends Actor {
val me = createInstance
def receive = {
case m: MethodCall
selfReference set proxyRef.get
try {
m match {
case m if m.isOneWay m(me)
case m if m.returnsFuture_? self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
case m self reply m(me)
}
if (m.isOneWay) m(me)
else if (m.returnsFuture_?) self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
else self reply m(me)
} finally { selfReference set null }
}
}
case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
private[akka] case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" actor.toString
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
@ -61,111 +260,4 @@ object TypedActor {
}
}
}
object Configuration { //TODO: Replace this with the new ActorConfiguration when it exists
val defaultTimeout = Duration(Actor.TIMEOUT, "millis")
val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher)
def apply(): Configuration = defaultConfiguration
}
case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
case class MethodCall(method: Method, parameters: Array[AnyRef]) {
def isOneWay = method.getReturnType == java.lang.Void.TYPE
def returnsFuture_? = classOf[Future[_]].isAssignableFrom(method.getReturnType)
def returnsJOption_? = classOf[akka.japi.Option[_]].isAssignableFrom(method.getReturnType)
def returnsOption_? = classOf[scala.Option[_]].isAssignableFrom(method.getReturnType)
def apply(instance: AnyRef): AnyRef = try {
parameters match { //We do not yet obey Actor.SERIALIZE_MESSAGES
case null method.invoke(instance)
case args if args.length == 0 method.invoke(instance)
case args method.invoke(instance, args: _*)
}
} catch { case i: InvocationTargetException throw i.getTargetException }
private def writeReplace(): AnyRef = new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters)
}
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
}
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.create, config, loader)
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(impl, impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
}
def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match {
case null false
case ref ref.stop; true
}
def getActorRefFor(typedActor: AnyRef): ActorRef = invocationHandlerFor(typedActor) match {
case null null
case handler handler.actor
}
def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler
case _ null
}
}
else null
def isTypedActor(typedActor_? : AnyRef): Boolean = invocationHandlerFor(typedActor_?) ne null
def createProxy[R <: AnyRef](constructor: Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R =
createProxy(interfaces, (ref: AtomVar[R]) constructor.create, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, config: Configuration, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomVar[R]) constructor, config, loader)
/* Internal API */
private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomVar[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), config, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: Actor, config: Configuration, loader: ClassLoader): T = {
val ref = actorOf(actor)
ref.timeout = config.timeout.toMillis
ref.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
}

View file

@ -119,12 +119,4 @@ object Config {
val startTime = System.currentTimeMillis
def uptime = (System.currentTimeMillis - startTime) / 1000
val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer"))
val bindings = config.getSection("akka.actor.serialization-bindings")
.map(_.map)
.map(m Map() ++ m.map { case (k, v: List[String]) Map() ++ v.map((_, k)) }.flatten)
val serializerMap = bindings.map(m m.map { case (k, v: String) (k, serializers(v)) }).getOrElse(Map())
}

View file

@ -67,7 +67,7 @@ class Dispatcher(
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
val config: ThreadPoolConfig = ThreadPoolConfig())
executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig())
extends MessageDispatcher {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
@ -79,16 +79,16 @@ class Dispatcher(
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _executorServiceFactoryProvider)
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
val name = "akka:event-driven:dispatcher:" + _name
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
private[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
private[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
@ -134,7 +134,7 @@ class Dispatcher(
private[akka] def start {}
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
if (old ne null) {
old.shutdownNow()
}
@ -160,6 +160,8 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = ()
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
@ -201,8 +203,11 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
finally {
dispatcherLock.unlock()
}
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
dispatcher.doneProcessingMailbox(this)
}
/**
@ -271,7 +276,7 @@ class PriorityDispatcher(
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) with PriorityMailbox {
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
@ -282,8 +287,8 @@ class PriorityDispatcher(
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
def this(name: String, comparator: java.util.Comparator[MessageInvocation], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, executorServiceFactoryProvider)
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

View file

@ -0,0 +1,108 @@
package akka.dispatch
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
import akka.actor.ActorRef
import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinPool, ForkJoinTask }
import java.util.concurrent._
import java.lang.UnsupportedOperationException
/**
* A Dispatcher that uses the ForkJoin library in scala.concurrent.forkjoin
*/
class FJDispatcher(
name: String,
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
forkJoinPoolConfig: ForkJoinPoolConfig = ForkJoinPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, forkJoinPoolConfig) {
def this(name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(name, throughput, throughputDeadlineTime, mailboxType, ForkJoinPoolConfig()) // Needed for Java API usage
def this(name: String, throughput: Int, mailboxType: MailboxType) =
this(name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
this(name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], forkJoinPoolConfig: ForkJoinPoolConfig) =
this(name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, forkJoinPoolConfig)
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
this(name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case b: UnboundedMailbox
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox with FJMailbox {
@inline
final def dispatcher = FJDispatcher.this
@inline
final def enqueue(m: MessageInvocation) = this.add(m)
@inline
final def dequeue(): MessageInvocation = this.poll()
}
case b: BoundedMailbox
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox with FJMailbox {
@inline
final def dispatcher = FJDispatcher.this
}
}
override private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = {
super.doneProcessingMailbox(mbox)
if (FJDispatcher.isCurrentThreadFJThread)
ForkJoinTask.helpQuiesce()
}
}
object FJDispatcher {
def isCurrentThreadFJThread = Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]
}
case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider {
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
new ForkJoinPool(targetParallelism) with ExecutorService {
setAsyncMode(true)
setMaintainsParallelism(true)
override def execute(r: Runnable) {
r match {
case fjmbox: FJMailbox
fjmbox.fjTask.reinitialize()
if (FJDispatcher.isCurrentThreadFJThread) fjmbox.fjTask.fork()
else super.execute[Unit](fjmbox.fjTask)
case _ super.execute(r)
}
}
import java.util.{ Collection JCollection }
def invokeAny[T](callables: JCollection[_ <: Callable[T]]) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
}
}
}
}
trait FJMailbox { self: ExecutableMailbox
val fjTask = new ForkJoinTask[Unit] with Runnable {
var result: Unit = ()
def getRawResult() = result
def setRawResult(v: Unit) { result = v }
def exec() = {
self.run()
true
}
def run() { invoke() }
}
}

View file

@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
import akka.event.EventHandler
import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinTask, ForkJoinPool }
object ThreadPoolConfig {
type Bounds = Int
@ -51,18 +52,24 @@ object ThreadPoolConfig {
}
}
trait ExecutorServiceFactory {
def createExecutorService: ExecutorService
}
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(name: String): ExecutorServiceFactory
}
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
flowHandler match {
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue())
extends ExecutorServiceFactoryProvider {
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
val threadFactory = new MonitorableThreadFactory(name)
def createExecutorService: ExecutorService = flowHandler match {
case Left(rejectHandler)
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)

View file

@ -17,19 +17,22 @@ import akka.AkkaException
object Serialization {
case class NoSerializerFoundException(m: String) extends AkkaException(m)
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
serializerFor(o.getClass).fold((ex) Left(ex), (ser) Right(ser.toBinary(o)))
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match {
case Left(ex) Left(ex)
case Right(serializer) Right(serializer.toBinary(o))
}
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
serializerFor(clazz)
.fold((ex) Left(ex),
(ser) Right(ser.fromBinary(bytes, Some(clazz), classLoader)))
serializerFor(clazz) match {
case Left(ex) Left(ex)
case Right(serializer) Right(serializer.fromBinary(bytes, Some(clazz), classLoader))
}
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = {
Config.serializerMap.get(clazz.getName) match {
serializerMap.get(clazz.getName) match {
case Some(serializerName: String)
getClassFor(serializerName) match {
case Right(serializer) Right(serializer.newInstance.asInstanceOf[Serializer])
@ -43,34 +46,40 @@ object Serialization {
}
}
private def defaultSerializer = {
Config.serializers.get("default") match {
case Some(ser: String)
getClassFor(ser) match {
case Right(srializer) Some(srializer.newInstance.asInstanceOf[Serializer])
case Left(exception) None
}
case None None
}
private def defaultSerializer = serializers.get("default") match {
case Some(ser: String)
getClassFor(ser) match {
case Right(serializer) Some(serializer.newInstance.asInstanceOf[Serializer])
case Left(exception) None
}
case None None
}
private def getSerializerInstanceForBestMatchClass(
configMap: collection.mutable.Map[String, String],
cl: Class[_]) = {
configMap
.find {
case (clazzName, ser)
getClassFor(clazzName) match {
case Right(clazz) clazz.isAssignableFrom(cl)
case _ false
}
}
.map {
case (_, ser)
getClassFor(ser) match {
case Right(s) Right(s.newInstance.asInstanceOf[Serializer])
case _ Left(new Exception("Error instantiating " + ser))
}
}.getOrElse(Left(NoSerializerFoundException("No mapping serializer found for " + cl)))
private def getSerializerInstanceForBestMatchClass(cl: Class[_]) = bindings match {
case Some(mappings) mappings find {
case (clazzName, ser)
getClassFor(clazzName) match {
case Right(clazz) clazz.isAssignableFrom(cl)
case _ false
}
} map {
case (_, ser)
getClassFor(ser) match {
case Right(s) Right(s.newInstance.asInstanceOf[Serializer])
case _ Left(new Exception("Error instantiating " + ser))
}
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
case None Left(NoSerializerFoundException("No mapping serializer found for " + cl))
}
//TODO: Add type and docs
val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer"))
//TODO: Add type and docs
val bindings = config.getSection("akka.actor.serialization-bindings")
.map(_.map)
.map(m Map() ++ m.map { case (k, v: List[String]) Map() ++ v.map((_, k)) }.flatten)
//TODO: Add type and docs
val serializerMap = bindings.map(m m.map { case (k, v: String) (k, serializers(v)) }).getOrElse(Map())
}

View file

@ -65,6 +65,14 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
leave()
}
/**
* An await does a enter/leave making this barrier a 'single' barrier instead of a double barrier.
*/
def await() {
enter
leave()
}
def enter = {
zkClient.createEphemeral(entry)
if (zkClient.countChildren(barrier) >= count)

View file

@ -576,18 +576,25 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
import RemoteServerSettings._
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
private val executor = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, serverModule)
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", RemoteServerSettings.BACKLOG)
bootstrap.setOption("child.tcpNoDelay", true)
@ -611,6 +618,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources()
executor.releaseExternalResources()
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception
@ -740,6 +748,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
@ -753,16 +762,9 @@ class RemoteServerPipelineFactory(
case "zlib" (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
}
val execution = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: authenticator ::: remoteServer :: Nil
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}

View file

@ -0,0 +1,140 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.util.duration._
import akka.util.Duration
import System.{ currentTimeMillis now }
import java.io.File
/**
 * Mixed in by the single "master" JVM of a multi-JVM cluster test.
 * It starts the shared local cluster before its tests run and, in
 * afterAll, waits for every other node to signal its exit before the
 * cluster is shut down. Coordination is file-based via the
 * ClusterTestNode companion object.
 */
trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {

  // Total number of JVMs participating in this test, including this master.
  def testNodes: Int

  override def beforeAll() = {
    Cluster.startLocalCluster()
    onReady()
    // Signal the slave nodes (via a marker file) that the cluster is up.
    ClusterTestNode.ready(getClass.getName)
  }

  // Hook for subclasses: runs after the local cluster is started but
  // before the slave nodes are released.
  def onReady() = {}

  override def afterAll() = {
    // Block until all slave nodes (testNodes - 1 of them) have exited.
    ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
    ClusterTestNode.cleanUp(getClass.getName)
    onShutdown()
    Cluster.shutdownLocalCluster()
  }

  // Hook for subclasses: runs just before the local cluster is shut down.
  def onShutdown() = {}
}
/**
 * Mixed in by the non-master JVMs of a multi-JVM cluster test: waits for
 * the master to signal that the shared local cluster is up before any
 * tests run, and leaves an exit marker afterwards so the master knows
 * when it is safe to shut the cluster down.
 */
trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {

  override def beforeAll() = {
    // Block until the master node has started the local cluster.
    ClusterTestNode.waitForReady(getClass.getName)
  }

  override def afterAll() = {
    // Leave an exit marker so the master can count finished nodes.
    ClusterTestNode.exit(getClass.getName)
  }
}
/**
 * File-based coordination between the JVMs of a multi-JVM cluster test.
 *
 * The master node creates a "ready" marker file once the local cluster is
 * up; slave nodes poll for it before running their tests. On completion
 * each slave creates an "exit" marker, and the master polls until all of
 * them exist before tearing the cluster down. All markers live under
 * "_akka_cluster/multi-jvm/<test-name>/".
 */
object ClusterTestNode {
  // Class names look like "FooMultiJvmNode1"; this marker splits the
  // test name from the node name.
  val TestMarker = "MultiJvm"
  val HomeDir = "_akka_cluster"
  val TestDir = "multi-jvm"
  val Sleep = 100.millis
  val Timeout = 1.minute

  /** Called by the master node: signals the slaves that the cluster is up. */
  def ready(className: String) = {
    println("ClusterTest: READY")
    readyFile(className).createNewFile()
  }

  /** Called by slave nodes: blocks until the master signals readiness, or fails after Timeout. */
  def waitForReady(className: String) = {
    if (!waitExists(readyFile(className))) {
      cleanUp(className)
      sys.error("Timeout waiting for cluster ready")
    }
    println("ClusterTest: GO")
  }

  /** Called by slave nodes on completion: leaves an exit marker for the master. */
  def exit(className: String) = {
    println("ClusterTest: EXIT")
    exitFile(className).createNewFile()
  }

  /** Called by the master node: blocks until `nodes` exit markers exist, or fails after Timeout. */
  def waitForExits(className: String, nodes: Int) = {
    if (!waitCount(exitDir(className), nodes)) {
      cleanUp(className)
      sys.error("Timeout waiting for node exits")
    }
    println("ClusterTest: SHUTDOWN")
  }

  /** Removes this test's coordination directory and everything in it. */
  def cleanUp(className: String) = {
    deleteRecursive(testDir(className))
  }

  /** Strips the node suffix: "FooMultiJvmNode1" -> "Foo". */
  def testName(name: String) = {
    val i = name.indexOf(TestMarker)
    if (i >= 0) name.substring(0, i) else name
  }

  /** Extracts the node suffix: "FooMultiJvmNode1" -> "Node1". */
  def nodeName(name: String) = {
    val i = name.indexOf(TestMarker)
    if (i >= 0) name.substring(i + TestMarker.length) else name
  }

  /** Directory holding all coordination files for this test; created on demand. */
  def testDir(className: String) = {
    val home = new File(HomeDir)
    val tests = new File(home, TestDir)
    val dir = new File(tests, testName(className))
    dir.mkdirs()
    dir
  }

  def readyFile(className: String) = new File(testDir(className), "ready")

  /** Directory collecting one exit marker per slave node; created on demand. */
  def exitDir(className: String) = {
    val dir = new File(testDir(className), "exit")
    dir.mkdirs()
    dir
  }

  def exitFile(className: String) = new File(exitDir(className), nodeName(className))

  def waitExists(file: File) = waitFor(file.exists)

  // Option(...) guards against File.list returning null if the directory
  // disappears between polls.
  def waitCount(file: File, n: Int) = waitFor(Option(file.list).exists(_.size >= n))

  /**
   * Polls `test` every `sleep` until it is true or `timeout` elapses;
   * returns the last observed value.
   *
   * FIX: `test` must be a by-name parameter. It was previously declared as
   * a plain `Boolean`, so the condition was evaluated exactly once at the
   * call site and the loop could never observe a change — waitExists and
   * waitCount would spin until timeout whenever the condition was
   * initially false, then report the stale result.
   */
  def waitFor(test: => Boolean, sleep: Duration = Sleep, timeout: Duration = Timeout): Boolean = {
    val limit = now + timeout.toMillis
    var passed = test
    var expired = false
    while (!passed && !expired) {
      if (now > limit) expired = true
      else {
        Thread.sleep(sleep.toMillis)
        passed = test
      }
    }
    passed
  }

  /** Recursively deletes a file or directory tree; returns the result of the final delete. */
  def deleteRecursive(file: File): Boolean = {
    if (file.isDirectory) file.listFiles.foreach(deleteRecursive)
    file.delete()
  }
}

View file

@ -18,9 +18,11 @@ object NewLeaderChangeListenerMultiJvmSpec {
var NrOfNodes = 2
}
class NewLeaderChangeListenerMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class NewLeaderChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
import NewLeaderChangeListenerMultiJvmSpec._
val testNodes = NrOfNodes
"A NewLeader change listener" must {
"be invoked after leader election is completed" in {
@ -43,17 +45,9 @@ class NewLeaderChangeListenerMultiJvmNode1 extends WordSpec with MustMatchers wi
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class NewLeaderChangeListenerMultiJvmNode2 extends WordSpec with MustMatchers {
class NewLeaderChangeListenerMultiJvmNode2 extends ClusterTestNode {
import NewLeaderChangeListenerMultiJvmSpec._
"A NewLeader change listener" must {

View file

@ -18,9 +18,11 @@ object NodeConnectedChangeListenerMultiJvmSpec {
var NrOfNodes = 2
}
class NodeConnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class NodeConnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
import NodeConnectedChangeListenerMultiJvmSpec._
val testNodes = NrOfNodes
"A NodeConnected change listener" must {
"be invoked when a new node joins the cluster" in {
@ -42,17 +44,9 @@ class NodeConnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatcher
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class NodeConnectedChangeListenerMultiJvmNode2 extends WordSpec with MustMatchers {
class NodeConnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
import NodeConnectedChangeListenerMultiJvmSpec._
"A NodeConnected change listener" must {

View file

@ -18,9 +18,11 @@ object NodeDisconnectedChangeListenerMultiJvmSpec {
var NrOfNodes = 2
}
class NodeDisconnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class NodeDisconnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
import NodeDisconnectedChangeListenerMultiJvmSpec._
val testNodes = NrOfNodes
"A NodeDisconnected change listener" must {
"be invoked when a new node leaves the cluster" in {
@ -43,17 +45,9 @@ class NodeDisconnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatc
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class NodeDisconnectedChangeListenerMultiJvmNode2 extends WordSpec with MustMatchers {
class NodeDisconnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
import NodeDisconnectedChangeListenerMultiJvmSpec._
"A NodeDisconnected change listener" must {

View file

@ -15,9 +15,11 @@ object ConfigurationStorageMultiJvmSpec {
var NrOfNodes = 2
}
class ConfigurationStorageMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class ConfigurationStorageMultiJvmNode1 extends MasterClusterTestNode {
import ConfigurationStorageMultiJvmSpec._
val testNodes = NrOfNodes
"A cluster" must {
"be able to store, read and remove custom configuration data" in {
@ -50,17 +52,9 @@ class ConfigurationStorageMultiJvmNode1 extends WordSpec with MustMatchers with
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class ConfigurationStorageMultiJvmNode2 extends WordSpec with MustMatchers {
class ConfigurationStorageMultiJvmNode2 extends ClusterTestNode {
import ConfigurationStorageMultiJvmSpec._
"A cluster" must {

View file

@ -18,9 +18,11 @@ object LeaderElectionMultiJvmSpec {
var NrOfNodes = 2
}
/*
class LeaderElectionMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class LeaderElectionMultiJvmNode1 extends MasterClusterTestNode {
import LeaderElectionMultiJvmSpec._
val testNodes = NrOfNodes
"A cluster" must {
"be able to elect a single leader in the cluster and perform re-election if leader resigns" in {
@ -39,17 +41,9 @@ class LeaderElectionMultiJvmNode1 extends WordSpec with MustMatchers with Before
}
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class LeaderElectionMultiJvmNode2 extends WordSpec with MustMatchers {
class LeaderElectionMultiJvmNode2 extends ClusterTestNode {
import LeaderElectionMultiJvmSpec._
"A cluster" must {
@ -73,4 +67,4 @@ class LeaderElectionMultiJvmNode2 extends WordSpec with MustMatchers {
}
}
}
*/
*/

View file

@ -29,9 +29,11 @@ object MigrationExplicitMultiJvmSpec {
}
}
class MigrationExplicitMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class MigrationExplicitMultiJvmNode1 extends MasterClusterTestNode {
import MigrationExplicitMultiJvmSpec._
val testNodes = NrOfNodes
"A cluster" must {
"be able to migrate an actor from one node to another" in {
@ -65,17 +67,9 @@ class MigrationExplicitMultiJvmNode1 extends WordSpec with MustMatchers with Bef
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class MigrationExplicitMultiJvmNode2 extends WordSpec with MustMatchers {
class MigrationExplicitMultiJvmNode2 extends ClusterTestNode {
import MigrationExplicitMultiJvmSpec._
"A cluster" must {

View file

@ -40,9 +40,11 @@ object RegistryStoreMultiJvmSpec {
}
}
class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode {
import RegistryStoreMultiJvmSpec._
val testNodes = NrOfNodes
"A cluster" must {
"be able to store an ActorRef in the cluster without a replication strategy and retrieve it with 'use'" in {
@ -73,17 +75,9 @@ class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeA
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
}
}
class RegistryStoreMultiJvmNode2 extends WordSpec with MustMatchers {
class RegistryStoreMultiJvmNode2 extends ClusterTestNode {
import RegistryStoreMultiJvmSpec._
"A cluster" must {

View file

@ -17,9 +17,11 @@ object DeploymentMultiJvmSpec {
var NrOfNodes = 2
}
class DeploymentMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
class DeploymentMultiJvmNode1 extends MasterClusterTestNode {
import DeploymentMultiJvmSpec._
val testNodes = NrOfNodes
"A ClusterDeployer" must {
"be able to deploy deployments in akka.conf and lookup the deployments by 'address'" in {
@ -44,18 +46,9 @@ class DeploymentMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA
node.shutdown()
}
}
override def beforeAll() = {
startLocalCluster()
}
override def afterAll() = {
shutdownLocalCluster()
// ClusterDeployer.shutdown()
}
}
class DeploymentMultiJvmNode2 extends WordSpec with MustMatchers {
class DeploymentMultiJvmNode2 extends ClusterTestNode {
import DeploymentMultiJvmSpec._
"A cluster" must {

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node4 -Dakka.cluster.port=9994

View file

@ -0,0 +1,149 @@
package akka.cluster.routing.roundrobin_failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
object RoundRobinFailoverMultiJvmSpec {

  // Number of JVMs participating in this multi-JVM test.
  val NrOfNodes = 2

  /**
   * Clustered test actor: replies with the name of the node it is running
   * on ("identify"), and can take down its own cluster node ("shutdown")
   * to force a failover.
   */
  class SomeActor extends Actor with Serializable {
    println("---------------------------------------------------------------------------")
    println("SomeActor has been created on node [" + Config.nodename + "]")
    println("---------------------------------------------------------------------------")

    def receive = {
      case "identify" {
        println("The node received the 'identify' command")
        // Reply with this node's name so the caller can tell where the
        // actor is actually running.
        self.reply(Config.nodename)
      }
      case "shutdown" {
        println("The node received the 'shutdown' command")
        // Shut down this cluster node so the actor must fail over elsewhere.
        Cluster.node.shutdown()
      }
    }
  }
}
/**
 * Master node of the round-robin failover test: hosts the local cluster
 * and creates the clustered "service-hello" actor, synchronizing with the
 * other node via named cluster barriers.
 */
class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {

  import RoundRobinFailoverMultiJvmSpec._

  // Required by MasterClusterTestNode: total participating JVMs.
  val testNodes = NrOfNodes

  "foo" must {
    "bla" in {
      println("Started Zookeeper Node")
      Cluster.node.start()

      // All nodes rendezvous before the test proper begins.
      println("Waiting to begin")
      Cluster.barrier("waiting-for-begin", NrOfNodes).await()
      println("Begin!")

      println("Getting reference to service-hello actor")
      var hello: ActorRef = null
      // Create the clustered actor inside the barrier so node 2 only
      // proceeds once the reference exists.
      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
        hello = Actor.actorOf[SomeActor]("service-hello")
      }
      println("Successfully acquired reference")

      // Wait for the other node to finish before shutting down.
      println("Waiting to end")
      Cluster.barrier("waiting-to-end", NrOfNodes).await()

      println("Shutting down ClusterNode")
      Cluster.node.shutdown()
    }
  }
}
/**
 * Second node of the round-robin failover test. The actual failover
 * assertions are currently commented out (see the block below); for now
 * this node only participates in the barrier protocol.
 */
class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {

  import RoundRobinFailoverMultiJvmSpec._

  "foo" must {
    "bla" in {
      println("Started Zookeeper Node")
      Cluster.node.start()

      // Rendezvous with node 1 before starting.
      println("Waiting to begin")
      Cluster.barrier("waiting-for-begin", NrOfNodes).await()
      println("Begin!")

      // Empty barrier body: just wait until node 1 has created the actor.
      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}

      // ============= the real testing =================
      /*
      val actor = Actor.actorOf[SomeActor]("service-hello")
      val firstTimeResult = (actor ? "identify").get
      val secondTimeResult = (actor ? "identify").get
      //since there are only 2 nodes, the identity should not have changed.
      assert(firstTimeResult == secondTimeResult)
      //if we now terminate the node that
      actor ! "shutdown"
      //todo: do some waiting
      println("Doing some sleep")
      try {
      Thread.sleep(4000) //nasty.. but ok for now.
      println("Finished doing sleep")
      } finally {
      println("Ended the Thread.sleep method somehow..")
      }
      //now we should get a different node that responds to us since there was a failover.
      val thirdTimeResult = (actor ? "identify").get
      assert(!(firstTimeResult == thirdTimeResult)) */
      // ==================================================

      // Rendezvous with node 1 before shutting down.
      println("Waiting to end")
      Cluster.barrier("waiting-to-end", NrOfNodes).await()

      println("Shutting down ClusterNode")
      Cluster.node.shutdown()
    }
  }
}
/*
class RoundRobinFailoverMultiJvmNode3 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
}
class RoundRobinFailoverMultiJvmNode4 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
} */

View file

@ -0,0 +1,6 @@
What does clustered home mean?
akka.actor.deployment.service-hello.clustered.home = "node:node1"
If a node fails, the actor should transparently be redeployed on a different node. In that sense, actors have no
fixed home — they run wherever the grid deploys them.

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "INFO"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1,4 @@
akka.event-handler-level = "INFO"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1,68 @@
package akka.cluster.routing.routing_identity_problem
import akka.config.Config
import akka.actor.{ ActorRef, Actor }
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
object RoutingIdentityProblemMultiJvmSpec {

  // Number of JVMs participating in this multi-JVM test.
  val NrOfNodes = 2

  /**
   * Clustered test actor that replies with the name of the node it is
   * running on, so callers can check where a routed message actually landed.
   */
  class SomeActor extends Actor with Serializable {
    println("---------------------------------------------------------------------------")
    println("SomeActor has been created on node [" + Config.nodename + "]")
    println("---------------------------------------------------------------------------")

    def receive = {
      case "identify" {
        println("The node received the 'identify' command: " + Config.nodename)
        self.reply(Config.nodename)
      }
    }
  }
}
/**
 * Master node of the routing-identity test: starts the cluster and creates
 * the clustered "service-hello" actor inside a barrier so node 2 can only
 * look it up once it exists.
 */
class RoutingIdentityProblemMultiJvmNode1 extends MasterClusterTestNode {

  import RoutingIdentityProblemMultiJvmSpec._

  // Required by MasterClusterTestNode: total participating JVMs.
  val testNodes = NrOfNodes

  "foo" must {
    "bla" in {
      Cluster.node.start()

      // Rendezvous with node 2 before starting.
      Cluster.barrier("waiting-for-begin", NrOfNodes).await()

      var hello: ActorRef = null
      // Create the actor inside the barrier; node 2 waits on the same barrier.
      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
        hello = Actor.actorOf[SomeActor]("service-hello")
      }

      // Rendezvous with node 2 before shutting down.
      Cluster.barrier("waiting-to-end", NrOfNodes).await()
      Cluster.node.shutdown()
    }
  }
}
/**
 * Second node of the routing-identity test: looks up the clustered
 * "service-hello" actor created by node 1 and asks it to identify itself.
 * The assertion that the reply comes from node1 is still commented out
 * because it currently fails (see the todo below).
 */
class RoutingIdentityProblemMultiJvmNode2 extends ClusterTestNode {

  import RoutingIdentityProblemMultiJvmSpec._

  "foo" must {
    "bla" in {
      Cluster.node.start()

      // Rendezvous with node 1 before starting.
      Cluster.barrier("waiting-for-begin", NrOfNodes).await()

      // Empty barrier body: just wait until node 1 has created the actor.
      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}

      val actor = Actor.actorOf[SomeActor]("service-hello")
      // Blocking ask: reply is the name of the node the actor ran on.
      val name: String = (actor ? "identify").get.asInstanceOf[String]
      //todo: Jonas: this is the line that needs to be uncommented to get the test to fail.
      //name must equal("node1")

      // Rendezvous with node 1 before shutting down.
      Cluster.barrier("waiting-to-end", NrOfNodes).await()
      Cluster.node.shutdown()
    }
  }
}

View file

@ -30,10 +30,6 @@ sealed abstract class DurableMailboxStorage(mailboxFQN: String) {
//TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc
def createFor(actor: ActorRef): AnyRef = {
EventHandler.debug(this, "Creating durable mailbox [%s] for [%s]".format(mailboxClass.getName, actor))
val ctor = mailboxClass.getDeclaredConstructor(constructorSignature: _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](actor): _*).asInstanceOf[AnyRef])
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](actor)) match {
case Right(instance) => instance
case Left(exception) =>

View file

@ -36,7 +36,6 @@ class AkkaBeansException(message: String, cause: Throwable) extends BeansExcepti
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@BeanProperty
var id: String = ""
@ -242,7 +241,6 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with ApplicationConte
* @author michaelkober
*/
class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@BeanProperty

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.spring
object StringReflect {

  /** Implicit conversion so any String gains the StringReflect helpers. */
  implicit def string2StringReflect(s: String): StringReflect = new StringReflect(s)
}
/**
 * Reflection helper: wraps a fully-qualified class name and loads the
 * corresponding Class object on demand.
 * @author michaelkober
 */
class StringReflect(val self: String) {
  if ((self eq null) || self == "") throw new IllegalArgumentException("Class name can't be null or empty string [" + self + "]")

  /** Loads the class named by `self` via Class.forName and casts it to Class[T]. */
  def toClass[T <: AnyRef]: Class[T] = Class.forName(self).asInstanceOf[Class[T]]
}

View file

@ -71,7 +71,6 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
}
private def parseTrapExits(element: Element): Array[Class[_ <: Throwable]] = {
import StringReflect._
val trapExits = DomUtils.getChildElementsByTagName(element, TRAP_EXIT_TAG).toArray.toList.asInstanceOf[List[Element]]
trapExits.map(DomUtils.getTextValue(_).toClass.asInstanceOf[Class[_ <: Throwable]]).toArray
}

View file

@ -55,7 +55,6 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
* Create configuration for TypedActor
*/
private[akka] def createComponent(props: ActorProperties): SuperviseTypedActor = {
import StringReflect._
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val withInterface = (props.interface ne null) && (!props.interface.isEmpty)
@ -80,7 +79,6 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
* Create configuration for UntypedActor
*/
private[akka] def createSupervise(props: ActorProperties): Server = {
import StringReflect._
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val actorRef = Actor.actorOf(props.target.toClass)