added Java compat API for defining up ActiveObjects

This commit is contained in:
Jonas Boner 2009-02-16 19:50:50 +01:00
parent 0a31ad7188
commit d01a293fa7
10 changed files with 682 additions and 122 deletions

View file

@ -0,0 +1,115 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import scala.actors.behavior._
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObject {
def newInstance[T](intf: Class[T] forSome {type T}, target: AnyRef, timeout: Int): T = {
val proxy = new ActiveObjectProxy(target, timeout)
supervise(proxy)
newInstance(intf, proxy)
}
def newInstance[T](intf: Class[T] forSome {type T}, proxy: ActiveObjectProxy): T = {
Proxy.newProxyInstance(
proxy.target.getClass.getClassLoader,
Array(intf),
proxy).asInstanceOf[T]
}
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(restartStrategy, components)
}
}
val supervisor = factory.newSupervisor
supervisor ! scala.actors.behavior.Start
supervisor
}
private def supervise(proxy: ActiveObjectProxy): Supervisor =
supervise(
RestartStrategy(OneForOne, 5, 1000),
Worker(
proxy.server,
LifeCycle(Permanent, 100))
:: Nil)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProxy(val target: AnyRef, val timeout: Int) extends InvocationHandler {
private val oneway = classOf[scala.actors.annotation.oneway]
private[ActiveObjectProxy] object dispatcher extends GenericServer {
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
try {
reply(ErrRef(invocation.invoke))
} catch {
case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
case e => reply(ErrRef({ throw e }))
}
case 'exit => exit; reply()
case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
}
}
private[kernel] val server = new GenericServerContainer(target.getClass.getName, () => dispatcher)
server.setTimeout(timeout)
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, target))
def invoke(invocation: Invocation): AnyRef = {
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
else {
val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
result()
}
}
}
/**
* Represents a snapshot of the current invocation.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) {
def invoke: AnyRef = method.invoke(target, args: _*)
override def toString: String = "Invocation [method: " + method.getName + ", args: " + args + ", target: " + target + "]"
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, method)
result = HashCode.hash(result, args)
result = HashCode.hash(result, target)
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Invocation] &&
that.asInstanceOf[Invocation].method == method &&
that.asInstanceOf[Invocation].args == args
that.asInstanceOf[Invocation].target == target
}
}

View file

@ -0,0 +1,157 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.api
import com.scalablesolutions.akka.kernel.ActiveObject
import java.util.{List => JList}
import scala.actors.behavior._
import scala.reflect.BeanProperty
sealed abstract class Configuration
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
}
abstract class Scope extends Configuration {
def transform: scala.actors.behavior.Scope
}
class Permanent extends Scope {
override def transform = scala.actors.behavior.Permanent
}
class Transient extends Scope {
override def transform = scala.actors.behavior.Transient
}
class Temporary extends Scope {
override def transform = scala.actors.behavior.Temporary
}
abstract class FailOverScheme extends Configuration {
def transform: scala.actors.behavior.FailOverScheme
}
class AllForOne extends FailOverScheme {
override def transform = scala.actors.behavior.AllForOne
}
class OneForOne extends FailOverScheme {
override def transform = scala.actors.behavior.OneForOne
}
abstract class Server extends Configuration
class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Component]].map(_.transform))
}
class Component(@BeanProperty val serverContainer: GenericServerContainer, @BeanProperty val lifeCycle: LifeCycle) extends Server {
def transform = scala.actors.behavior.Worker(serverContainer, lifeCycle.transform)
}
object Configuration {
def supervise(restartStrategy: RestartStrategy, components: JList[Component]): Supervisor =
ActiveObject.supervise(
restartStrategy.transform,
components.toArray.toList.asInstanceOf[List[Component]].map(
c => scala.actors.behavior.Worker(c.serverContainer, c.lifeCycle.transform)))
}
// static class SupervisorConfig extends Server {
// private final RestartStrategy restartStrategy;
// private final List<Server> servers;
// public SupervisorConfig(RestartStrategy restartStrategy, List<Server> servers) {
// this.restartStrategy = restartStrategy;
// this.servers = servers;
// }
// public RestartStrategy getRestartStrategy() {
// return restartStrategy;
// }
// public List<Server> getServer() {
// return servers;
// }
// public scala.actors.behavior.SupervisorConfig scalaVersion() {
// List<scala.actors.behavior.Server> ss = new ArrayList<scala.actors.behavior.Server>();
// for (Server s: servers) {
// ss.add(s.scalaVersion());
// }
// return new scala.actors.behavior.SupervisorConfig(restartStrategy.scalaVersion(), ss);
// }
// }
// static class Component extends Server {
// private final GenericServerContainer serverContainer;
// private final LifeCycle lifeCycle;
// public Component(GenericServerContainer serverContainer, LifeCycle lifeCycle) {
// this.serverContainer = serverContainer;
// this.lifeCycle = lifeCycle;
// }
// public GenericServerContainer getServerContainer() {
// return serverContainer;
// }
// public LifeCycle getLifeCycle() {
// return lifeCycle;
// }
// public scala.actors.behavior.Server scalaVersion() {
// return new scala.actors.behavior.Worker(serverContainer, lifeCycle.scalaVersion());
// }
// }
// static class RestartStrategy extends Configuration {
// private final FailOverScheme scheme;
// private final int maxNrOfRetries;
// private final int withinTimeRange;
// public RestartStrategy(FailOverScheme scheme, int maxNrOfRetries, int withinTimeRange) {
// this.scheme = scheme;
// this.maxNrOfRetries = maxNrOfRetries;
// this.withinTimeRange = withinTimeRange;
// }
// public FailOverScheme getFailOverScheme() {
// return scheme;
// }
// public int getMaxNrOfRetries() {
// return maxNrOfRetries;
// }
// public int getWithinTimeRange() {
// return withinTimeRange;
// }
// public scala.actors.behavior.RestartStrategy scalaVersion() {
// scala.actors.behavior.FailOverScheme fos;
// switch (scheme) {
// case AllForOne: fos = new scala.actors.behavior.AllForOne(); break;
// case OneForOne: fos = new scala.actors.behavior.OneForOne(); break;
// }
// return new scala.actors.behavior.RestartStrategy(fos, maxNrOfRetries, withinTimeRange);
// }
// }
// static class LifeCycle extends Configuration {
// private final Scope scope;
// private final int shutdownTime;
// public LifeCycle(Scope scope, int shutdownTime) {
// this.scope = scope;
// this.shutdownTime = shutdownTime;
// }
// public Scope getScope() {
// return scope;
// }
// public int getShutdownTime() {
// return shutdownTime;
// }
// public scala.actors.behavior.LifeCycle scalaVersion() {
// scala.actors.behavior.Scope s;
// switch (scope) {
// case Permanent: s = new scala.actors.behavior.Permanent(); break;
// case Transient: s = new scala.actors.behavior.Transient(); break;
// case Temporary: s = new scala.actors.behavior.Temporary(); break;
// }
// return new scala.actors.behavior.LifeCycle(s, shutdownTime);
// }
// }
// }

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
/**
* Reference that can hold either a typed value or an exception.
*
* Usage:
* <pre>
* scala> ErrRef(1)
* res0: ErrRef[Int] = ErrRef@a96606
*
* scala> res0()
* res1: Int = 1
*
* scala> res0() = 3
*
* scala> res0()
* res3: Int = 3
*
* scala> res0() = { println("Hello world"); 3}
* Hello world
*
* scala> res0()
* res5: Int = 3
*
* scala> res0() = error("Lets see what happens here...")
*
* scala> res0()
* java.lang.RuntimeException: Lets see what happens here...
* at ErrRef.apply(RefExcept.scala:11)
* at .<init>(<console>:6)
* at .<clinit>(<console>)
* at Re...
* </pre>
*/
class ErrRef[S](s: S){
private[this] var contents: Either[Throwable, S] = Right(s)
def update(value: => S) = contents = try { Right(value) } catch { case (e : Throwable) => Left(e) }
def apply() = contents match {
case Right(s) => s
case Left(e) => throw e.fillInStackTrace
}
override def toString(): String = "ErrRef[" + contents + "]"
}
object ErrRef {
def apply[S](s: S) = new ErrRef(s)
}

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import java.lang.reflect.{Array => JArray}
import java.lang.{Float => JFloat, Double => JDouble}
/**
* Set of methods which allow easy implementation of <code>hashCode</code>.
*
* Example:
* <pre>
* override def hashCode: Int = {
* var result = HashCode.SEED
* //collect the contributions of various fields
* result = HashCode.hash(result, fPrimitive)
* result = HashCode.hash(result, fObject)
* result = HashCode.hash(result, fArray)
* result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object HashCode {
val SEED = 23
def hash(seed: Int, value: Boolean): Int = firstTerm(seed) + (if (value) 1 else 0)
def hash(seed: Int, value: Char): Int = firstTerm(seed) + value.asInstanceOf[Int]
def hash(seed: Int, value: Int): Int = firstTerm(seed) + value
def hash(seed: Int, value: Long): Int = firstTerm(seed) + (value ^ (value >>> 32) ).asInstanceOf[Int]
def hash(seed: Int, value: Float): Int = hash(seed, JFloat.floatToIntBits(value))
def hash(seed: Int, value: Double): Int = hash(seed, JDouble.doubleToLongBits(value))
def hash(seed: Int, anyRef: AnyRef): Int = {
var result = seed
if (anyRef == null) result = hash(result, 0)
else if (!isArray(anyRef)) result = hash(result, anyRef.hashCode())
else for (id <- 0 until JArray.getLength(anyRef)) result = hash(result, JArray.get(anyRef, id)) // is an array
result
}
private def firstTerm(seed: Int): Int = PRIME * seed
private def isArray(anyRef: AnyRef): Boolean = anyRef.getClass.isArray
private val PRIME = 37
}

View file

@ -4,3 +4,5 @@
package com.scalablesolutions.akka.kernel
object Kernel {
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import scala.actors.behavior._
import scala.actors.annotation.oneway
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
// class ActiveObjectSuite extends TestNGSuite {
// private var messageLog = ""
// trait Foo {
// def foo(msg: String): String
// @oneway def bar(msg: String)
// def longRunning
// def throwsException
// }
// class FooImpl extends Foo {
// val bar: Bar = new BarImpl
// def foo(msg: String): String = {
// messageLog += msg
// "return_foo "
// }
// def bar(msg: String) = bar.bar(msg)
// def longRunning = Thread.sleep(10000)
// def throwsException = error("expected")
// }
// trait Bar {
// @oneway def bar(msg: String)
// }
// class BarImpl extends Bar {
// def bar(msg: String) = {
// Thread.sleep(100)
// messageLog += msg
// }
// }
// @BeforeMethod
// def setup = messageLog = ""
// @Test { val groups=Array("unit") }
// def testCreateGenericServerBasedComponentUsingDefaultSupervisor = {
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
// val result = foo.foo("foo ")
// messageLog += result
// foo.bar("bar ")
// messageLog += "before_bar "
// Thread.sleep(500)
// assert(messageLog === "foo return_foo before_bar bar ")
// }
// @Test { val groups=Array("unit") }
// def testCreateGenericServerBasedComponentUsingCustomSupervisorConfiguration = {
// val proxy = new ActiveObjectProxy(new FooImpl, 1000)
// val supervisor =
// ActiveObject.supervise(
// RestartStrategy(AllForOne, 3, 100),
// Component(
// proxy,
// LifeCycle(Permanent, 100))
// :: Nil)
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], proxy)
// val result = foo.foo("foo ")
// messageLog += result
// foo.bar("bar ")
// messageLog += "before_bar "
// Thread.sleep(500)
// assert(messageLog === "foo return_foo before_bar bar ")
// supervisor ! Stop
// }
// @Test { val groups=Array("unit") }
// def testCreateTwoGenericServerBasedComponentUsingCustomSupervisorConfiguration = {
// val fooProxy = new ActiveObjectProxy(new FooImpl, 1000)
// val barProxy = new ActiveObjectProxy(new BarImpl, 1000)
// val supervisor =
// ActiveObject.supervise(
// RestartStrategy(AllForOne, 3, 100),
// Component(
// fooProxy,
// LifeCycle(Permanent, 100)) ::
// Component(
// barProxy,
// LifeCycle(Permanent, 100))
// :: Nil)
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], fooProxy)
// val bar = ActiveObject.newInstance[Bar](classOf[Bar], barProxy)
// val result = foo.foo("foo ")
// messageLog += result
// bar.bar("bar ")
// messageLog += "before_bar "
// Thread.sleep(500)
// assert(messageLog === "foo return_foo before_bar bar ")
// supervisor ! Stop
// }
// @Test { val groups=Array("unit") }
// def testCreateGenericServerBasedComponentUsingDefaultSupervisorAndForcedTimeout = {
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
// intercept(classOf[ActiveObjectInvocationTimeoutException]) {
// foo.longRunning
// }
// assert(true === true)
// }
// @Test { val groups=Array("unit") }
// def testCreateGenericServerBasedComponentUsingDefaultSupervisorAndForcedException = {
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 10000)
// intercept(classOf[RuntimeException]) {
// foo.throwsException
// }
// assert(true === true)
// }
// }