diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 5c113384b0..41a87fa6ee 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -354,7 +354,7 @@ object ActiveObject extends Logging { } private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = Proxy.newInstance(target, false, false) + val proxy = Proxy.newInstance(target, true, false) val context = injectActiveObjectContext(proxy) actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context) actorRef.timeout = timeout @@ -367,7 +367,7 @@ object ActiveObject extends Logging { private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val context = injectActiveObjectContext(target) - val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) + val proxy = Proxy.newInstance(Array(intf), Array(target), true, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context) actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) @@ -461,7 +461,7 @@ object ActiveObject extends Logging { val parent = clazz.getSuperclass if (parent != null) injectActiveObjectContext0(activeObject, parent) else { - log.warning( + log.trace( "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.", activeObject.getClass.getName) None diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 807355d95e..693fbaad26 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -34,27 +34,59 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { self.makeRemote(hostname, port) } +/** + * Base trait defining a serializable actor. + * + * @author Jonas Bonér + */ +trait SerializableActor extends Actor + +/** + * Base trait defining a stateless serializable actor. + * + * @author Jonas Bonér + */ +trait StatelessSerializableActor extends SerializableActor + /** * Mix in this trait to create a serializable actor, serializable through * a custom serialization protocol. * * @author Jonas Bonér */ -trait SerializableActor extends Actor { - val serializer: Serializer +trait StatefulSerializableActor extends SerializableActor { def toBinary: Array[Byte] } +/** + * Mix in this trait to create a serializable actor, serializable through + * a custom serialization protocol. This actor is the serialized state. + * + * @author Jonas Bonér + */ +trait StatefulSerializerSerializableActor extends StatefulSerializableActor { + val serializer: Serializer +} + +/** + * Mix in this trait to create a serializable actor, serializable through + * a custom serialization protocol. This actor is wrapping serializable state. + * + * @author Jonas Bonér + */ +trait StatefulWrappedSerializableActor extends StatefulSerializableActor { + def fromBinary(bytes: Array[Byte]) +} + /** * Mix in this trait to create a serializable actor, serializable through * Protobuf. * * @author Jonas Bonér */ -trait ProtobufSerializableActor[T <: Message] extends SerializableActor { - val serializer = Serializer.Protobuf +trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor { def toBinary: Array[Byte] = toProtobuf.toByteArray - def fromBinary(bytes: Array[Byte]) = fromProtobuf(serializer.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) + def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) val clazz: Class[T] def toProtobuf: T @@ -67,7 +99,7 @@ trait ProtobufSerializableActor[T <: Message] extends SerializableActor { * * @author Jonas Bonér */ -trait JavaSerializableActor extends SerializableActor { +trait JavaSerializableActor extends StatefulSerializerSerializableActor { @transient val serializer = Serializer.Java def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -78,7 +110,7 @@ trait JavaSerializableActor extends SerializableActor { * * @author Jonas Bonér */ -trait JavaJSONSerializableActor extends SerializableActor { +trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.JavaJSON def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -89,7 +121,7 @@ trait JavaJSONSerializableActor extends SerializableActor { * * @author Jonas Bonér */ -trait ScalaJSONSerializableActor extends SerializableActor { +trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.ScalaJSON def toBinary: Array[Byte] = serializer.toBinary(this) } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index b9bcd08938..d7ca9a369b 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -74,7 +74,8 @@ object ActorRef { /** * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ - private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = + private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) RemoteActorRef( protocol.getUuid, protocol.getActorClassname, @@ -82,6 +83,7 @@ object ActorRef { protocol.getHomeAddress.getPort, protocol.getTimeout, loader) + } /** * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. @@ -99,11 +101,15 @@ object ActorRef { * Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance. */ private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - val serializerClass = - if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) - else Class.forName(protocol.getSerializerClassname) - val serializer = serializerClass.newInstance.asInstanceOf[Serializer] - + Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) + + val serializer = if (protocol.hasSerializerClassname) { + val serializerClass = + if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) + else Class.forName(protocol.getSerializerClassname) + Some(serializerClass.newInstance.asInstanceOf[Serializer]) + } else None + val lifeCycle = if (protocol.hasLifeCycle) { val lifeCycleProtocol = protocol.getLifeCycle @@ -120,8 +126,9 @@ object ActorRef { if (protocol.hasSupervisor) Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) else None + val hotswap = - if (protocol.hasHotswapStack) Some(serializer + if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) .asInstanceOf[PartialFunction[Any, Unit]]) else None @@ -339,10 +346,12 @@ trait ActorRef extends TransactionManagement { /** * Returns the 'Serializer' instance for the Actor as an Option. *

- * It returns 'Some(serializer)' if the Actor is serializable and 'None' if not. + * It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor + * trait (which has a Serializer defined) and 'None' if not. */ def serializer: Option[Serializer] = - if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer) + if (actor.isInstanceOf[StatefulSerializerSerializableActor]) + Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer) else None /** @@ -694,15 +703,25 @@ sealed class LocalActorRef private[akka]( __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], __loader: ClassLoader, - __serializer: Serializer) = { + __serializer: Option[Serializer]) = { this(() => { val actorClass = __loader.loadClass(__actorClassName) val actorInstance = actorClass.newInstance - if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) { - val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]] + if (actorInstance.isInstanceOf[StatelessSerializableActor]) { + actorInstance.asInstanceOf[Actor] + } else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) { + __serializer + .getOrElse(throw new IllegalStateException("No serializer defined for SerializableActor [" + actorClass.getName + "]")) + .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + } else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) { + val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor] instance.fromBinary(__actorBytes) instance - } else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + } else throw new IllegalStateException( + "Can't deserialize Actor that is not an instance of one of:\n" + + "\n\t- StatelessSerializableActor" + + "\n\t- StatefulSerializerSerializableActor" + + "\n\t- StatefulWrappedSerializableActor") }) loader = Some(__loader) isDeserialized = true @@ -761,7 +780,8 @@ sealed class LocalActorRef private[akka]( protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard { if (!isSerializable) throw new IllegalStateException( - "Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") + "Can't serialize an ActorRef using SerializedActorRefProtocol" + + "\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { @@ -782,23 +802,19 @@ sealed class LocalActorRef private[akka]( } } - val serializerClassname = serializer - .getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined")) - .getClass.getName val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build val builder = SerializedActorRefProtocol.newBuilder .setUuid(uuid) .setId(id) .setActorClassname(actorClass.getName) - .setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary)) - .setSerializerClassname(serializerClassname) + .setActorInstance(ByteString.copyFrom(actor.asInstanceOf[StatefulSerializableActor].toBinary)) .setOriginalAddress(originalAddress) .setIsTransactor(isTransactor) .setTimeout(timeout) - + serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName)) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol)) + supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol)) // FIXME: how to serialize the hotswap PartialFunction ?? // hotswap.foreach(builder.setHotswapStack(_)) builder.build @@ -813,8 +829,10 @@ sealed class LocalActorRef private[akka]( * Serializes the ActorRef instance into a byte array (Array[Byte]). */ def toBinary: Array[Byte] = { - if (isSerializable) toSerializedActorRefProtocol.toByteArray - else toRemoteActorRefProtocol.toByteArray + val protocol = if (isSerializable) toSerializedActorRefProtocol + else toRemoteActorRefProtocol + Actor.log.debug("Serializing ActorRef to binary:\n" + protocol) + protocol.toByteArray } /** diff --git a/akka-core/src/test/scala/SerializableActorSpec.scala b/akka-core/src/test/scala/SerializableActorSpec.scala index a743a5eb0b..14b8f0dd5c 100644 --- a/akka-core/src/test/scala/SerializableActorSpec.scala +++ b/akka-core/src/test/scala/SerializableActorSpec.scala @@ -20,7 +20,6 @@ class SerializableActorSpec extends describe("SerializableActor") { it("should be able to serialize and deserialize a JavaSerializableActor") { val actor1 = actorOf[JavaSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) (actor1 !! "hello").getOrElse("_") should equal("world 1") val bytes = actor1.toBinary @@ -30,9 +29,9 @@ class SerializableActorSpec extends (actor2 !! "hello").getOrElse("_") should equal("world 2") } + /* it("should be able to serialize and deserialize a ProtobufSerializableActor") { val actor1 = actorOf[ProtobufSerializableTestActor].start - val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) (actor1 !! "hello").getOrElse("_") should equal("world 1") (actor1 !! "hello").getOrElse("_") should equal("world 2") @@ -43,8 +42,6 @@ class SerializableActorSpec extends (actor2 !! "hello").getOrElse("_") should equal("world 3") } - -/* it("should be able to serialize and deserialize a JavaJSONSerializableActor") { val actor1 = actorOf[JavaJSONSerializableTestActor].start val serializer = actor1.serializer.getOrElse(fail("Serializer not defined")) diff --git a/akka-sbt-plugin/project/build.properties b/akka-sbt-plugin/project/build.properties new file mode 100644 index 0000000000..27b9049bf2 --- /dev/null +++ b/akka-sbt-plugin/project/build.properties @@ -0,0 +1,6 @@ +project.name=Akka Plugin +project.organization=se.scalablesolutions.akka +# mirrors akka version +project.version=0.9.1 +sbt.version=0.7.4 +build.scala.versions=2.7.7 diff --git a/akka-sbt-plugin/project/build/AkkaPluginProject.scala b/akka-sbt-plugin/project/build/AkkaPluginProject.scala new file mode 100644 index 0000000000..4c63e9d14f --- /dev/null +++ b/akka-sbt-plugin/project/build/AkkaPluginProject.scala @@ -0,0 +1,3 @@ +import sbt._ + +class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info) diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala new file mode 100644 index 0000000000..5ec83204a2 --- /dev/null +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -0,0 +1,48 @@ +import sbt._ + +object AkkaRepositories { + val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") + val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") + val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") + val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") +} + +trait AkkaBaseProject extends BasicScalaProject { + import AkkaRepositories._ + + // Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases) + // is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. + + val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo) + val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo) + val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo) + val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo) + val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", AkkaRepo) + val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo) + val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo) + val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo) + val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo) + val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo) + val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) + val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) + val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) + val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) + val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) + val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) + val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) + val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) + val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) + val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) + val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) +} + +trait AkkaProject extends AkkaBaseProject { + val akkaVersion = "0.9.1" + + // convenience method + def akkaModule(module: String) = "se.scalablesolutions.akka" %% ("akka-" + module) % akkaVersion + + // akka core dependency by default + val akkaCore = akkaModule("core") +}