Small misc tests that do not fit anywhere else and does not require a separate testcase
+ * + * @author johanrask + * + */ +public class MiscActiveObjectTest extends TestCase { + + + /** + * Verifies that both preRestart and postRestart methods are invoked when + * an actor is restarted + */ + public void testFailingPostRestartInvocation() throws InterruptedException { + SimpleJavaPojo pojo = newInstance(SimpleJavaPojo.class,500); + SimpleJavaPojo supervisor = newInstance(SimpleJavaPojo.class,500); + link(supervisor,pojo,new OneForOneStrategy(3, 2000),new Class[]{Throwable.class}); + pojo.throwException(); + Thread.sleep(500); + Assert.assertTrue(pojo.pre); + Assert.assertTrue(pojo.post); + } + +} diff --git a/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java index d0c22470e2..d4b4fd7687 100644 --- a/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java +++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java @@ -38,7 +38,7 @@ public class RemoteInMemoryStateTest extends TestCase { } public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999); stateful.init(); stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); @@ -51,7 +51,7 @@ public class RemoteInMemoryStateTest extends TestCase { } public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999); stateful.init(); stateful.setVectorState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired @@ -59,10 +59,10 @@ public class RemoteInMemoryStateTest extends TestCase { } public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999); stateful.init(); stateful.setVectorState("init"); // set init state - InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); + InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method fail("should have thrown an exception"); @@ -72,7 +72,7 @@ public class RemoteInMemoryStateTest extends TestCase { } public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999); stateful.init(); stateful.setRefState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired @@ -80,10 +80,10 @@ public class RemoteInMemoryStateTest extends TestCase { } public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { - InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); + InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999); stateful.init(); stateful.setRefState("init"); // set init state - InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class); + InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method fail("should have thrown an exception"); diff --git a/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/SimpleJavaPojo.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/SimpleJavaPojo.java new file mode 100644 index 0000000000..c783fd2902 --- /dev/null +++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/SimpleJavaPojo.java @@ -0,0 +1,36 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.actor.annotation.prerestart; +import se.scalablesolutions.akka.actor.annotation.postrestart; + +public class SimpleJavaPojo { + + public boolean pre = false; + public boolean post = false; + + private String name; + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @prerestart + public void pre() { + System.out.println("** pre()"); + pre = true; + } + + @postrestart + public void post() { + System.out.println("** post()"); + post = true; + } + + public void throwException() { + throw new RuntimeException(); + } +} diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index 31234fe4a8..7ba65646e3 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -148,1044 +148,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(enum_scope:LifeCycleType) } - public static final class LifeCycleProtocol extends - com.google.protobuf.GeneratedMessage { - // Use LifeCycleProtocol.newBuilder() to construct. - private LifeCycleProtocol() { - initFields(); - } - private LifeCycleProtocol(boolean noInit) {} - - private static final LifeCycleProtocol defaultInstance; - public static LifeCycleProtocol getDefaultInstance() { - return defaultInstance; - } - - public LifeCycleProtocol getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_LifeCycleProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_LifeCycleProtocol_fieldAccessorTable; - } - - // required .LifeCycleType lifeCycle = 1; - public static final int LIFECYCLE_FIELD_NUMBER = 1; - private boolean hasLifeCycle; - private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleType lifeCycle_; - public boolean hasLifeCycle() { return hasLifeCycle; } - public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleType getLifeCycle() { return lifeCycle_; } - - // optional string preRestart = 2; - public static final int PRERESTART_FIELD_NUMBER = 2; - private boolean hasPreRestart; - private java.lang.String preRestart_ = ""; - public boolean hasPreRestart() { return hasPreRestart; } - public java.lang.String getPreRestart() { return preRestart_; } - - // optional string postRestart = 3; - public static final int POSTRESTART_FIELD_NUMBER = 3; - private boolean hasPostRestart; - private java.lang.String postRestart_ = ""; - public boolean hasPostRestart() { return hasPostRestart; } - public java.lang.String getPostRestart() { return postRestart_; } - - private void initFields() { - lifeCycle_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleType.PERMANENT; - } - public final boolean isInitialized() { - if (!hasLifeCycle) return false; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (hasLifeCycle()) { - output.writeEnum(1, getLifeCycle().getNumber()); - } - if (hasPreRestart()) { - output.writeString(2, getPreRestart()); - } - if (hasPostRestart()) { - output.writeString(3, getPostRestart()); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (hasLifeCycle()) { - size += com.google.protobuf.CodedOutputStream - .computeEnumSize(1, getLifeCycle().getNumber()); - } - if (hasPreRestart()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(2, getPreRestart()); - } - if (hasPostRestart()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(3, getPostRestart()); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder- * val actorRef = ActorRef.fromBinary(bytes) + * val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes) * actorRef ! message // send message to remote actor through its reference ** @@ -74,7 +74,8 @@ object ActorRef { /** * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ - private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = + private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) RemoteActorRef( protocol.getUuid, protocol.getActorClassname, @@ -82,6 +83,7 @@ object ActorRef { protocol.getHomeAddress.getPort, protocol.getTimeout, loader) + } /** * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. @@ -99,11 +101,15 @@ object ActorRef { * Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance. */ private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - val serializerClass = - if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) - else Class.forName(protocol.getSerializerClassname) - val serializer = serializerClass.newInstance.asInstanceOf[Serializer] - + Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) + + val serializer = if (protocol.hasSerializerClassname) { + val serializerClass = + if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) + else Class.forName(protocol.getSerializerClassname) + Some(serializerClass.newInstance.asInstanceOf[Serializer]) + } else None + val lifeCycle = if (protocol.hasLifeCycle) { val lifeCycleProtocol = protocol.getLifeCycle @@ -120,8 +126,9 @@ object ActorRef { if (protocol.hasSupervisor) Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) else None + val hotswap = - if (protocol.hasHotswapStack) Some(serializer + if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) .asInstanceOf[PartialFunction[Any, Unit]]) else None @@ -349,10 +356,12 @@ trait ActorRef extends TransactionManagement { /** * Returns the 'Serializer' instance for the Actor as an Option. * - * It returns 'Some(serializer)' if the Actor is serializable and 'None' if not. + * It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor + * trait (which has a Serializer defined) and 'None' if not. */ def serializer: Option[Serializer] = - if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer) + if (actor.isInstanceOf[StatefulSerializerSerializableActor]) + Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer) else None /** @@ -710,15 +719,25 @@ sealed class LocalActorRef private[akka]( __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], __loader: ClassLoader, - __serializer: Serializer) = { + __serializer: Option[Serializer]) = { this(() => { val actorClass = __loader.loadClass(__actorClassName) val actorInstance = actorClass.newInstance - if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) { - val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]] + if (actorInstance.isInstanceOf[StatelessSerializableActor]) { + actorInstance.asInstanceOf[Actor] + } else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) { + __serializer.getOrElse(throw new IllegalStateException( + "No serializer defined for SerializableActor [" + actorClass.getName + "]")) + .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + } else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) { + val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor] instance.fromBinary(__actorBytes) instance - } else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + } else throw new IllegalStateException( + "Can't deserialize Actor that is not an instance of one of:\n" + + "\n\t- StatelessSerializableActor" + + "\n\t- StatefulSerializerSerializableActor" + + "\n\t- StatefulWrappedSerializableActor") }) loader = Some(__loader) isDeserialized = true @@ -777,7 +796,8 @@ sealed class LocalActorRef private[akka]( protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard { if (!isSerializable) throw new IllegalStateException( - "Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") + "Can't serialize an ActorRef using SerializedActorRefProtocol" + + "\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { @@ -798,39 +818,43 @@ sealed class LocalActorRef private[akka]( } } - val serializerClassname = serializer - .getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined")) - .getClass.getName - val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build + val originalAddress = AddressProtocol.newBuilder + .setHostname(homeAddress.getHostName) + .setPort(homeAddress.getPort) + .build val builder = SerializedActorRefProtocol.newBuilder .setUuid(uuid) .setId(id) .setActorClassname(actorClass.getName) - .setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary)) - .setSerializerClassname(serializerClassname) .setOriginalAddress(originalAddress) .setIsTransactor(isTransactor) .setTimeout(timeout) - + if (actor.isInstanceOf[StatefulSerializerSerializableActor]) builder.setActorInstance( + ByteString.copyFrom(actor.asInstanceOf[StatefulSerializerSerializableActor].toBinary)) + else if (actor.isInstanceOf[StatefulWrappedSerializableActor]) builder.setActorInstance( + ByteString.copyFrom(actor.asInstanceOf[StatefulWrappedSerializableActor].toBinary)) + serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName)) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol)) + supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol)) // FIXME: how to serialize the hotswap PartialFunction ?? - // hotswap.foreach(builder.setHotswapStack(_)) + //hotswap.foreach(builder.setHotswapStack(_)) builder.build } /** * Returns the mailbox. */ - protected[akka] def mailbox: Deque[MessageInvocation] = _mailbox + def mailbox: Deque[MessageInvocation] = _mailbox /** * Serializes the ActorRef instance into a byte array (Array[Byte]). */ def toBinary: Array[Byte] = { - if (isSerializable) toSerializedActorRefProtocol.toByteArray - else toRemoteActorRefProtocol.toByteArray + val protocol = if (isSerializable) toSerializedActorRefProtocol + else toRemoteActorRefProtocol + Actor.log.debug("Serializing ActorRef to binary:\n" + protocol) + protocol.toByteArray } /** diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index b9827fdb9c..efe1f49b7b 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -6,7 +6,10 @@ package se.scalablesolutions.akka.actor import scala.collection.mutable.ListBuffer import scala.reflect.Manifest -import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} + +import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap} +import java.util.{Set=>JSet} + import se.scalablesolutions.akka.util.ListenerManagement sealed trait ActorRegistryEvent @@ -27,8 +30,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent */ object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] - private val actorsById = new ConcurrentHashMap[String, List[ActorRef]] - private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]] + private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] + private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] /** * Returns all actors in the system. @@ -73,16 +76,18 @@ object ActorRegistry extends ListenerManagement { * Finds all actors of the exact type specified by the class passed in as the Class argument. */ def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = { - if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName) - else Nil + if (actorsByClassName.containsKey(clazz.getName)) { + actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]] + } else Nil } /** * Finds all actors that has a specific id. */ def actorsFor(id: String): List[ActorRef] = { - if (actorsById.containsKey(id)) actorsById.get(id) - else Nil + if (actorsById.containsKey(id)) { + actorsById.get(id).toArray.toList.asInstanceOf[List[ActorRef]] + } else Nil } /** @@ -103,27 +108,38 @@ object ActorRegistry extends ListenerManagement { // ID val id = actor.id if (id eq null) throw new IllegalStateException("Actor.id is null " + actor) - if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id)) - else actorsById.put(id, actor :: Nil) + if (actorsById.containsKey(id)) actorsById.get(id).add(actor) + else { + val set = new CopyOnWriteArraySet[ActorRef] + set.add(actor) + actorsById.put(id, set) + } // Class name val className = actor.actor.getClass.getName - if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actor :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actor :: Nil) + if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) + else { + val set = new CopyOnWriteArraySet[ActorRef] + set.add(actor) + actorsByClassName.put(className, set) + } // notify listeners foreachListener(_ ! ActorRegistered(actor)) } /** - * FIXME: WRONG - unregisters all actors with the same id and class name, should remove the right one in each list * Unregisters an actor in the ActorRegistry. */ def unregister(actor: ActorRef) = { actorsByUUID remove actor.uuid - actorsById remove actor.id - actorsByClassName remove actor.getClass.getName + + val id = actor.id + if (actorsById.containsKey(id)) actorsById.get(id).remove(actor) + + val className = actor.getClass.getName + if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor) + // notify listeners foreachListener(_ ! ActorUnregistered(actor)) } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 5bc69c037d..e6485ff761 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -184,5 +184,3 @@ trait StmUtil { }.execute() } } - - diff --git a/akka-core/src/test/scala/ActorPatternsTest.scala b/akka-core/src/test/scala/ActorPatternsTest.scala index f6205c2a91..65c9dbf9f4 100644 --- a/akka-core/src/test/scala/ActorPatternsTest.scala +++ b/akka-core/src/test/scala/ActorPatternsTest.scala @@ -1,6 +1,5 @@ package se.scalablesolutions.akka.patterns -import java.util.concurrent.atomic.AtomicInteger import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor._ @@ -11,8 +10,11 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.MustMatchers import org.junit.{Before, After, Test} + import scala.collection.mutable.HashSet -import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{CountDownLatch, TimeUnit} @RunWith(classOf[JUnitRunner]) class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMatchers with Logging { diff --git a/akka-core/src/test/scala/SerializableActorSpec.scala b/akka-core/src/test/scala/SerializableActorSpec.scala index a743a5eb0b..2db077c354 100644 --- a/akka-core/src/test/scala/SerializableActorSpec.scala +++ b/akka-core/src/test/scala/SerializableActorSpec.scala @@ -20,7 +20,6 @@ class SerializableActorSpec extends describe("SerializableActor") { it("should be able to serialize and deserialize a JavaSerializableActor") { val actor1 = actorOf[JavaSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) (actor1 !! "hello").getOrElse("_") should equal("world 1") val bytes = actor1.toBinary @@ -32,7 +31,6 @@ class SerializableActorSpec extends it("should be able to serialize and deserialize a ProtobufSerializableActor") { val actor1 = actorOf[ProtobufSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) (actor1 !! "hello").getOrElse("_") should equal("world 1") (actor1 !! "hello").getOrElse("_") should equal("world 2") @@ -43,33 +41,16 @@ class SerializableActorSpec extends (actor2 !! "hello").getOrElse("_") should equal("world 3") } - -/* - it("should be able to serialize and deserialize a JavaJSONSerializableActor") { - val actor1 = actorOf[JavaJSONSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) - (actor1 !! "hello").getOrElse("_") should equal("world 1") - (actor1 !! "hello").getOrElse("_") should equal("world 2") + it("should be able to serialize and deserialize a StatelessSerializableActor") { + val actor1 = actorOf[StatelessSerializableTestActor].start + (actor1 !! "hello").getOrElse("_") should equal("world") val bytes = actor1.toBinary val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes) actor2.start - (actor2 !! "hello").getOrElse("_") should equal("world 3") + (actor2 !! "hello").getOrElse("_") should equal("world") } - - it("should be able to serialize and deserialize a ScalaJSONSerializableActor") { - val actor1 = actorOf[ScalaJSONSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) - (actor1 !! "hello").getOrElse("_") should equal("world 1") - - val bytes = actor1.toBinary - val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes) - - actor2.start - (actor2 !! "hello").getOrElse("_") should equal("world 2") - } -*/ } } @@ -82,6 +63,13 @@ class SerializableActorSpec extends } } +class StatelessSerializableTestActor extends StatelessSerializableActor { + def receive = { + case "hello" => + self.reply("world") + } +} + class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufProtocol.Counter] { val clazz = classOf[ProtobufProtocol.Counter] private var count = 0 @@ -95,21 +83,3 @@ class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufPr self.reply("world " + count) } } - -class JavaJSONSerializableTestActor extends JavaJSONSerializableActor { - private var count = 0 - def receive = { - case "hello" => - count = count + 1 - self.reply("world " + count) - } -} - -@scala.reflect.BeanInfo class ScalaJSONSerializableTestActor extends ScalaJSONSerializableActor { - private var count = 0 - def receive = { - case "hello" => - count = count + 1 - self.reply("world " + count) - } -} \ No newline at end of file diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala index 1544936446..37914069a8 100644 --- a/akka-core/src/test/scala/StmSpec.scala +++ b/akka-core/src/test/scala/StmSpec.scala @@ -104,26 +104,21 @@ class StmSpec extends describe("Transactor") { it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") { import GlobalTransactionVectorTestActor._ - try { - val actor = actorOf[NestedTransactorLevelOneActor].start - actor !! Add(2) - val size1 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) - size1 should equal(2) - actor !! Add(7) - actor ! "HiLevelOne" - val size2 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) - size2 should equal(7) - actor !! Add(0) - actor ! "HiLevelTwo" - val size3 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) - size3 should equal(0) - actor !! Add(3) - val size4 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) - size4 should equal(3) - } catch { - case e => - fail(e.toString) - } + val actor = actorOf[NestedTransactorLevelOneActor].start + actor !! Add(2) + val size1 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) + size1 should equal(2) + actor !! Add(7) + actor ! "HiLevelOne" + val size2 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) + size2 should equal(7) + actor !! Add(0) + actor ! "HiLevelTwo" + val size3 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) + size3 should equal(0) + actor !! Add(3) + val size4 = (actor !! Size).as[Int].getOrElse(fail("Could not get size")) + size4 should equal(3) } } /* diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index b020c473f6..c5a6fb3fba 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.comet import se.scalablesolutions.akka.util.Logging import java.util.{List => JList} -import javax.servlet.ServletConfig +import javax.servlet.{ServletConfig,ServletContext} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import com.sun.jersey.spi.container.servlet.ServletContainer @@ -43,14 +43,32 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce * Used by the Akka Kernel to bootstrap REST and Comet. */ class AkkaServlet extends AtmosphereServlet with Logging { + import se.scalablesolutions.akka.config.Config.{config => c} + addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true") addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName) + addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";")) + addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(",")) - lazy val servlet = createRestServlet - - protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet { + val servlet = new AtmosphereRestServlet { override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key) + override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames() } + + override def getInitParameter(key : String) = Option(super.getInitParameter(key)).getOrElse(initParams.get(key)) + + override def getInitParameterNames() = { + val names = new java.util.Vector[String]() + + val i = initParams.keySet.iterator + while(i.hasNext) names.add(i.next.toString) + + val e = super.getInitParameterNames + while(e.hasMoreElements) names.add(e.nextElement.toString) + + names.elements + } + /** * We override this to avoid Atmosphere looking for it's atmosphere.xml file * Instead we specify what semantics we want in code. diff --git a/akka-http/src/main/scala/Security.scala b/akka-http/src/main/scala/Security.scala index 284d82d98e..bbc6242bfc 100644 --- a/akka-http/src/main/scala/Security.scala +++ b/akka-http/src/main/scala/Security.scala @@ -37,10 +37,6 @@ import javax.annotation.security.{DenyAll, PermitAll, RolesAllowed} import java.security.Principal import java.util.concurrent.TimeUnit -import net.liftweb.util.{SecurityHelpers, StringHelpers, IoHelpers} - -object Enc extends SecurityHelpers with StringHelpers with IoHelpers - case object OK /** @@ -249,7 +245,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] { * rest-part of the akka config */ trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging { - import Enc._ + import LiftUtils._ private object InvalidateNonces @@ -483,3 +479,87 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w } } + +/* +* Copyright 2006-2010 WorldWide Conferencing, LLC +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +object LiftUtils { + import java.security.{MessageDigest,SecureRandom} + val random = new SecureRandom() + + def md5(in: Array[Byte]): Array[Byte] = (MessageDigest.getInstance("MD5")).digest(in) + + /** + * Create a random string of a given size + * @param size size of the string to create. Must be a positive or nul integer + * @return the generated string + */ + def randomString(size: Int): String = { + def addChar(pos: Int, lastRand: Int, sb: StringBuilder): StringBuilder = { + if (pos >= size) sb + else { + val randNum = if ((pos % 6) == 0) random.nextInt else lastRand + sb.append((randNum & 0x1f) match { + case n if n < 26 => ('A' + n).toChar + case n => ('0' + (n - 26)).toChar + }) + addChar(pos + 1, randNum >> 5, sb) + } + } + addChar(0, 0, new StringBuilder(size)).toString + } + +/** encode a Byte array as hexadecimal characters */ + def hexEncode(in: Array[Byte]): String = { + val sb = new StringBuilder + val len = in.length + def addDigit(in: Array[Byte], pos: Int, len: Int, sb: StringBuilder) { + if (pos < len) { + val b: Int = in(pos) + val msb = (b & 0xf0) >> 4 + val lsb = (b & 0x0f) + sb.append((if (msb < 10) ('0' + msb).asInstanceOf[Char] else ('a' + (msb - 10)).asInstanceOf[Char])) + sb.append((if (lsb < 10) ('0' + lsb).asInstanceOf[Char] else ('a' + (lsb - 10)).asInstanceOf[Char])) + addDigit(in, pos + 1, len, sb) + } + } + addDigit(in, 0, len, sb) + sb.toString + } + + + /** + * Splits a string of the form <name1=value1, name2=value2, ... > and unquotes the quoted values. + * The result is a Map[String, String] + */ + def splitNameValuePairs(props: String): Map[String, String] = { + /** + * If str is surrounded by quotes it return the content between the quotes + */ + def unquote(str: String) = { + if ((str ne null) && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"') + str.substring(1, str.length - 1) + else + str + } + + val list = props.split(",").toList.map(in => { + val pair = in match { case null => Nil case s => s.split("=").toList.map(_.trim).filter(_.length > 0) } + (pair(0), unquote(pair(1))) + }) + val map: Map[String, String] = Map.empty + (map /: list)((m, next) => m + (next)) + } +} diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala index 8d9982c7e2..8f5495b5c1 100644 --- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala @@ -61,12 +61,6 @@ trait EmbeddedAppServer extends Bootable with Logging { "org.atmosphere.container.GrizzlyCometSupport") adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass", "com.sun.jersey.api.core.PackagesResourceConfig") - adapter.addInitParameter("com.sun.jersey.config.property.packages", - config.getList("akka.rest.resource_packages").mkString(";") - ) - adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters", - config.getList("akka.rest.filters").mkString(",") - ) if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root") log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath) diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml index f0932ce1b3..36645a936d 100644 --- a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml +++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml @@ -1,11 +1,24 @@
true.
+ */
+ override def shouldGenerateIdAsFallback = true
+}
\ No newline at end of file
diff --git a/akka-spring/src/main/scala/CamelServiceFactoryBean.scala b/akka-spring/src/main/scala/CamelServiceFactoryBean.scala
new file mode 100644
index 0000000000..040473951e
--- /dev/null
+++ b/akka-spring/src/main/scala/CamelServiceFactoryBean.scala
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB camelService if defined, then
+ * creates and starts the {@link CamelService} singleton.
+ */
+ def afterPropertiesSet = {
+ if (camelContext ne null) {
+ CamelContextManager.init(camelContext)
+ }
+ instance = CamelService.newInstance
+ instance.load
+ }
+
+ /**
+ * Stops the {@link CamelService} singleton.
+ */
+ def destroy = {
+ instance.unload
+ }
+}
\ No newline at end of file
diff --git a/akka-spring/src/main/scala/PropertyEntries.scala b/akka-spring/src/main/scala/PropertyEntries.scala
new file mode 100644
index 0000000000..935baac4f4
--- /dev/null
+++ b/akka-spring/src/main/scala/PropertyEntries.scala
@@ -0,0 +1,18 @@
+package se.scalablesolutions.akka.spring
+
+import org.springframework.beans.factory.support.BeanDefinitionBuilder
+
+import scala.collection.mutable._
+
+/**
+* Simple container for Properties
+* @author Johan Rask
+*/
+class PropertyEntries {
+
+ var entryList:ListBuffer[PropertyEntry] = ListBuffer[PropertyEntry]()
+
+ def add(entry:PropertyEntry) = {
+ entryList.append(entry)
+ }
+}
\ No newline at end of file
diff --git a/akka-spring/src/main/scala/PropertyEntry.scala b/akka-spring/src/main/scala/PropertyEntry.scala
new file mode 100644
index 0000000000..a01241635b
--- /dev/null
+++ b/akka-spring/src/main/scala/PropertyEntry.scala
@@ -0,0 +1,17 @@
+package se.scalablesolutions.akka.spring
+
+/**
+* Represents a property element
+* @author Johan Rask
+*/
+class PropertyEntry {
+
+ var name:String = _
+ var value:String = null
+ var ref:String = null
+
+
+ override def toString(): String = {
+ format("name = %s,value = %s, ref = %s", name,value,ref)
+ }
+}
\ No newline at end of file
diff --git a/akka-spring/src/main/scala/SupervisionFactoryBean.scala b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
index d82d329f79..d8c44c3502 100644
--- a/akka-spring/src/main/scala/SupervisionFactoryBean.scala
+++ b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
@@ -40,7 +40,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[ActiveObjectConfigurato
*/
private[akka] def createComponent(props: ActiveObjectProperties): Component = {
import StringReflect._
- val lifeCycle = if (!props.lifecyclye.isEmpty && props.lifecyclye.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
+ val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val isRemote = (props.host != null) && (!props.host.isEmpty)
val withInterface = (props.interface != null) && (!props.interface.isEmpty)
if (isRemote) {
diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml
new file mode 100644
index 0000000000..3dc4d63794
--- /dev/null
+++ b/akka-spring/src/test/resources/appContext.xml
@@ -0,0 +1,21 @@
+
+