Merge branch 'master' into osgi

This commit is contained in:
Roman Roelofsen 2010-06-24 15:49:52 +02:00
commit c2f8d20ed2
43 changed files with 2200 additions and 1394 deletions

View file

@ -40,7 +40,13 @@
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-core_2.8.0.RC3</artifactId>
<version>0.9</version>
<version>0.9.1</version>
<exclusions>
<exclusion>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
@ -54,6 +60,12 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
<version>0.6-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View file

@ -0,0 +1,35 @@
package se.scalablesolutions.akka.api;
import static se.scalablesolutions.akka.actor.ActiveObject.link;
import static se.scalablesolutions.akka.actor.ActiveObject.newInstance;
import org.junit.Assert;
import org.junit.Test;
import se.scalablesolutions.akka.config.OneForOneStrategy;
import junit.framework.TestCase;
/**
* <p>Small misc tests that do not fit anywhere else and does not require a separate testcase</p>
*
* @author johanrask
*
*/
public class MiscActiveObjectTest extends TestCase {
/**
* Verifies that both preRestart and postRestart methods are invoked when
* an actor is restarted
*/
public void testFailingPostRestartInvocation() throws InterruptedException {
SimpleJavaPojo pojo = newInstance(SimpleJavaPojo.class,500);
SimpleJavaPojo supervisor = newInstance(SimpleJavaPojo.class,500);
link(supervisor,pojo,new OneForOneStrategy(3, 2000),new Class[]{Throwable.class});
pojo.throwException();
Thread.sleep(500);
Assert.assertTrue(pojo.pre);
Assert.assertTrue(pojo.post);
}
}

View file

@ -38,7 +38,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
@ -51,7 +51,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@ -59,10 +59,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
@ -72,7 +72,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@ -80,10 +80,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");

View file

@ -0,0 +1,36 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
public class SimpleJavaPojo {
public boolean pre = false;
public boolean post = false;
private String name;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@prerestart
public void pre() {
System.out.println("** pre()");
pre = true;
}
@postrestart
public void post() {
System.out.println("** post()");
post = true;
}
public void throwException() {
throw new RuntimeException();
}
}

View file

