completed Guice API for ActiveObjects
This commit is contained in:
parent
42cc47bf6a
commit
a2e3406064
8 changed files with 551 additions and 251 deletions
|
|
@ -1,115 +1,113 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package com.scalablesolutions.akka.kernel
|
||||
|
||||
import scala.actors.behavior._
|
||||
|
||||
import java.util.{List => JList, ArrayList}
|
||||
|
||||
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
|
||||
import java.lang.annotation.Annotation
|
||||
|
||||
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
|
||||
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActiveObject {
|
||||
|
||||
def newInstance[T](intf: Class[T] forSome {type T}, target: AnyRef, timeout: Int): T = {
|
||||
val proxy = new ActiveObjectProxy(target, timeout)
|
||||
supervise(proxy)
|
||||
newInstance(intf, proxy)
|
||||
}
|
||||
|
||||
def newInstance[T](intf: Class[T] forSome {type T}, proxy: ActiveObjectProxy): T = {
|
||||
Proxy.newProxyInstance(
|
||||
proxy.target.getClass.getClassLoader,
|
||||
Array(intf),
|
||||
proxy).asInstanceOf[T]
|
||||
}
|
||||
|
||||
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
|
||||
object factory extends SupervisorFactory {
|
||||
override def getSupervisorConfig: SupervisorConfig = {
|
||||
SupervisorConfig(restartStrategy, components)
|
||||
}
|
||||
}
|
||||
val supervisor = factory.newSupervisor
|
||||
supervisor ! scala.actors.behavior.Start
|
||||
supervisor
|
||||
}
|
||||
|
||||
private def supervise(proxy: ActiveObjectProxy): Supervisor =
|
||||
supervise(
|
||||
RestartStrategy(OneForOne, 5, 1000),
|
||||
Worker(
|
||||
proxy.server,
|
||||
LifeCycle(Permanent, 100))
|
||||
:: Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ActiveObjectProxy(val target: AnyRef, val timeout: Int) extends InvocationHandler {
|
||||
private val oneway = classOf[scala.actors.annotation.oneway]
|
||||
|
||||
private[ActiveObjectProxy] object dispatcher extends GenericServer {
|
||||
override def body: PartialFunction[Any, Unit] = {
|
||||
case invocation: Invocation =>
|
||||
try {
|
||||
reply(ErrRef(invocation.invoke))
|
||||
} catch {
|
||||
case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
|
||||
case e => reply(ErrRef({ throw e }))
|
||||
}
|
||||
case 'exit => exit; reply()
|
||||
case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] val server = new GenericServerContainer(target.getClass.getName, () => dispatcher)
|
||||
server.setTimeout(timeout)
|
||||
|
||||
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, target))
|
||||
|
||||
def invoke(invocation: Invocation): AnyRef = {
|
||||
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
|
||||
else {
|
||||
val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
|
||||
result()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a snapshot of the current invocation.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) {
|
||||
def invoke: AnyRef = method.invoke(target, args: _*)
|
||||
|
||||
override def toString: String = "Invocation [method: " + method.getName + ", args: " + args + ", target: " + target + "]"
|
||||
|
||||
override def hashCode(): Int = {
|
||||
var result = HashCode.SEED
|
||||
result = HashCode.hash(result, method)
|
||||
result = HashCode.hash(result, args)
|
||||
result = HashCode.hash(result, target)
|
||||
result
|
||||
}
|
||||
|
||||
override def equals(that: Any): Boolean = {
|
||||
that != null &&
|
||||
that.isInstanceOf[Invocation] &&
|
||||
that.asInstanceOf[Invocation].method == method &&
|
||||
that.asInstanceOf[Invocation].args == args
|
||||
that.asInstanceOf[Invocation].target == target
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package com.scalablesolutions.akka.kernel
|
||||
|
||||
import scala.actors.behavior._
|
||||
|
||||
import java.util.{List => JList, ArrayList}
|
||||
|
||||
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
|
||||
import java.lang.annotation.Annotation
|
||||
|
||||
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
|
||||
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ActiveObjectFactory {
|
||||
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = ActiveObject.newInstance(intf, proxy)
|
||||
|
||||
def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor =
|
||||
ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActiveObject {
|
||||
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
|
||||
Proxy.newProxyInstance(
|
||||
intf.getClassLoader,
|
||||
Array(intf),
|
||||
proxy).asInstanceOf[T]
|
||||
}
|
||||
|
||||
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
|
||||
object factory extends SupervisorFactory {
|
||||
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
|
||||
}
|
||||
val supervisor = factory.newSupervisor
|
||||
supervisor ! scala.actors.behavior.Start
|
||||
supervisor
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
|
||||
private val oneway = classOf[scala.actors.annotation.oneway]
|
||||
private var targetInstance: AnyRef = _
|
||||
private[akka] def setTargetInstance(instance: AnyRef) = targetInstance = instance
|
||||
|
||||
private[ActiveObjectProxy] object dispatcher extends GenericServer {
|
||||
override def body: PartialFunction[Any, Unit] = {
|
||||
case invocation: Invocation =>
|
||||
try {
|
||||
reply(ErrRef(invocation.invoke))
|
||||
} catch {
|
||||
case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
|
||||
case e => reply(ErrRef({ throw e }))
|
||||
}
|
||||
case 'exit => exit; reply()
|
||||
case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] val server = new GenericServerContainer(target.getName, () => dispatcher)
|
||||
server.setTimeout(timeout)
|
||||
|
||||
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, targetInstance))
|
||||
|
||||
def invoke(invocation: Invocation): AnyRef = {
|
||||
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
|
||||
else {
|
||||
val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
|
||||
result()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a snapshot of the current invocation.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) {
|
||||
method.setAccessible(true);
|
||||
|
||||
def invoke: AnyRef = method.invoke(target, args: _*)
|
||||
|
||||
override def toString: String = "Invocation[method: " + method.getName + ", args: " + args + ", target: " + target + "]"
|
||||
|
||||
override def hashCode(): Int = {
|
||||
var result = HashCode.SEED
|
||||
result = HashCode.hash(result, method)
|
||||
result = HashCode.hash(result, args)
|
||||
result = HashCode.hash(result, target)
|
||||
result
|
||||
}
|
||||
|
||||
override def equals(that: Any): Boolean = {
|
||||
that != null &&
|
||||
that.isInstanceOf[Invocation] &&
|
||||
that.asInstanceOf[Invocation].method == method &&
|
||||
that.asInstanceOf[Invocation].args == args
|
||||
that.asInstanceOf[Invocation].target == target
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,91 +1,105 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package com.scalablesolutions.akka.api
|
||||
|
||||
import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
|
||||
|
||||
import java.util.{List => JList}
|
||||
|
||||
import scala.actors.behavior._
|
||||
import scala.reflect.BeanProperty
|
||||
|
||||
sealed abstract class Configuration
|
||||
|
||||
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
|
||||
def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
|
||||
}
|
||||
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
|
||||
def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
|
||||
}
|
||||
|
||||
abstract class Scope extends Configuration {
|
||||
def transform: scala.actors.behavior.Scope
|
||||
}
|
||||
class Permanent extends Scope {
|
||||
override def transform = scala.actors.behavior.Permanent
|
||||
}
|
||||
class Transient extends Scope {
|
||||
override def transform = scala.actors.behavior.Transient
|
||||
}
|
||||
class Temporary extends Scope {
|
||||
override def transform = scala.actors.behavior.Temporary
|
||||
}
|
||||
|
||||
abstract class FailOverScheme extends Configuration {
|
||||
def transform: scala.actors.behavior.FailOverScheme
|
||||
}
|
||||
class AllForOne extends FailOverScheme {
|
||||
override def transform = scala.actors.behavior.AllForOne
|
||||
}
|
||||
class OneForOne extends FailOverScheme {
|
||||
override def transform = scala.actors.behavior.OneForOne
|
||||
}
|
||||
|
||||
abstract class Server extends Configuration
|
||||
class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
|
||||
def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Component]].map(_.transform))
|
||||
}
|
||||
class Component(@BeanProperty val proxy: ActiveObjectProxy, @BeanProperty val lifeCycle: LifeCycle) extends Server {
|
||||
def transform = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform)
|
||||
}
|
||||
|
||||
|
||||
object Configuration {
|
||||
import com.google.inject.{AbstractModule, CreationException, Guice, Injector, Provides, Singleton}
|
||||
import com.google.inject.spi.CloseFailedException
|
||||
import com.google.inject.jsr250.{ResourceProviderFactory}
|
||||
import javax.annotation.Resource
|
||||
import javax.naming.Context
|
||||
|
||||
def supervise(restartStrategy: RestartStrategy, components: JList[Component]): Supervisor = {
|
||||
val componentList = components.toArray.toList.asInstanceOf[List[Component]]
|
||||
|
||||
val injector = Guice.createInjector(new AbstractModule {
|
||||
protected def configure = {
|
||||
bind(classOf[ResourceProviderFactory[_]])
|
||||
componentList.foreach(c => bind(c.getClass).in(classOf[Singleton]))
|
||||
}
|
||||
|
||||
// @Provides
|
||||
// def createJndiContext: Context = {
|
||||
// val answer = new JndiContext
|
||||
// answer.bind("foo", new AnotherBean("Foo"))
|
||||
// answer.bind("xyz", new AnotherBean("XYZ"))
|
||||
// answer
|
||||
// }
|
||||
})
|
||||
|
||||
val injectedComponents = componentList.map(c => injector.getInstance(c.getClass))
|
||||
|
||||
// TODO: swap 'target' in proxy before running supervise
|
||||
|
||||
ActiveObject.supervise(
|
||||
restartStrategy.transform,
|
||||
componentList.map(c => scala.actors.behavior.Worker(c.proxy.server, c.lifeCycle.transform)))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package com.scalablesolutions.akka.api
|
||||
|
||||
import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
|
||||
import google.inject.{AbstractModule}
|
||||
|
||||
import java.util.{List => JList, ArrayList}
|
||||
|
||||
import scala.actors.behavior._
|
||||
import scala.reflect.BeanProperty
|
||||
|
||||
// ============================================
|
||||
// Java version of the configuration API
|
||||
|
||||
sealed abstract class Configuration
|
||||
|
||||
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
|
||||
def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
|
||||
}
|
||||
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
|
||||
def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
|
||||
}
|
||||
|
||||
abstract class Scope extends Configuration {
|
||||
def transform: scala.actors.behavior.Scope
|
||||
}
|
||||
class Permanent extends Scope {
|
||||
override def transform = scala.actors.behavior.Permanent
|
||||
}
|
||||
class Transient extends Scope {
|
||||
override def transform = scala.actors.behavior.Transient
|
||||
}
|
||||
class Temporary extends Scope {
|
||||
override def transform = scala.actors.behavior.Temporary
|
||||
}
|
||||
|
||||
abstract class FailOverScheme extends Configuration {
|
||||
def transform: scala.actors.behavior.FailOverScheme
|
||||
}
|
||||
class AllForOne extends FailOverScheme {
|
||||
override def transform = scala.actors.behavior.AllForOne
|
||||
}
|
||||
class OneForOne extends FailOverScheme {
|
||||
override def transform = scala.actors.behavior.OneForOne
|
||||
}
|
||||
|
||||
abstract class Server extends Configuration
|
||||
//class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
|
||||
// def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
|
||||
//}
|
||||
class Component(@BeanProperty val intf: Class[_],
|
||||
@BeanProperty val target: Class[_],
|
||||
@BeanProperty val lifeCycle: LifeCycle,
|
||||
@BeanProperty val timeout: Int) extends Server {
|
||||
def newWorker(proxy: ActiveObjectProxy) = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform)
|
||||
}
|
||||
|
||||
|
||||
// ============================================
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
//object Configuration {
|
||||
// import com.google.inject.{Module, AbstractModule, CreationException, Guice, Injector, Provides, Singleton, Binder}
|
||||
// import com.google.inject.jsr250.{ResourceProviderFactory}
|
||||
//
|
||||
// private val modules = new ArrayList[Module]
|
||||
//
|
||||
// def addModule(module: Module) = modules.add(module)
|
||||
//
|
||||
// def supervise(restartStrategy: RestartStrategy, components: Array[Component]): Supervisor = {
|
||||
// val componentList = components.toList.asInstanceOf[List[Component]]
|
||||
//
|
||||
// object defaultModule extends AbstractModule {
|
||||
// protected def configure {
|
||||
// bind(classOf[ResourceProviderFactory[_]])
|
||||
// //componentList.foreach(c => bind(c.proxy.intf.asInstanceOf[Class[_]]).to(c.proxy.target.getClass.asInstanceOf[Class[_]]).in(classOf[Singleton]))
|
||||
// }
|
||||
//
|
||||
// // @Provides
|
||||
// // def createJndiContext: Context = {
|
||||
// // val answer = new JndiContext
|
||||
// // answer.bind("foo", new AnotherBean("Foo"))
|
||||
// // answer.bind("xyz", new AnotherBean("XYZ"))
|
||||
// // answer
|
||||
// // }
|
||||
// }
|
||||
// modules.add(defaultModule)
|
||||
// val injector = Guice.createInjector(modules)
|
||||
//
|
||||
// // swap 'target' in proxy before running supervise
|
||||
// // componentList.foreach(c => c.proxy.target = injector.getInstance(c.proxy.targetClass))
|
||||
//
|
||||
// ActiveObject.supervise(
|
||||
// restartStrategy.transform,
|
||||
// componentList.map(c => scala.actors.behavior.Worker(c.proxy.server, c.lifeCycle.transform)))
|
||||
//
|
||||
// }
|
||||
//}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue