diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 79cf2cc920..ddc26313d5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -139,8 +139,24 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def setFaultHandler(handler: FaultHandlingStrategy) def getFaultHandler(): FaultHandlingStrategy - @volatile - private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher + + /** + * Akka Java API + * A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent) + *

+ * Can be one of: + * + * import static akka.config.Supervision.*; + *

+   * getContext().setLifeCycle(permanent());
+   * 
+ * Or: + *
+   * getContext().setLifeCycle(temporary());
+   * 
+ */ + def setLifeCycle(lifeCycle: LifeCycle): Unit + def getLifeCycle(): LifeCycle /** * Akka Java API @@ -597,6 +613,8 @@ class LocalActorRef private[akka] ( private var restartsWithinTimeRangeTimestamp: Long = 0L @volatile private var _mailbox: AnyRef = _ + @volatile + private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } @@ -1208,6 +1226,8 @@ private[akka] case class RemoteActorRef private[akka] ( ensureRemotingEnabled + val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port)) + id = classOrServiceName timeout = _timeout @@ -1242,8 +1262,6 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None - val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port)) - // ==== NOT SUPPORTED ==== def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported @@ -1313,9 +1331,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def id: String - def id_=(id: String): Unit - - /** + def id_=(id: String): Unit /** * User overridable callback/setting. *

* Defines the life-cycle for a supervised actor. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index bf0a479f7f..3476304a67 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -37,10 +37,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends ListenerManagement { - private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] - private val actorsById = new Index[String,ActorRef] - private val remoteActorSets = Map[Address, RemoteActorSet]() - private val guard = new ReadWriteGuard + private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] + private val actorsById = new Index[String,ActorRef] + private val remoteActorSets = Map[Address, RemoteActorSet]() + private val guard = new ReadWriteGuard /** * Returns all actors in the system. @@ -50,7 +50,7 @@ object ActorRegistry extends ListenerManagement { /** * Returns the number of actors in the system. */ - def size : Int = actorsByUUID.size + def size: Int = actorsByUUID.size /** * Invokes a function for all actors. @@ -68,8 +68,7 @@ object ActorRegistry extends ListenerManagement { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val element = elements.nextElement - if(f isDefinedAt element) - return Some(f(element)) + if (f isDefinedAt element) return Some(f(element)) } None } @@ -88,9 +87,7 @@ object ActorRegistry extends ListenerManagement { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorId = elements.nextElement - if (p(actorId)) { - all += actorId - } + if (p(actorId)) all += actorId } all.toArray } @@ -105,7 +102,7 @@ object ActorRegistry extends ListenerManagement { * Finds any actor that matches T. */ def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = - find({ case a:ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) + find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) /** * Finds all actors of type or sub-type specified by the class passed in as the Class argument. @@ -132,13 +129,11 @@ object ActorRegistry extends ListenerManagement { * Invokes a function for all typed actors. */ def foreachTypedActor(f: (AnyRef) => Unit) = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined) { - f(proxy.get) - } + if (proxy.isDefined) f(proxy.get) } } @@ -147,12 +142,11 @@ object ActorRegistry extends ListenerManagement { * Returns None if the function never returns Some */ def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if(proxy.isDefined && (f isDefinedAt proxy)) - return Some(f(proxy)) + if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy)) } None } @@ -161,14 +155,12 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that satisfy a predicate. */ def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val all = new ListBuffer[AnyRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined && p(proxy.get)) { - all += proxy.get - } + if (proxy.isDefined && p(proxy.get)) all += proxy.get } all.toArray } @@ -177,7 +169,7 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that are subtypes of the class passed in as the Manifest argument. */ def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) } @@ -185,20 +177,20 @@ object ActorRegistry extends ListenerManagement { * Finds any typed actor that matches T. */ def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled - def predicate(proxy: AnyRef) : Boolean = { + TypedActorModule.ensureEnabled + def predicate(proxy: AnyRef): Boolean = { val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) } - findTypedActor({ case a:AnyRef if predicate(a) => a }) + findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a }) } /** * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. */ def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled - def predicate(proxy: AnyRef) : Boolean = { + TypedActorModule.ensureEnabled + def predicate(proxy: AnyRef): Boolean = { val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) } @@ -209,7 +201,7 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that have a specific id. */ def typedActorsFor(id: String): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val actorRefs = actorsById values id actorRefs.flatMap(typedActorFor(_)) } @@ -218,12 +210,10 @@ object ActorRegistry extends ListenerManagement { * Finds the typed actor that has a specific UUID. */ def typedActorFor(uuid: Uuid): Option[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val actorRef = actorsByUUID get uuid - if (actorRef eq null) - None - else - typedActorFor(actorRef) + if (actorRef eq null) None + else typedActorFor(actorRef) } /** @@ -265,20 +255,15 @@ object ActorRegistry extends ListenerManagement { */ def shutdownAll() { log.info("Shutting down all actors in the system...") - if (TypedActorModule.isTypedActorEnabled) { + if (TypedActorModule.isEnabled) { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorRef = elements.nextElement val proxy = typedActorFor(actorRef) - if (proxy.isDefined) { - TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) - } else { - actorRef.stop - } + if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) + else actorRef.stop } - } else { - foreach(_.stop) - } + } else foreach(_.stop) actorsByUUID.clear actorsById.clear log.info("All actors have been shut down and unregistered from ActorRegistry") @@ -301,14 +286,18 @@ object ActorRegistry extends ListenerManagement { private[akka] def actors(address: Address) = actorsFor(address).actors private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid + private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories private[akka] def typedActors(address: Address) = actorsFor(address).typedActors private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid + private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories private[akka] class RemoteActorSet { private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] } } @@ -337,16 +326,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { if (set ne null) { set.synchronized { - if (set.isEmpty) { - retry = true //IF the set is empty then it has been removed, so signal retry - } + if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry else { //Else add the value to the set and signal that retry is not needed added = set add v retry = false } } - } - else { + } else { val newSet = new ConcurrentSkipListSet[V] newSet add v @@ -354,24 +340,20 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { val oldSet = container.putIfAbsent(k,newSet) if (oldSet ne null) { oldSet.synchronized { - if (oldSet.isEmpty) { - retry = true //IF the set is empty then it has been removed, so signal retry - } + if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry else { //Else try to add the value to the set and signal that retry is not needed added = oldSet add v retry = false } } - } else { - added = true - } + } else added = true } - if (retry) spinPut(k,v) + if (retry) spinPut(k, v) else added } - spinPut(key,value) + spinPut(key, value) } /** @@ -390,10 +372,8 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { def findValue(key: K)(f: (V) => Boolean): Option[V] = { import scala.collection.JavaConversions._ val set = container get key - if (set ne null) - set.iterator.find(f) - else - None + if (set ne null) set.iterator.find(f) + else None } /** @@ -420,8 +400,7 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set true //Remove succeeded - } - else false //Remove failed + } else false //Remove failed } } else false //Remove failed } @@ -434,5 +413,5 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { /** * Removes all keys and all values */ - def clear = foreach { case (k,v) => remove(k,v) } + def clear = foreach { case (k, v) => remove(k, v) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 83ff50427a..ee5dd890b8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,7 +5,7 @@ package akka.dispatch import akka.actor.{ActorRef, IllegalActorStateException} -import akka.util.ReflectiveAccess.EnterpriseModule +import akka.util.ReflectiveAccess.AkkaCloudModule import java.util.Queue import akka.util.Switch @@ -123,10 +123,10 @@ class ExecutorBasedEventDrivenDispatcher( */ def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index ff71b607ce..7e81d4a598 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -5,7 +5,7 @@ package akka.dispatch import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import akka.util.ReflectiveAccess.EnterpriseModule +import akka.util.ReflectiveAccess.AkkaCloudModule import akka.AkkaException import java.util.{Queue, List} @@ -42,15 +42,15 @@ case class BoundedMailbox( if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") } -abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType { +abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType { if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null") } -case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) +case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 5257d596f0..b5a4c7edb7 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -20,13 +20,13 @@ object ReflectiveAccess extends Logging { val loader = getClass.getClassLoader - lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled - lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled - lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled + lazy val isRemotingEnabled = RemoteClientModule.isEnabled + lazy val isTypedActorEnabled = TypedActorModule.isEnabled + lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled - def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled - def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled - def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled + def ensureRemotingEnabled = RemoteClientModule.ensureEnabled + def ensureTypedActorEnabled = TypedActorModule.ensureEnabled + def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled /** * Reflective access to the RemoteClient module. @@ -56,32 +56,32 @@ object ReflectiveAccess extends Logging { def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient } - lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined + lazy val isEnabled = remoteClientObjectInstance.isDefined - def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException( + def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") val remoteClientObjectInstance: Option[RemoteClientObject] = getObjectFor("akka.remote.RemoteClient$") def register(address: InetSocketAddress, uuid: Uuid) = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid) } def unregister(address: InetSocketAddress, uuid: Uuid) = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid) } def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = { - ensureRemotingEnabled + ensureEnabled val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress) remoteClient.registerSupervisorForActor(actorRef) } def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.clientFor(hostname, port, loader) } @@ -95,7 +95,7 @@ object ReflectiveAccess extends Logging { actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] = { - ensureRemotingEnabled + ensureEnabled clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) } @@ -126,17 +126,17 @@ object ReflectiveAccess extends Logging { getObjectFor("akka.remote.RemoteNode$") def registerActor(address: InetSocketAddress, actorRef: ActorRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteServerObjectInstance.get.registerActor(address, actorRef) } def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy) } def unregister(actorRef: ActorRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteNodeObjectInstance.get.unregister(actorRef) } } @@ -156,16 +156,16 @@ object ReflectiveAccess extends Logging { def stop(anyRef: AnyRef) : Unit } - lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined + lazy val isEnabled = typedActorObjectInstance.isDefined - def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( + def ensureEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( "Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath") val typedActorObjectInstance: Option[TypedActorObject] = getObjectFor("akka.actor.TypedActor$") def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { - ensureTypedActorEnabled + ensureEnabled if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) { future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) } @@ -173,7 +173,7 @@ object ReflectiveAccess extends Logging { } } - object EnterpriseModule { + object AkkaCloudModule { type Mailbox = { def enqueue(message: MessageInvocation) @@ -185,27 +185,27 @@ object ReflectiveAccess extends Logging { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef } - lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined + lazy val isEnabled = clusterObjectInstance.isDefined val clusterObjectInstance: Option[AnyRef] = - getObjectFor("akka.cluster.Cluster$") + getObjectFor("akka.cloud.cluster.Cluster$") val serializerClass: Option[Class[_]] = getClassFor("akka.serialization.Serializer") - def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException( - "Feature is only available in Akka Enterprise edition") + def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( + "Feature is only available in Akka Cloud") - def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef) + def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef) - def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef) + def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef) - def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef) + def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef) - def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef) + def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef) private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = { - ensureEnterpriseEnabled + ensureEnabled createInstance( mailboxClassname, Array(classOf[ActorRef]), diff --git a/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala index 6148b04f53..f951281f2e 100644 --- a/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -27,7 +27,6 @@ object ActorRegistrySpec { self.reply("got ping") } } - } class ActorRegistrySpec extends JUnitSuite { diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala index bd10420a5b..23940fbadf 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala @@ -18,7 +18,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { override protected def beforeAll(): Unit = { try { - val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources" + val dir = "./akka-persistence/akka-persistence-voldemort/target/test-resources" val home = new File(dir) log.info("Creating Voldemort Config") val config = VoldemortConfig.loadFromVoldemortHome(home.getCanonicalPath) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala index 3d4e81e336..1d79978559 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala @@ -284,6 +284,21 @@ class RemoteServer extends Logging with ListenerManagement { else registerTypedActor(id, typedActor, typedActors) } + /** + * Register typed actor by interface name. + */ + def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory) + + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = synchronized { + log.debug("Registering server side typed remote session actor with id [%s]", id) + registerTypedPerSessionActor(id, () => factory, typedActorsFactories) + } + /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ @@ -300,15 +315,36 @@ class RemoteServer extends Logging with ListenerManagement { else register(id, actorRef, actors) } + /** + * Register Remote Session Actor by a specific 'id' passed as argument. + *

+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + */ + def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized { + log.debug("Registering server side remote session actor with id [%s]", id) + registerPerSession(id, () => factory, actorsFactories) + } + private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning && !registry.contains(id)) { + if (_isRunning) { + registry.put(id, actorRef) //TODO change to putIfAbsent if (!actorRef.isRunning) actorRef.start - registry.put(id, actorRef) } } + private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent + } + private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) + if (_isRunning) + registry.put(id, typedActor) //TODO change to putIfAbsent + } + + private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent } /** @@ -339,6 +375,18 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterPerSession(id: String):Unit = { + if (_isRunning) { + log.info("Unregistering server side remote session actor with id [%s]", id) + actorsFactories.remove(id) + } + } + /** * Unregister Remote Typed Actor by specific 'id'. *

@@ -352,14 +400,28 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Typed Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedPerSessionActor(id: String):Unit = { + if (_isRunning) { + typedActorsFactories.remove(id) + } + } + protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) + private[akka] def actors = ActorRegistry.actors(address) private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address) + private[akka] def actorsFactories = ActorRegistry.actorsFactories(address) private[akka] def typedActors = ActorRegistry.typedActors(address) private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) + private[akka] def typedActorsFactories = ActorRegistry.typedActorsFactories(address) } object RemoteServerSslContext { @@ -427,6 +489,9 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern + val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() + val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() + applicationLoader.foreach(MessageSerializer.setClassLoader(_)) /** @@ -437,6 +502,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) + typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -456,6 +523,22 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) + // stop all session actors + val channelActors = sessionActors.remove(event.getChannel) + if (channelActors ne null) { + val channelActorsIterator = channelActors.elements + while (channelActorsIterator.hasMoreElements) { + channelActorsIterator.nextElement.stop + } + } + + val channelTypedActors = typedSessionActors.remove(event.getChannel) + if (channelTypedActors ne null) { + val channelTypedActorsIterator = channelTypedActors.elements + while (channelTypedActorsIterator.hasMoreElements) { + TypedActor.stop(channelTypedActorsIterator.nextElement) + } + } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -510,7 +593,7 @@ class RemoteServerHandler( val actorRef = try { - createActor(actorInfo).start + createActor(actorInfo, channel).start } catch { case e: SecurityException => channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) @@ -542,7 +625,7 @@ class RemoteServerHandler( val exception = f.exception if (exception.isDefined) { - log.debug("Returning exception from actor invocation [%s]".format(exception.get)) + log.debug("Returning exception from actor invocation [%s]",exception.get) try { channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } catch { @@ -584,7 +667,7 @@ class RemoteServerHandler( val typedActorInfo = actorInfo.getTypedActorInfo log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface) - val typedActor = createTypedActor(actorInfo) + val typedActor = createTypedActor(actorInfo, channel) val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList val argClasses = args.map(_.getClass) @@ -592,26 +675,35 @@ class RemoteServerHandler( val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) else { - val result = messageReceiver.invoke(typedActor, args: _*) match { - case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking - case other => other + //Sends the response + def sendResponse(result: Either[Any,Throwable]): Unit = try { + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + None, + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + result, + true, + None, + None, + AkkaActorType.TypedActor, + None) + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(messageBuilder.build) + log.debug("Returning result from remote typed actor invocation [%s]", result) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) } - log.debug("Returning result from remote typed actor invocation [%s]", result) - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - None, - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Left(result), - true, - None, - None, - AkkaActorType.TypedActor, - None) - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(messageBuilder.build) + messageReceiver.invoke(typedActor, args: _*) match { + case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed + f.onComplete( future => { + val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) + sendResponse(result) + }) + case other => sendResponse(Left(other)) + } } } catch { case e: InvocationTargetException => @@ -631,10 +723,26 @@ class RemoteServerHandler( server.actorsByUuid.get(uuid) } + private def findActorFactory(id: String) : () => ActorRef = { + server.actorsFactories.get(id) + } + + private def findSessionActor(id: String, channel: Channel) : ActorRef = { + sessionActors.get(channel).get(id) + } + private def findTypedActorById(id: String) : AnyRef = { server.typedActors.get(id) } + private def findTypedActorFactory(id: String) : () => AnyRef = { + server.typedActorsFactories.get(id) + } + + private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { + typedSessionActors.get(channel).get(id) + } + private def findTypedActorByUuid(uuid: String) : AnyRef = { server.typedActorsByUuid.get(uuid) } @@ -653,6 +761,60 @@ class RemoteServerHandler( actorRefOrNull } + /** + * gets the actor from the session, or creates one if there is a factory for it + */ + private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + val sessionActorRefOrNull = findSessionActor(id, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else + { + // we dont have it in the session either, see if we have a factory for it + val actorFactoryOrNull = findActorFactory(id) + if (actorFactoryOrNull ne null) { + val actorRef = actorFactoryOrNull() + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + sessionActors.get(channel).put(id, actorRef) + actorRef + } + else + null + } + } + + + private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + val timeout = actorInfo.getTimeout + val name = actorInfo.getTarget + + try { + if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + + log.info("Creating a new remote actor [%s:%s]", name, uuid) + val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) + else Class.forName(name) + val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + actorRef.id = id + actorRef.timeout = timeout + actorRef.remoteAddress = None + server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid + actorRef + } catch { + case e => + log.error(e, "Could not create remote actor instance") + server.notifyListeners(RemoteServerError(e, server)) + throw e + } + + } + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -660,72 +822,91 @@ class RemoteServerHandler( * * Does not start the actor. */ - private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { + private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - val name = actorInfo.getTarget - val timeout = actorInfo.getTimeout - val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (actorRefOrNull eq null) { - try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - log.info("Creating a new remote actor [%s:%s]", name, uuid) - val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) - else Class.forName(name) - val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - actorRef.id = id - actorRef.timeout = timeout - actorRef.remoteAddress = None - server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid - actorRef - } catch { - case e => - log.error(e, "Could not create remote actor instance") - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } else actorRefOrNull + if (actorRefOrNull ne null) + actorRefOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createSessionActor(actorInfo, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedActor(actorInfo) + } } - private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { + /** + * gets the actor from the session, or creates one if there is a factory for it + */ + private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ + val id = actorInfo.getId + val sessionActorRefOrNull = findTypedSessionActor(id, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else { + val actorFactoryOrNull = findTypedActorFactory(id) + if (actorFactoryOrNull ne null) { + val newInstance = actorFactoryOrNull() + typedSessionActors.get(channel).put(id, newInstance) + newInstance + } + else + null + } + + } + + private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { + val typedActorInfo = actorInfo.getTypedActorInfo + val interfaceClassname = typedActorInfo.getInterface + val targetClassname = actorInfo.getTarget + val uuid = actorInfo.getUuid + + try { + if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + + log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname) + + val (interfaceClass, targetClass) = + if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), + applicationLoader.get.loadClass(targetClassname)) + else (Class.forName(interfaceClassname), Class.forName(targetClassname)) + + val newInstance = TypedActor.newInstance( + interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] + server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid + newInstance + } catch { + case e => + log.error(e, "Could not create remote typed actor instance") + server.notifyListeners(RemoteServerError(e, server)) + throw e + } + } + + private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - - if (typedActorOrNull eq null) { - val typedActorInfo = actorInfo.getTypedActorInfo - val interfaceClassname = typedActorInfo.getInterface - val targetClassname = actorInfo.getTarget - - try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname) - - val (interfaceClass, targetClass) = - if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), - applicationLoader.get.loadClass(targetClassname)) - else (Class.forName(interfaceClassname), Class.forName(targetClassname)) - - val newInstance = TypedActor.newInstance( - interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid - newInstance - } catch { - case e => - log.error(e, "Could not create remote typed actor instance") - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } else typedActorOrNull + if (typedActorOrNull ne null) + typedActorOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedTypedActor(actorInfo) + } } private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = { diff --git a/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java new file mode 100644 index 0000000000..8a6c2e6373 --- /dev/null +++ b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java @@ -0,0 +1,8 @@ +package akka.actor; + +public interface RemoteTypedSessionActor { + + public void login(String user); + public String getUser(); + public void doSomethingFunny() throws Exception; +} diff --git a/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java new file mode 100644 index 0000000000..b4140f74ed --- /dev/null +++ b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java @@ -0,0 +1,49 @@ +package akka.actor.remote; + +import akka.actor.*; + +import java.util.Set; +import java.util.HashSet; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedSessionActorImpl extends TypedActor implements RemoteTypedSessionActor { + + + private static Set instantiatedSessionActors = new HashSet(); + + public static Set getInstances() { + return instantiatedSessionActors; + } + + @Override + public void preStart() { + instantiatedSessionActors.add(this); + } + + @Override + public void postStop() { + instantiatedSessionActors.remove(this); + } + + + private String user="anonymous"; + + @Override + public void login(String user) { + this.user = user; + } + + @Override + public String getUser() + { + return this.user; + } + + @Override + public void doSomethingFunny() throws Exception + { + throw new Exception("Bad boy"); + } + +} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index ef60732852..5881687c01 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -3,7 +3,8 @@ package akka.actor.remote import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} - +import akka.util._ + import akka.remote.{RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala new file mode 100644 index 0000000000..173898c1fe --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.actor.remote + +import org.scalatest._ +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit + +import akka.remote.{RemoteServer, RemoteClient} +import akka.actor._ +import akka.actor.Actor._ +import RemoteTypedActorLog._ + +object ServerInitiatedRemoteSessionActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + case class Login(user:String) + case class GetUser() + case class DoSomethingFunny() + + var instantiatedSessionActors= Set[ActorRef]() + + class RemoteStatefullSessionActorSpec extends Actor { + + var user : String= "anonymous" + + override def preStart = { + instantiatedSessionActors += self + } + + override def postStop = { + instantiatedSessionActors -= self + } + + def receive = { + case Login(user) => + this.user = user + case GetUser() => + self.reply(this.user) + case DoSomethingFunny() => + throw new Exception("Bad boy") + } + } + +} + +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteSessionActorSpec extends + FlatSpec with + ShouldMatchers with + BeforeAndAfterEach { + import ServerInitiatedRemoteSessionActorSpec._ + + private val unit = TimeUnit.MILLISECONDS + + + override def beforeEach = { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + + server.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterEach = { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + "A remote session Actor" should "create a new session actor per connection" in { + clearMessageLogs + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + val default1 = session1 !! GetUser() + default1.get.asInstanceOf[String] should equal ("anonymous") + session1 ! Login("session[1]") + val result1 = session1 !! GetUser() + result1.get.asInstanceOf[String] should equal ("session[1]") + + session1.stop() + + RemoteClient.shutdownAll + + //RemoteClient.clientFor(HOSTNAME, PORT).connect + + val session2 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + // since this is a new session, the server should reset the state + val default2 = session2 !! GetUser() + default2.get.asInstanceOf[String] should equal ("anonymous") + + session2.stop() + + } + + it should "stop the actor when the client disconnects" in { + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + + val default1 = session1 !! GetUser() + default1.get.asInstanceOf[String] should equal ("anonymous") + + instantiatedSessionActors should have size (1) + + RemoteClient.shutdownAll + Thread.sleep(1000) + instantiatedSessionActors should have size (0) + + } + + it should "stop the actor when there is an error" in { + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + + session1 ! DoSomethingFunny() + session1.stop() + + Thread.sleep(1000) + + instantiatedSessionActors should have size (0) + } + + + it should "be able to unregister" in { + server.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec]) + server.actorsFactories.get("my-service-1") should not be (null) + server.unregisterPerSession("my-service-1") + server.actorsFactories.get("my-service-1") should be (null) + } + +} + diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala new file mode 100644 index 0000000000..0ae4ca2dee --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.actor.remote + +import org.scalatest._ +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit + +import akka.remote.{RemoteServer, RemoteClient} +import akka.actor._ +import RemoteTypedActorLog._ + +object ServerInitiatedRemoteTypedSessionActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null +} + +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteTypedSessionActorSpec extends + FlatSpec with + ShouldMatchers with + BeforeAndAfterEach { + import ServerInitiatedRemoteTypedActorSpec._ + + private val unit = TimeUnit.MILLISECONDS + + + override def beforeEach = { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + + server.registerTypedPerSessionActor("typed-session-actor-service", + TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterEach = { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + "A remote session Actor" should "create a new session actor per connection" in { + clearMessageLogs + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.getUser() should equal ("anonymous") + session1.login("session[1]") + session1.getUser() should equal ("session[1]") + + RemoteClient.shutdownAll + + val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session2.getUser() should equal ("anonymous") + + } + + it should "stop the actor when the client disconnects" in { + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.getUser() should equal ("anonymous") + + RemoteTypedSessionActorImpl.getInstances() should have size (1) + RemoteClient.shutdownAll + Thread.sleep(1000) + RemoteTypedSessionActorImpl.getInstances() should have size (0) + + } + + it should "stop the actor when there is an error" in { + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.doSomethingFunny() + + RemoteClient.shutdownAll + Thread.sleep(1000) + RemoteTypedSessionActorImpl.getInstances() should have size (0) + + } + + + it should "be able to unregister" in { + server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) + + server.typedActorsFactories.get("my-service-1") should not be (null) + server.unregisterTypedPerSessionActor("my-service-1") + server.typedActorsFactories.get("my-service-1") should be (null) + } + +} + diff --git a/akka-remote/src/test/scala/ticket/Ticket506Spec.scala b/akka-remote/src/test/scala/ticket/Ticket506Spec.scala new file mode 100644 index 0000000000..7a15c00a2e --- /dev/null +++ b/akka-remote/src/test/scala/ticket/Ticket506Spec.scala @@ -0,0 +1,61 @@ +package ticket + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers + +import akka.remote.{RemoteClient, RemoteNode, RemoteServer} +import akka.actor.{Actor, ActorRef} +import akka.serialization.RemoteActorSerialization +import akka.actor.Actor.actorOf + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +object State { + val latch = new CountDownLatch(1) +} + +case class RecvActorRef(bytes:Array[Byte]) + +class ActorRefService extends Actor { + import self._ + + def receive:Receive = { + case RecvActorRef(bytes) => + val ref = RemoteActorSerialization.fromBinaryToRemoteActorRef(bytes) + ref ! "hello" + case "hello" => + State.latch.countDown + } +} + +class Ticket506Spec extends Spec with ShouldMatchers { + val hostname:String = "localhost" + val port:Int = 9440 + + describe("a RemoteActorRef serialized") { + it("should be remotely usable") { + val s1,s2 = new RemoteServer + s1.start(hostname, port) + s2.start(hostname, port + 1) + + val a1,a2 = actorOf[ActorRefService] + a1.homeAddress = (hostname, port) + a2.homeAddress = (hostname, port+1) + + s1.register("service", a1) + s2.register("service", a2) + + // connect to the first server/service + val c1 = RemoteClient.actorFor("service", hostname, port) + + val bytes = RemoteActorSerialization.toRemoteActorRefProtocol(a2).toByteArray + c1 ! RecvActorRef(bytes) + + State.latch.await(1000, TimeUnit.MILLISECONDS) should be(true) + + RemoteClient.shutdownAll + s1.shutdown + s2.shutdown + } + } +} \ No newline at end of file diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index e4d1308d7a..8644311030 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -59,7 +59,7 @@ trait AkkaProject extends AkkaBaseProject { val akkaVersion = "1.0-SNAPSHOT" // convenience method - def akkaModule(module: String) = "akka" %% ("akka-" + module) % akkaVersion + def akkaModule(module: String) = "se.scalablesolutions.akka" % ("akka-" + module) % akkaVersion // akka actor dependency by default val akkaActor = akkaModule("actor") diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 581c2bdfef..291aefff6d 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -313,7 +313,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- // Miscellaneous // ------------------------------------------------------------------------------------------------------------------- - override def artifactID: String = this.name + override def disableCrossPaths = true override def mainClass = Some("akka.kernel.Main") @@ -383,7 +383,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def pomExtra = 2009 - http://akkasource.org + http://akka.io Scalable Solutions AB http://scalablesolutions.se @@ -956,7 +956,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------ class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject { - override def artifactID: String = this.name + override def disableCrossPaths = true lazy val sourceArtifact = Artifact(this.artifactID, "source", "jar", Some("sources"), Nil, None) lazy val docsArtifact = Artifact(this.artifactID, "doc", "jar", Some("docs"), Nil, None) override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config")