@ -31,9 +31,9 @@ message SerializedActorRefProtocol {
required string uuid = 1;
required string id = 2;
required string actorClassname = 3;
required bytes actorInstance = 4;
required string serializerClassname = 5;
required AddressProtocol originalAddress = 6;
required AddressProtocol originalAddress = 4;
optional bytes actorInstance = 5;
optional string serializerClassname = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;

View file

@ -355,9 +355,10 @@ object ActiveObject extends Logging {
}
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, false)
val proxy = Proxy.newInstance(target, true, false)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@ -368,8 +369,9 @@ object ActiveObject extends Logging {
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@ -462,7 +464,7 @@ object ActiveObject extends Logging {
val parent = clazz.getSuperclass
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
log.trace(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None
@ -646,6 +648,7 @@ private[akka] sealed class ActiveObjectAspect {
object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
var crashedActorTl:ThreadLocal[Dispatcher] = new ThreadLocal();
}
/**
@ -653,7 +656,7 @@ object Dispatcher {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Option[RestartCallbacks]) extends Actor {
private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor {
import Dispatcher._
private[actor] var target: Option[AnyRef] = None
@ -661,13 +664,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
private var targetClass:Class[_] = _
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
this.targetClass = targetClass
target = Some(targetInstance)
context = ctx
val methods = targetInstance.getClass.getDeclaredMethods.toList
@ -733,22 +741,43 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
override def preRestart(reason: Throwable) {
try {
// Since preRestart is called we know that this dispatcher
// is about to be restarted. Put the instance in a thread
// local so the new dispatcher can be initialized with the contents of the
// old.
//FIXME - This should be considered as a workaround.
crashedActorTl.set(this)
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def postRestart(reason: Throwable) {
try {
if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
if (postRestart.isDefined) {
postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def init = {
// Get the crashed dispatcher from thread local and intitialize this actor with the
// contents of the old dispatcher
val oldActor = crashedActorTl.get();
if(oldActor != null) {
initialize(oldActor.targetClass,oldActor.target.get,oldActor.context)
crashedActorTl.set(null)
}
}
override def initTransactionalState = {
try {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
} catch { case e: InvocationTargetException => throw e.getCause }
}
private def serializeArguments(joinPoint: JoinPoint) = {
val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
var unserializable = false

View file

@ -36,26 +36,50 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol.
* Base trait defining a serializable actor.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait SerializableActor extends Actor {
trait SerializableActor extends Actor
/**
* Base trait defining a stateless serializable actor.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatelessSerializableActor extends SerializableActor
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol. This actor <b>is</b> the serialized state.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatefulSerializerSerializableActor extends SerializableActor {
val serializer: Serializer
def toBinary: Array[Byte]
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol. This actor <b>is wrapping</b> serializable state.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatefulWrappedSerializableActor extends SerializableActor {
def toBinary: Array[Byte]
def fromBinary(bytes: Array[Byte])
}
/**
* Mix in this trait to create a serializable actor, serializable through
* Protobuf.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
val serializer = Serializer.Protobuf
trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor {
def toBinary: Array[Byte] = toProtobuf.toByteArray
def fromBinary(bytes: Array[Byte]) = fromProtobuf(serializer.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
val clazz: Class[T]
def toProtobuf: T
@ -68,7 +92,7 @@ trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaSerializableActor extends SerializableActor {
trait JavaSerializableActor extends StatefulSerializerSerializableActor {
@transient val serializer = Serializer.Java
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -79,7 +103,7 @@ trait JavaSerializableActor extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaJSONSerializableActor extends SerializableActor {
trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.JavaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -90,7 +114,7 @@ trait JavaJSONSerializableActor extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ScalaJSONSerializableActor extends SerializableActor {
trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.ScalaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}

View file

@ -37,7 +37,7 @@ import com.google.protobuf.ByteString
* <p/>
* Binary -> ActorRef:
* <pre>
* val actorRef = ActorRef.fromBinary(bytes)
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes)
* actorRef ! message // send message to remote actor through its reference
* </pre>
*
@ -74,7 +74,8 @@ object ActorRef {
/**
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassname,
@ -82,6 +83,7 @@ object ActorRef {
protocol.getHomeAddress.getPort,
protocol.getTimeout,
loader)
}
/**
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
@ -99,11 +101,15 @@ object ActorRef {
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
*/
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
val serializer = if (protocol.hasSerializerClassname) {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
Some(serializerClass.newInstance.asInstanceOf[Serializer])
} else None
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
@ -120,8 +126,9 @@ object ActorRef {
if (protocol.hasSupervisor)
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (protocol.hasHotswapStack) Some(serializer
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
@ -349,10 +356,12 @@ trait ActorRef extends TransactionManagement {
/**
* Returns the 'Serializer' instance for the Actor as an Option.
* <p/>
* It returns 'Some(serializer)' if the Actor is serializable and 'None' if not.
* It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor
* trait (which has a Serializer defined) and 'None' if not.
*/
def serializer: Option[Serializer] =
if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer)
if (actor.isInstanceOf[StatefulSerializerSerializableActor])
Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer)
else None
/**
@ -710,15 +719,25 @@ sealed class LocalActorRef private[akka](
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader,
__serializer: Serializer) = {
__serializer: Option[Serializer]) = {
this(() => {
val actorClass = __loader.loadClass(__actorClassName)
val actorInstance = actorClass.newInstance
if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) {
val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]]
if (actorInstance.isInstanceOf[StatelessSerializableActor]) {
actorInstance.asInstanceOf[Actor]
} else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) {
__serializer.getOrElse(throw new IllegalStateException(
"No serializer defined for SerializableActor [" + actorClass.getName + "]"))
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
} else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) {
val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor]
instance.fromBinary(__actorBytes)
instance
} else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
} else throw new IllegalStateException(
"Can't deserialize Actor that is not an instance of one of:\n" +
"\n\t- StatelessSerializableActor" +
"\n\t- StatefulSerializerSerializableActor" +
"\n\t- StatefulWrappedSerializableActor")
})
loader = Some(__loader)
isDeserialized = true
@ -777,7 +796,8 @@ sealed class LocalActorRef private[akka](
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
if (!isSerializable) throw new IllegalStateException(
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
"Can't serialize an ActorRef using SerializedActorRefProtocol" +
"\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
@ -798,39 +818,43 @@ sealed class LocalActorRef private[akka](
}
}
val serializerClassname = serializer
.getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined"))
.getClass.getName
val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build
val originalAddress = AddressProtocol.newBuilder
.setHostname(homeAddress.getHostName)
.setPort(homeAddress.getPort)
.build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(uuid)
.setId(id)
.setActorClassname(actorClass.getName)
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary))
.setSerializerClassname(serializerClassname)
.setOriginalAddress(originalAddress)
.setIsTransactor(isTransactor)
.setTimeout(timeout)
if (actor.isInstanceOf[StatefulSerializerSerializableActor]) builder.setActorInstance(
ByteString.copyFrom(actor.asInstanceOf[StatefulSerializerSerializableActor].toBinary))
else if (actor.isInstanceOf[StatefulWrappedSerializableActor]) builder.setActorInstance(
ByteString.copyFrom(actor.asInstanceOf[StatefulWrappedSerializableActor].toBinary))
serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol))
supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol))
// FIXME: how to serialize the hotswap PartialFunction ??
// hotswap.foreach(builder.setHotswapStack(_))
//hotswap.foreach(builder.setHotswapStack(_))
builder.build
}
/**
* Returns the mailbox.
*/
protected[akka] def mailbox: Deque[MessageInvocation] = _mailbox
def mailbox: Deque[MessageInvocation] = _mailbox
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte] = {
if (isSerializable) toSerializedActorRefProtocol.toByteArray
else toRemoteActorRefProtocol.toByteArray
val protocol = if (isSerializable) toSerializedActorRefProtocol
else toRemoteActorRefProtocol
Actor.log.debug("Serializing ActorRef to binary:\n" + protocol)
protocol.toByteArray
}
/**

View file

@ -6,7 +6,10 @@ package se.scalablesolutions.akka.actor
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap}
import java.util.{Set=>JSet}
import se.scalablesolutions.akka.util.ListenerManagement
sealed trait ActorRegistryEvent
@ -27,8 +30,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*/
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]]
/**
* Returns all actors in the system.
@ -73,16 +76,18 @@ object ActorRegistry extends ListenerManagement {
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = {
if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName)
else Nil
if (actorsByClassName.containsKey(clazz.getName)) {
actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]]
} else Nil
}
/**
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): List[ActorRef] = {
if (actorsById.containsKey(id)) actorsById.get(id)
else Nil
if (actorsById.containsKey(id)) {
actorsById.get(id).toArray.toList.asInstanceOf[List[ActorRef]]
} else Nil
}
/**
@ -103,27 +108,38 @@ object ActorRegistry extends ListenerManagement {
// ID
val id = actor.id
if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id))
else actorsById.put(id, actor :: Nil)
if (actorsById.containsKey(id)) actorsById.get(id).add(actor)
else {
val set = new CopyOnWriteArraySet[ActorRef]
set.add(actor)
actorsById.put(id, set)
}
// Class name
val className = actor.actor.getClass.getName
if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actor :: Nil)
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor)
else {
val set = new CopyOnWriteArraySet[ActorRef]
set.add(actor)
actorsByClassName.put(className, set)
}
// notify listeners
foreachListener(_ ! ActorRegistered(actor))
}
/**
* FIXME: WRONG - unregisters all actors with the same id and class name, should remove the right one in each list
* Unregisters an actor in the ActorRegistry.
*/
def unregister(actor: ActorRef) = {
actorsByUUID remove actor.uuid
actorsById remove actor.id
actorsByClassName remove actor.getClass.getName
val id = actor.id
if (actorsById.containsKey(id)) actorsById.get(id).remove(actor)
val className = actor.getClass.getName
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor)
// notify listeners
foreachListener(_ ! ActorUnregistered(actor))
}

View file

@ -184,5 +184,3 @@ trait StmUtil {
}.execute()
}
}

View file

@ -1,6 +1,5 @@
package se.scalablesolutions.akka.patterns
import java.util.concurrent.atomic.AtomicInteger
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
@ -11,8 +10,11 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Before, After, Test}
import scala.collection.mutable.HashSet
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
@RunWith(classOf[JUnitRunner])
class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMatchers with Logging {

View file

@ -20,7 +20,6 @@ class SerializableActorSpec extends
describe("SerializableActor") {
it("should be able to serialize and deserialize a JavaSerializableActor") {
val actor1 = actorOf[JavaSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
val bytes = actor1.toBinary
@ -32,7 +31,6 @@ class SerializableActorSpec extends
it("should be able to serialize and deserialize a ProtobufSerializableActor") {
val actor1 = actorOf[ProtobufSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
(actor1 !! "hello").getOrElse("_") should equal("world 2")
@ -43,33 +41,16 @@ class SerializableActorSpec extends
(actor2 !! "hello").getOrElse("_") should equal("world 3")
}
/*
it("should be able to serialize and deserialize a JavaJSONSerializableActor") {
val actor1 = actorOf[JavaJSONSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
(actor1 !! "hello").getOrElse("_") should equal("world 2")
it("should be able to serialize and deserialize a StatelessSerializableActor") {
val actor1 = actorOf[StatelessSerializableTestActor].start
(actor1 !! "hello").getOrElse("_") should equal("world")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 3")
(actor2 !! "hello").getOrElse("_") should equal("world")
}
it("should be able to serialize and deserialize a ScalaJSONSerializableActor") {
val actor1 = actorOf[ScalaJSONSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 2")
}
*/
}
}
@ -82,6 +63,13 @@ class SerializableActorSpec extends
}
}
class StatelessSerializableTestActor extends StatelessSerializableActor {
def receive = {
case "hello" =>
self.reply("world")
}
}
class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufProtocol.Counter] {
val clazz = classOf[ProtobufProtocol.Counter]
private var count = 0
@ -95,21 +83,3 @@ class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufPr
self.reply("world " + count)
}
}
class JavaJSONSerializableTestActor extends JavaJSONSerializableActor {
private var count = 0
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}
@scala.reflect.BeanInfo class ScalaJSONSerializableTestActor extends ScalaJSONSerializableActor {
private var count = 0
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}

View file

@ -104,26 +104,21 @@ class StmSpec extends
describe("Transactor") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
import GlobalTransactionVectorTestActor._
try {
val actor = actorOf[NestedTransactorLevelOneActor].start
actor !! Add(2)
val size1 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size1 should equal(2)
actor !! Add(7)
actor ! "HiLevelOne"
val size2 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size2 should equal(7)
actor !! Add(0)
actor ! "HiLevelTwo"
val size3 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size3 should equal(0)
actor !! Add(3)
val size4 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size4 should equal(3)
} catch {
case e =>
fail(e.toString)
}
val actor = actorOf[NestedTransactorLevelOneActor].start
actor !! Add(2)
val size1 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size1 should equal(2)
actor !! Add(7)
actor ! "HiLevelOne"
val size2 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size2 should equal(7)
actor !! Add(0)
actor ! "HiLevelTwo"
val size3 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size3 should equal(0)
actor !! Add(3)
val size4 = (actor !! Size).as[Int].getOrElse(fail("Could not get size"))
size4 should equal(3)
}
}
/*

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.comet
import se.scalablesolutions.akka.util.Logging
import java.util.{List => JList}
import javax.servlet.ServletConfig
import javax.servlet.{ServletConfig,ServletContext}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.sun.jersey.spi.container.servlet.ServletContainer
@ -43,14 +43,32 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends AtmosphereServlet with Logging {
import se.scalablesolutions.akka.config.Config.{config => c}
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
lazy val servlet = createRestServlet
protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet {
val servlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
}
override def getInitParameter(key : String) = Option(super.getInitParameter(key)).getOrElse(initParams.get(key))
override def getInitParameterNames() = {
val names = new java.util.Vector[String]()
val i = initParams.keySet.iterator
while(i.hasNext) names.add(i.next.toString)
val e = super.getInitParameterNames
while(e.hasMoreElements) names.add(e.nextElement.toString)
names.elements
}
/**
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
* Instead we specify what semantics we want in code.

View file

@ -37,10 +37,6 @@ import javax.annotation.security.{DenyAll, PermitAll, RolesAllowed}
import java.security.Principal
import java.util.concurrent.TimeUnit
import net.liftweb.util.{SecurityHelpers, StringHelpers, IoHelpers}
object Enc extends SecurityHelpers with StringHelpers with IoHelpers
case object OK
/**
@ -249,7 +245,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] {
* rest-part of the akka config
*/
trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging {
import Enc._
import LiftUtils._
private object InvalidateNonces
@ -483,3 +479,87 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w
}
}
/*
* Copyright 2006-2010 WorldWide Conferencing, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
object LiftUtils {
import java.security.{MessageDigest,SecureRandom}
val random = new SecureRandom()
def md5(in: Array[Byte]): Array[Byte] = (MessageDigest.getInstance("MD5")).digest(in)
/**
* Create a random string of a given size
* @param size size of the string to create. Must be a positive or nul integer
* @return the generated string
*/
def randomString(size: Int): String = {
def addChar(pos: Int, lastRand: Int, sb: StringBuilder): StringBuilder = {
if (pos >= size) sb
else {
val randNum = if ((pos % 6) == 0) random.nextInt else lastRand
sb.append((randNum & 0x1f) match {
case n if n < 26 => ('A' + n).toChar
case n => ('0' + (n - 26)).toChar
})
addChar(pos + 1, randNum >> 5, sb)
}
}
addChar(0, 0, new StringBuilder(size)).toString
}
/** encode a Byte array as hexadecimal characters */
def hexEncode(in: Array[Byte]): String = {
val sb = new StringBuilder
val len = in.length
def addDigit(in: Array[Byte], pos: Int, len: Int, sb: StringBuilder) {
if (pos < len) {
val b: Int = in(pos)
val msb = (b & 0xf0) >> 4
val lsb = (b & 0x0f)
sb.append((if (msb < 10) ('0' + msb).asInstanceOf[Char] else ('a' + (msb - 10)).asInstanceOf[Char]))
sb.append((if (lsb < 10) ('0' + lsb).asInstanceOf[Char] else ('a' + (lsb - 10)).asInstanceOf[Char]))
addDigit(in, pos + 1, len, sb)
}
}
addDigit(in, 0, len, sb)
sb.toString
}
/**
* Splits a string of the form &lt;name1=value1, name2=value2, ... &gt; and unquotes the quoted values.
* The result is a Map[String, String]
*/
def splitNameValuePairs(props: String): Map[String, String] = {
/**
* If str is surrounded by quotes it return the content between the quotes
*/
def unquote(str: String) = {
if ((str ne null) && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
str.substring(1, str.length - 1)
else
str
}
val list = props.split(",").toList.map(in => {
val pair = in match { case null => Nil case s => s.split("=").toList.map(_.trim).filter(_.length > 0) }
(pair(0), unquote(pair(1)))
})
val map: Map[String, String] = Map.empty
(map /: list)((m, next) => m + (next))
}
}

View file

@ -61,12 +61,6 @@ trait EmbeddedAppServer extends Bootable with Logging {
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
adapter.addInitParameter("com.sun.jersey.config.property.packages",
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)

View file

@ -1,11 +1,24 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="routeBuilder" class="sample.camel.StandaloneSpringApplicationRoute" />
<camel:camelContext id="camelContext">
<camel:routeBuilder ref="routeBuilder" />
</camel:camelContext>
<akka:camel-service>
<akka:camel-context ref="camelContext" />
</akka:camel-service>
<akka:active-object id="pojo3" target="sample.camel.BeanImpl" timeout="1000" />

View file

@ -41,7 +41,7 @@ class Boot {
// Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
// -----------------------------------------------------------------------
// Routing example
// Tranformer example
// -----------------------------------------------------------------------
val producer = actorOf[Producer1]

View file

@ -59,24 +59,16 @@ class StandaloneApplicationRoute extends RouteBuilder {
object StandaloneSpringApplication {
def main(args: Array[String]) {
import CamelContextManager.context
import CamelContextManager._
// use Spring application context as active object registry
val springctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
val registry = new ApplicationContextRegistry(springctx)
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new StandaloneSpringApplicationRoute)
// start CamelService
val camelService = CamelService.newInstance.load
// load Spring application context
val appctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
// access 'externally' registered active objects with active-object component
assert("hello msg3" == context.createProducerTemplate.requestBody("direct:test3", "msg3"))
assert("hello msg3" == template.requestBody("direct:test3", "msg3"))
// shutdown CamelService
camelService.unload
// destroy Spring application context
appctx.close
// shutdown all (internally) created actors
ActorRegistry.shutdownAll

View file

@ -1,6 +1,6 @@
project.name=Akka Plugin
project.name=Akka SBT Plugin
project.organization=se.scalablesolutions.akka
# mirrors akka version
project.version=0.9.1
project.version=0.10
sbt.version=0.7.4
build.scala.versions=2.7.7

View file

@ -1,3 +1,7 @@
import sbt._
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info)
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info) {
override def managedStyle = ManagedStyle.Maven
val publishTo = "Scala Tools Nexus" at "http://nexus.scala-tools.org/content/repositories/releases/"
Credentials(Path.userHome / ".ivy2" / ".scala-tools-credentials", log)
}

View file

@ -1,11 +1,12 @@
import sbt._
object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
}
trait AkkaBaseProject extends BasicScalaProject {
@ -14,7 +15,8 @@ trait AkkaBaseProject extends BasicScalaProject {
// Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases)
// is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
// for development version resolve to .ivy2/local
// val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
@ -34,11 +36,12 @@ trait AkkaBaseProject extends BasicScalaProject {
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) // only while snapshot version
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
}
trait AkkaProject extends AkkaBaseProject {
val akkaVersion = "0.9.1"
val akkaVersion = "0.10"
// convenience method
def akkaModule(module: String) = "se.scalablesolutions.akka" %% ("akka-" + module) % akkaVersion

View file

@ -17,6 +17,11 @@
</properties>
<repositories>
<repository>
<id>akka</id>
<name>Akka Repo</name>
<url>http://www.scalablesolutions.se/akka/repository/</url>
</repository>
<repository>
<id>project.embedded.module</id>
<name>Project Embedded Repository</name>
@ -146,23 +151,23 @@
<!-- akka -->
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-core_2.8.0.Beta1</artifactId>
<version>0.9</version>
<artifactId>akka-core_2.8.0.RC3</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util_2.8.0.Beta1</artifactId>
<version>0.9</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util-java_2.8.0.Beta1</artifactId>
<version>0.9</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-spring_2.8.0.Beta1</artifactId>
<version>0.9</version>
<artifactId>akka-spring_2.8.0.RC3</artifactId>
<version>0.9.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>

View file

@ -1 +1 @@
http\://www.akkasource.org/schema/akka=se/scalablesolutions/akka/spring/akka.xsd
http\://scalablesolutions.se/akka/akka-0.10.xsd=se/scalablesolutions/akka/spring/akka-0.10.xsd

View file

@ -0,0 +1,255 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.akkasource.org/schema/akka"
targetNamespace="http://www.akkasource.org/schema/akka"
elementFormDefault="qualified" attributeFormDefault="unqualified"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans">
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" />
<!-- base types -->
<!-- restart strategies enumeration -->
<xsd:simpleType name="failover-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="AllForOne"/>
<xsd:enumeration value="OneForOne"/>
</xsd:restriction>
</xsd:simpleType>
<!-- restart strategies enumeration -->
<xsd:simpleType name="lifecycle-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="permanent"/>
<xsd:enumeration value="temporary"/>
</xsd:restriction>
</xsd:simpleType>
<!-- Scopes enumeration -->
<xsd:simpleType name="scope-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="prototype"/>
<xsd:enumeration value="singleton"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatchers enumeration -->
<xsd:simpleType name="dispatcher-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher queue types enumeration -->
<xsd:simpleType name="dispatcher-queue-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="bounded-linked-blocking-queue"/>
<xsd:enumeration value="unbounded-linked-blocking-queue"/>
<xsd:enumeration value="synchronous-queue"/>
<xsd:enumeration value="bounded-array-blocking-queue"/>
</xsd:restriction>
</xsd:simpleType>
<!-- thread pool rejection policies enumeration -->
<xsd:simpleType name="rejection-policy-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="abort-policy"/>
<xsd:enumeration value="caller-runs-policy"/>
<xsd:enumeration value="discard-oldest-policy"/>
<xsd:enumeration value="discard-policy"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher type -->
<xsd:complexType name="dispatcher-type">
<xsd:choice minOccurs="0" maxOccurs="1">
<xsd:element name="thread-pool" type="threadpool-type"/>
</xsd:choice>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="ref" type="xsd:string"/>
<xsd:attribute name="type" type="dispatcher-enum-type"/>
<xsd:attribute name="name" type="xsd:string"/>
</xsd:complexType>
<xsd:complexType name="threadpool-type">
<xsd:attribute name="queue" type="dispatcher-queue-type"/>
<xsd:attribute name="bound" type="xsd:integer"/>
<xsd:attribute name="capacity" type="xsd:integer"/>
<xsd:attribute name="fairness" type="xsd:boolean"/>
<xsd:attribute name="core-pool-size" type="xsd:integer"/>
<xsd:attribute name="max-pool-size" type="xsd:integer"/>
<xsd:attribute name="keep-alive" type="xsd:long"/>
<xsd:attribute name="rejection-policy" type="rejection-policy-type"/>
</xsd:complexType>
<!-- Remote -->
<xsd:complexType name="remote-type">
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Pre restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="post" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Post restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:sequence>
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="target" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
default timeout for '!!' invocations
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set to true if messages should have REQUIRES_NEW semantics
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="interface" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Interface implemented by target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Lifecycle, permanent or temporary
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="scope-enum-type">
<xsd:annotation>
<xsd:documentation>
Supported scopes are singleton and prototype
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- trap exits -->
<xsd:complexType name="trap-exits-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="trap-exit" type="xsd:string"/>
</xsd:choice>
</xsd:complexType>
<!-- active objects -->
<xsd:complexType name="active-objects-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="active-object" type="active-object-type"/>
</xsd:choice>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="strategy-type">
<xsd:sequence>
<xsd:element name="trap-exits" type="trap-exits-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="failover" type="failover-type">
<xsd:annotation>
<xsd:documentation>
Failover scheme, AllForOne or OneForOne
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retries" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Maximal number of retries.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timerange" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Timerange for restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="supervision-type">
<xsd:all>
<xsd:element name="restart-strategy" type="strategy-type" minOccurs="1" maxOccurs="1"/>
<xsd:element name="active-objects" type="active-objects-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="supervision" type="supervision-type" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<xsd:complexType name="camel-service-type">
<xsd:sequence>
<xsd:element name="camel-context" type="camel-context-type" minOccurs="0" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<xsd:complexType name="camel-context-type">
<xsd:attribute name="ref" type="xsd:string"/>
</xsd:complexType>
<!-- ActiveObject -->
<xsd:element name="active-object" type="active-object-type"/>
<!-- Dispatcher -->
<xsd:element name="dispatcher" type="dispatcher-type"/>
<!-- Supervision -->
<xsd:element name="supervision" type="supervision-type"/>
<!-- CamelService -->
<xsd:element name="camel-service" type="camel-service-type"/>
</xsd:schema>

View file

@ -25,6 +25,13 @@
</xsd:restriction>
</xsd:simpleType>
<!-- Scopes enumeration -->
<xsd:simpleType name="scope-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="prototype"/>
<xsd:enumeration value="singleton"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatchers enumeration -->
<xsd:simpleType name="dispatcher-enum-type">
@ -158,6 +165,13 @@
Lifecycle, permanent or temporary
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="scope-enum-type">
<xsd:annotation>
<xsd:documentation>
Supported scopes are singleton and prototype
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

View file

@ -4,19 +4,31 @@
package se.scalablesolutions.akka.spring
import java.beans.PropertyDescriptor
import java.lang.reflect.Method
import org.springframework.beans.BeanWrapperImpl
import org.springframework.beans.BeanWrapper
import org.springframework.beans.BeanUtils
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.actor.ActiveObject
import reflect.BeanProperty
import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.Logging
/**
* Factory bean for active objects.
*
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
import StringReflect._
import AkkaSpringConfigurationTags._
@BeanProperty var target: String = ""
@BeanProperty var timeout: Long = _
@ -28,22 +40,61 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
@BeanProperty var port: Int = _
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope:String = VAL_SCOPE_SINGLETON
@BeanProperty var property:PropertyEntries = _
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
*/
def getObjectType: Class[AnyRef] = target.toClass
def getObjectType: Class[AnyRef] = try {
target.toClass
} catch {
// required by contract to return null
case e: ClassNotFoundException => null
}
/*
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: AnyRef = {
if(scope.equals(VAL_SCOPE_SINGLETON)) {
setSingleton(true)
} else {
setSingleton(false)
}
var argumentList = ""
if (isRemote) argumentList += "r"
if (hasInterface) argumentList += "i"
if (hasDispatcher) argumentList += "d"
create(argumentList)
setProperties(
create(argumentList))
}
/**
* This method manages <property/> element by injecting either
* values (<property value="value"/>) and bean references (<property ref="beanId"/>)
*/
private def setProperties(ref:AnyRef) : AnyRef = {
log.debug("Processing properties and dependencies for target class %s",target)
val beanWrapper = new BeanWrapperImpl(ref);
for(entry <- property.entryList) {
val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name)
val method = propertyDescriptor.getWriteMethod();
if(StringUtils.hasText(entry.ref)) {
log.debug("Setting property %s with bean ref %s using method %s",
entry.name,entry.ref,method.getName)
method.invoke(ref,getBeanFactory().getBean(entry.ref))
} else if(StringUtils.hasText(entry.value)) {
log.debug("Setting property %s with value %s using method %s",
entry.name,entry.value,method.getName)
beanWrapper.setPropertyValue(entry.name,entry.value)
} else {
throw new AkkaBeansException("Either property@ref or property@value must be set on property element")
}
}
ref
}
// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here)
@ -63,15 +114,15 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
if (argList == "r") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
} else if (argList == "ri" ) {
ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, host, port, callbacks)
} else if (argList == "rd") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
} else if (argList == "rid") {
ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, host, port, callbacks)
} else if (argList == "i") {
ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, callbacks)
} else if (argList == "id") {
ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, callbacks)
} else if (argList == "d") {
ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
} else {
@ -79,6 +130,10 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
}
}
def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
clazz.newInstance().asInstanceOf[T]
}
/**
* create Option[RestartCallback]
*/

View file

@ -5,10 +5,12 @@ package se.scalablesolutions.akka.spring
import org.springframework.util.xml.DomUtils
import org.w3c.dom.Element
import scala.collection.JavaConversions._
/**
* Parser trait for custom namespace configuration for active-object.
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
trait ActiveObjectParser extends BeanParser with DispatcherParser {
import AkkaSpringConfigurationTags._
@ -23,6 +25,7 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
if (remoteElement != null) {
objectProperties.host = mandatory(remoteElement, HOST)
@ -42,6 +45,14 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
objectProperties.dispatcher = dispatcherProperties
}
for(element <- propertyEntries) {
val entry = new PropertyEntry()
entry.name = element.getAttribute("name");
entry.value = element.getAttribute("value")
entry.ref = element.getAttribute("ref")
objectProperties.propertyEntries.add(entry)
}
try {
objectProperties.timeout = mandatory(element, TIMEOUT).toLong
} catch {
@ -58,8 +69,13 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
}
if (!element.getAttribute(LIFECYCLE).isEmpty) {
objectProperties.lifecyclye = element.getAttribute(LIFECYCLE)
objectProperties.lifecycle = element.getAttribute(LIFECYCLE)
}
if (!element.getAttribute(SCOPE).isEmpty) {
objectProperties.scope = element.getAttribute(SCOPE)
}
objectProperties
}

View file

@ -20,8 +20,10 @@ class ActiveObjectProperties {
var postRestart: String = ""
var host: String = ""
var port: Int = _
var lifecyclye: String = ""
var lifecycle: String = ""
var scope:String = ""
var dispatcher: DispatcherProperties = _
var propertyEntries = new PropertyEntries()
/**
@ -37,8 +39,10 @@ class ActiveObjectProperties {
builder.addPropertyValue(TARGET, target)
builder.addPropertyValue(INTERFACE, interface)
builder.addPropertyValue(TRANSACTIONAL, transactional)
builder.addPropertyValue(LIFECYCLE, lifecyclye)
builder.addPropertyValue(LIFECYCLE, lifecycle)
builder.addPropertyValue(SCOPE, scope)
builder.addPropertyValue(DISPATCHER_TAG, dispatcher)
}
builder.addPropertyValue(PROPERTYENTRY_TAG,propertyEntries)
}
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.spring
import org.springframework.beans.BeansException
/**
* Exception to use when something goes wrong during bean creation
@author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class AkkaBeansException(errorMsg:String,t:Throwable) extends BeansException(errorMsg,t) {
def this(errorMsg:String) = {
this(errorMsg,null)
}
}

View file

@ -15,5 +15,6 @@ class AkkaNamespaceHandler extends NamespaceHandlerSupport {
registerBeanDefinitionParser(ACTIVE_OBJECT_TAG, new ActiveObjectBeanDefinitionParser());
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser());
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser());
registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser);
}
}

View file

@ -15,6 +15,8 @@ object AkkaSpringConfigurationTags {
val ACTIVE_OBJECT_TAG = "active-object"
val SUPERVISION_TAG = "supervision"
val DISPATCHER_TAG = "dispatcher"
val PROPERTYENTRY_TAG = "property"
val CAMEL_SERVICE_TAG = "camel-service"
// active-object sub tags
val RESTART_CALLBACKS_TAG = "restart-callbacks"
@ -29,6 +31,9 @@ object AkkaSpringConfigurationTags {
// dispatcher sub tags
val THREAD_POOL_TAG = "thread-pool"
// camel-service sub tags
val CAMEL_CONTEXT_TAG = "camel-context"
// --- ATTRIBUTES
//
// active object attributes
@ -41,6 +46,7 @@ object AkkaSpringConfigurationTags {
val PRE_RESTART = "pre"
val POST_RESTART = "post"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"
// supervision attributes
val FAILOVER = "failover"
@ -68,6 +74,9 @@ object AkkaSpringConfigurationTags {
val VAL_LIFECYCYLE_TEMPORARY = "temporary"
val VAL_LIFECYCYLE_PERMANENT = "permanent"
val VAL_SCOPE_SINGLETON = "singleton"
val VAL_SCOPE_PROTOTYPE = "prototype"
// Failover
val VAL_ALL_FOR_ONE = "AllForOne"
val VAL_ONE_FOR_ONE = "OneForOne"

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.{ParserContext, AbstractSingleBeanDefinitionParser}
import org.springframework.util.xml.DomUtils
import org.w3c.dom.Element
import se.scalablesolutions.akka.spring.AkkaSpringConfigurationTags._
/**
* Parser for &lt;camel-service&gt; elements.
*
* @author Martin Krasser
*/
class CamelServiceBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
/**
* Parses the &lt;camel-service&gt; element. If a nested &lt;camel-context&gt; element
* is defined then the referenced context is set on the {@link CamelServiceFactoryBean}.
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val camelContextElement = DomUtils.getChildElementByTagName(element, CAMEL_CONTEXT_TAG);
if (camelContextElement ne null) {
val camelContextReference = camelContextElement.getAttribute("ref")
builder.addPropertyReference("camelContext", camelContextReference)
}
}
/**
* Returns the class of {@link CamelServiceFactoryBean}
*/
override def getBeanClass(element: Element): Class[_] = classOf[CamelServiceFactoryBean]
/**
* Returns <code>true</code>.
*/
override def shouldGenerateIdAsFallback = true
}

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.apache.camel.CamelContext
import org.springframework.beans.factory.{DisposableBean, InitializingBean, FactoryBean}
import se.scalablesolutions.akka.camel.{CamelContextManager, CamelService}
/**
* Factory bean for a {@link CamelService}.
*
* @author Martin Krasser
*/
class CamelServiceFactoryBean extends FactoryBean[CamelService] with InitializingBean with DisposableBean {
@scala.reflect.BeanProperty var camelContext: CamelContext = _
var instance: CamelService = _
def isSingleton = true
def getObjectType = classOf[CamelService]
def getObject = instance
/**
* Initializes the {@link CamelContextManager} with <code>camelService</code> if defined, then
* creates and starts the {@link CamelService} singleton.
*/
def afterPropertiesSet = {
if (camelContext ne null) {
CamelContextManager.init(camelContext)
}
instance = CamelService.newInstance
instance.load
}
/**
* Stops the {@link CamelService} singleton.
*/
def destroy = {
instance.unload
}
}

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import scala.collection.mutable._
/**
* Simple container for Properties
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class PropertyEntries {
var entryList:ListBuffer[PropertyEntry] = ListBuffer[PropertyEntry]()
def add(entry:PropertyEntry) = {
entryList.append(entry)
}
}

View file

@ -0,0 +1,17 @@
package se.scalablesolutions.akka.spring
/**
* Represents a property element
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class PropertyEntry {
var name:String = _
var value:String = null
var ref:String = null
override def toString(): String = {
format("name = %s,value = %s, ref = %s", name,value,ref)
}
}

View file

@ -40,7 +40,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[ActiveObjectConfigurato
*/
private[akka] def createComponent(props: ActiveObjectProperties): Component = {
import StringReflect._
val lifeCycle = if (!props.lifecyclye.isEmpty && props.lifecyclye.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val isRemote = (props.host != null) && (!props.host.isEmpty)
val withInterface = (props.interface != null) && (!props.interface.isEmpty)
if (isRemote) {

View file

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:active-object id="bean"
target="org.springframework.core.io.ResourceEditor"
transactional="true"
timeout="1000"
scope="prototype">
<property name="source" ref="string"/>
</akka:active-object>
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
</beans>

View file

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.akkasource.org/schema/akka
classpath:se/scalablesolutions/akka/spring/akka-0.10.xsd">
<akka:active-object id="bean"
target="org.springframework.core.io.ResourceEditor"
transactional="true"
timeout="1000"
scope="prototype">
<property name="source" ref="nonExistentRef"/>
</akka:active-object>
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
</beans>

View file

@ -25,13 +25,18 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
val xml = <akka:active-object id="active-object1"
target="foo.bar.MyPojo"
timeout="1000"
transactional="true"/>
transactional="true"
scope="prototype">
<property name="someProp" value="someValue" ref="someRef"/>
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.timeout == 1000)
assert(props.target == "foo.bar.MyPojo")
assert(props.timeout === 1000)
assert(props.target === "foo.bar.MyPojo")
assert(props.transactional)
assert(props.scope === "prototype")
assert(props.propertyEntries.entryList.size === 1)
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
@ -50,7 +55,7 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcher.dispatcherType == "thread-based")
}
}
it("should parse remote ActiveObjects configuration") {
val xml = <akka:active-object id="remote active-object" target="se.scalablesolutions.akka.spring.foo.MyPojo"

View file

@ -7,6 +7,8 @@ import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import org.springframework.core.io.ResourceEditor
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Test for ActiveObjectFactoryBean
@ -45,5 +47,28 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
bean.setTarget("java.lang.String")
assert(bean.getObjectType == classOf[String])
}
it("should create a proxy of type ResourceEditor") {
val bean = new ActiveObjectFactoryBean()
// we must have a java class here
bean.setTarget("org.springframework.core.io.ResourceEditor")
val entries = new PropertyEntries()
val entry = new PropertyEntry()
entry.name = "source"
entry.value = "sourceBeanIsAString"
entries.add(entry)
bean.setProperty(entries)
assert(bean.getObjectType == classOf[ResourceEditor])
// Check that we have injected the depencency correctly
val target:ResourceEditor = bean.createInstance.asInstanceOf[ResourceEditor]
assert(target.getSource === entry.value)
}
it("should create an application context and inject a string dependency") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
assert(target.getSource === "someString")
}
}
}

View file

@ -79,7 +79,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_core, akka_camel)
lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_core)
lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_))
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core)
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core, akka_camel)
lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_core)
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_core, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
@ -229,8 +229,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile"
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile"
val lift_util = "net.liftweb" % "lift-util" % LIFT_VERSION % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"