org.scala-tools
@@ -91,7 +91,7 @@
sjson.json
sjson
- 0.3
+ 0.4
diff --git a/akka-core/src/main/resources/META-INF/aop.xml b/akka-core/src/main/resources/META-INF/aop.xml
old mode 100755
new mode 100644
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index 86e08c1a87..1858952f40 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -4,10 +4,9 @@
package se.scalablesolutions.akka.actor
-import java.net.InetSocketAddress
-
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
@@ -16,8 +15,8 @@ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
+import java.net.InetSocketAddress
import java.lang.reflect.{InvocationTargetException, Method}
-import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult}
object Annotations {
import se.scalablesolutions.akka.annotation._
@@ -234,6 +233,7 @@ private[akka] sealed case class AspectInit(
* @author Jonas Bonér
*/
@Aspect("perInstance")
+// TODO: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
private var target: Class[_] = _
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 7e038a7509..f8eb5e0347 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
-import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder,RemoteServer, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
@@ -72,18 +72,8 @@ object Actor extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
- object Sender extends Actor {
+ object Sender{
implicit val Self: Option[Actor] = None
-
- def receive = {
- case unknown =>
- Actor.log.error(
- "Actor.Sender can't process messages. Received message [%s]." +
- "This error could occur if you either:" +
- "\n\t- Explicitly send a message to the Actor.Sender object." +
- "\n\t- Invoking the 'reply(..)' method or sending a message to the 'sender' reference " +
- "\n\t when you have sent the original request from a instance *not* being an actor.", unknown)
- }
}
/**
@@ -145,11 +135,15 @@ object Actor extends Logging {
* }
*
*/
- def spawn(body: => Unit): Actor = new Actor() {
- start
- body
- def receive = {
- case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.")
+ def spawn(body: => Unit): Actor = {
+ case object Spawn
+ new Actor() {
+ start
+ send(Spawn)
+ def receive = {
+ case Spawn => body
+ case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.")
+ }
}
}
@@ -225,7 +219,7 @@ trait Actor extends TransactionManagement {
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
- private[akka] var _contactAddress: Option[InetSocketAddress] = None
+ private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
// ====================================
@@ -445,6 +439,7 @@ trait Actor extends TransactionManagement {
_isShutDown = true
shutdown
ActorRegistry.unregister(this)
+ _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
@@ -480,7 +475,7 @@ trait Actor extends TransactionManagement {
*
*/
def !(message: Any)(implicit sender: Option[Actor]) = {
-//FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
+ //FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) postMessageToMailbox(message, sender)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
@@ -492,8 +487,7 @@ trait Actor extends TransactionManagement {
def send(message: Any) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) postMessageToMailbox(message, None)
- else throw new IllegalStateException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
+ else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
@@ -596,7 +590,7 @@ trait Actor extends TransactionManagement {
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
- "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network.")
+ "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
case Some(future) =>
future.completeWithResult(message)
}
@@ -634,18 +628,18 @@ trait Actor extends TransactionManagement {
def makeRemote(address: InetSocketAddress): Unit =
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
- _remoteAddress = Some(address)
- if(_contactAddress.isEmpty)
- setContactAddress(RemoteServer.HOSTNAME,RemoteServer.PORT)
+ _remoteAddress = Some(address)
+ RemoteClient.register(address.getHostName, address.getPort, uuid)
+ if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
}
/**
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
*/
- def setContactAddress(hostname: String, port: Int): Unit = setContactAddress(new InetSocketAddress(hostname, port))
+ def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
- def setContactAddress(address: InetSocketAddress): Unit = _contactAddress = Some(address)
+ def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address)
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@@ -793,7 +787,7 @@ trait Actor extends TransactionManagement {
actor
}
- private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+ protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -803,26 +797,24 @@ trait Actor extends TransactionManagement {
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
+
val id = registerSupervisorAsRemoteActor
- if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+ if(id.isDefined)
+ requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
- if (sender.isDefined) {
- requestBuilder.setSourceTarget(sender.get.getClass.getName)
- requestBuilder.setSourceUuid(sender.get.uuid)
- log.debug("Setting sending actor as %s, %s", sender.get.getClass.getName, _contactAddress)
+ if(sender.isDefined) {
+ val s = sender.get
+ requestBuilder.setSourceTarget(s.getClass.getName)
+ requestBuilder.setSourceUuid(s.uuid)
- if (sender.get._contactAddress.isDefined) {
- val addr = sender.get._contactAddress.get
- requestBuilder.setSourceHostname(addr.getHostName())
- requestBuilder.setSourcePort(addr.getPort())
- } else {
- // set the contact address to the default values from the
- // configuration file
- requestBuilder.setSourceHostname(Actor.HOSTNAME)
- requestBuilder.setSourcePort(Actor.PORT)
- }
+ val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
+
+ log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
+
+ requestBuilder.setSourceHostname(host)
+ requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
@@ -831,11 +823,13 @@ trait Actor extends TransactionManagement {
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
- } else invocation.send
+ }
+ else
+ invocation.send
}
}
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
@@ -926,7 +920,7 @@ trait Actor extends TransactionManagement {
if (senderFuture.isEmpty) throw new StmException(
"Can't continue transaction in a one-way fire-forget message send" +
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
- "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
+ "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
atomic {
proceed
}
diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index 9a6dafcc58..63314ae051 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -6,7 +6,8 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ListBuffer, HashMap}
+import scala.reflect.Manifest
/**
* Registry holding all actor instances, mapped by class and the actor's id field (which can be set by user-code).
@@ -17,6 +18,30 @@ object ActorRegistry extends Logging {
private val actorsByClassName = new HashMap[String, List[Actor]]
private val actorsById = new HashMap[String, List[Actor]]
+ /**
+ * Returns all actors in the system.
+ */
+ def actors: List[Actor] = synchronized {
+ val all = new ListBuffer[Actor]
+ actorsById.values.foreach(all ++= _)
+ all.toList
+ }
+
+ /**
+ * Invokes a function for all actors.
+ */
+ def foreach(f: (Actor) => Unit) = actors.foreach(f)
+
+ /**
+ * Finds all actors that are subtypes of the class passed in as the Manifest argument.
+ */
+ def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = synchronized {
+ for (actor <- actors; if manifest.erasure.isAssignableFrom(actor.getClass)) yield actor.asInstanceOf[T]
+ }
+
+ /**
+ * Finds all actors of the exact type specified by the class passed in as the Class argument.
+ */
def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized {
actorsByClassName.get(clazz.getName) match {
case None => Nil
@@ -24,6 +49,9 @@ object ActorRegistry extends Logging {
}
}
+ /**
+ * Finds all actors that has a specific id.
+ */
def actorsFor(id : String): List[Actor] = synchronized {
actorsById.get(id) match {
case None => Nil
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
index 85ec2dc8ca..1bacbf6f59 100644
--- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -16,10 +16,10 @@ import se.scalablesolutions.akka.Config._
trait BootableActorLoaderService extends Bootable with Logging {
val BOOT_CLASSES = config.getList("akka.boot")
- var applicationLoader: Option[ClassLoader] = None
+ lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader
- protected def runApplicationBootClasses : Option[ClassLoader] = {
- val loader =
+ protected def createApplicationClassLoader : Option[ClassLoader] = {
+ Some(
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
@@ -30,24 +30,21 @@ trait BootableActorLoaderService extends Bootable with Logging {
}
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
- new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
+ new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) {
getClass.getClassLoader
} else throw new IllegalStateException(
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
- for (clazz <- BOOT_CLASSES) {
+ )
+ }
+
+ abstract override def onLoad = {
+ for (loader <- applicationLoader; clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
- Some(loader)
- }
-
- abstract override def onLoad = {
- applicationLoader = runApplicationBootClasses
- super.onLoad
+ super.onLoad
}
- abstract override def onUnload = {
- ActorRegistry.shutdownAll
- }
+ abstract override def onUnload = ActorRegistry.shutdownAll
}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
index b3ca8d4cb7..429fdb61ec 100644
--- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
+++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
@@ -23,22 +23,26 @@ trait BootableRemoteActorService extends Bootable with Logging {
def startRemoteService = remoteServerThread.start
abstract override def onLoad = {
- super.onLoad //Make sure the actors facility is loaded before we load the remote service
if(config.getBool("akka.remote.server.service", true)){
+ log.info("Starting up Cluster Service")
+ Cluster.start
+ super.onLoad //Initialize BootableActorLoaderService before remote service
log.info("Initializing Remote Actors Service...")
startRemoteService
log.info("Remote Actors Service initialized!")
}
+ else
+ super.onLoad
+
}
abstract override def onUnload = {
- super.onUnload
- if (remoteServerThread.isAlive) {
+ super.onUnload
+ if (remoteServerThread.isAlive) {
log.info("Shutting down Remote Actors Service")
RemoteNode.shutdown
- log.info("Shutting down Cluster Service")
- Cluster.shutdown
remoteServerThread.join(1000)
}
+ Cluster.shutdown
}
}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index 61e42d4695..c2e9069a01 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -26,53 +26,220 @@ trait Cluster {
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
+
+ def foreach(f : (RemoteAddress) => Unit) : Unit
}
/**
- * Base class for cluster actor implementations.
+ * Base trait for Cluster implementations
+ *
+ * @author Viktor Klang
*/
-abstract class ClusterActor extends Actor with Cluster {
+trait ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
}
/**
- * A singleton representing the Cluster.
+ * Companion object to ClusterActor that defines some common messages
+ *
+ * @author Viktor Klang
+ */
+private[remote] object ClusterActor {
+ sealed trait ClusterMessage
+
+ private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
+
+ private[remote] case class Node(endpoints: List[RemoteAddress])
+}
+
+/**
+ * Base class for cluster actor implementations.
+ * Provides most of the behavior out of the box
+ * only needs to be gives hooks into the underlaying cluster impl.
+ */
+abstract class BasicClusterActor extends ClusterActor {
+ import ClusterActor._
+
+ case class Message(sender : ADDR_T,msg : Array[Byte])
+ case object PapersPlease extends ClusterMessage
+ case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
+ case object Block extends ClusterMessage
+ case object Unblock extends ClusterMessage
+ case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
+ case class Zombie(address: ADDR_T) extends ClusterMessage
+ case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
+ case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
+
+ type ADDR_T
+
+
+ @volatile private var local: Node = Node(Nil)
+ @volatile private var remotes: Map[ADDR_T, Node] = Map()
+
+ override def init = {
+ remotes = new HashMap[ADDR_T, Node]
+ }
+
+ override def shutdown = {
+ remotes = Map()
+ }
+
+ def receive = {
+ case v @ View(members) => {
+ // Not present in the cluster anymore = presumably zombies
+ // Nodes we have no prior knowledge existed = unknowns
+ val zombies = Set[ADDR_T]() ++ remotes.keySet -- members
+ val unknown = members -- remotes.keySet
+
+ log debug ("Updating view")
+ log debug ("Other memebers: [%s]",members)
+ log debug ("Zombies: [%s]",zombies)
+ log debug ("Unknowns: [%s]",unknown)
+
+ // Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
+ broadcast(zombies ++ unknown, PapersPlease)
+ remotes = remotes -- zombies
+ }
+
+ case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
+ log debug ("Killing Zombie Node: %s", x)
+ broadcast(x :: Nil, PapersPlease)
+ remotes = remotes - x
+ }
+
+ case rm @ RelayedMessage(_, _) => {
+ log debug ("Relaying message: %s", rm)
+ broadcast(rm)
+ }
+
+ case m @ Message(src,msg) => {
+ (Cluster.serializer in (msg, None)) match {
+
+ case PapersPlease => {
+ log debug ("Asked for papers by %s", src)
+ broadcast(src :: Nil, Papers(local.endpoints))
+
+ if (remotes.get(src).isEmpty) // If we were asked for papers from someone we don't know, ask them!
+ broadcast(src :: Nil, PapersPlease)
+ }
+
+ case Papers(x) => remotes = remotes + (src -> Node(x))
+
+ case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m)
+
+ case unknown => log debug ("Unknown message: %s", unknown.toString)
+ }
+ }
+
+ case RegisterLocalNode(s) => {
+ log debug ("RegisterLocalNode: %s", s)
+ local = Node(local.endpoints + s)
+ broadcast(Papers(local.endpoints))
+ }
+
+ case DeregisterLocalNode(s) => {
+ log debug ("DeregisterLocalNode: %s", s)
+ local = Node(local.endpoints - s)
+ broadcast(Papers(local.endpoints))
+ }
+ }
+
+ /**
+ * Implement this in a subclass to add node-to-node messaging
+ */
+ protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit
+
+ /**
+ * Implement this in a subclass to add node-to-many-nodes messaging
+ */
+ protected def toAllNodes(msg : Array[Byte]) : Unit
+
+ /**
+ * Sends the specified message to the given recipients using the serializer
+ * that's been set in the akka-conf
+ */
+ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
+ lazy val m = Cluster.serializer out msg
+ for (r <- recipients) toOneNode(r,m)
+ }
+
+ /**
+ * Sends the specified message toall other nodes using the serializer
+ * that's been set in the akka-conf
+ */
+ protected def broadcast[T <: AnyRef](msg: T): Unit =
+ if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg)
+
+ /**
+ * Applies the given PartialFunction to all known RemoteAddresses
+ */
+ def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] =
+ remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress)
+
+ /**
+ * Applies the given function to all remote addresses known
+ */
+ def foreach(f : (RemoteAddress) => Unit) : Unit = remotes.values.toList.flatMap(_.endpoints).foreach(f)
+
+ /**
+ * Registers a local endpoint
+ */
+ def registerLocalNode(hostname: String, port: Int): Unit =
+ send(RegisterLocalNode(RemoteAddress(hostname, port)))
+
+ /**
+ * Deregisters a local endpoint
+ */
+ def deregisterLocalNode(hostname: String, port: Int): Unit =
+ send(DeregisterLocalNode(RemoteAddress(hostname, port)))
+
+ /**
+ * Broadcasts the specified message to all Actors of type Class on all known Nodes
+ */
+ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
+ send(RelayedMessage(to.getName, msg))
+}
+
+/**
+ * A singleton representing the Cluster.
*
* Loads a specified ClusterActor and delegates to that instance.
*/
object Cluster extends Cluster with Logging {
- private[remote] sealed trait ClusterMessage
- private[remote] case class Node(endpoints: List[RemoteAddress]) extends ClusterMessage
- private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
-
- private[remote] val clusterActor: Option[ClusterActor] = {
- val name = config.getString("akka.remote.cluster.actor","not defined")
- try {
- val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor]
- a.start
- Some(a)
- }
- catch {
- case e => log.error(e,"Couldn't load Cluster provider: [%s]",name)
- None
- }
- }
-
- private[remote] val supervisor: Option[Supervisor] = if (clusterActor.isDefined) {
- val sup = SupervisorFactory(
- SupervisorConfig(
- RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
- Supervise(clusterActor.get, LifeCycle(Permanent)) :: Nil)
- ).newInstance
- sup.start
- Some(sup)
- } else None
+ @volatile private[remote] var clusterActor: Option[ClusterActor] = None
+ @volatile private[remote] var supervisor: Option[Supervisor] = None
private[remote] lazy val serializer: Serializer = {
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
Class.forName(className).newInstance.asInstanceOf[Serializer]
}
+ private[remote] def createClusterActor : Option[ClusterActor] = {
+ val name = config.getString("akka.remote.cluster.actor")
+
+ try {
+ name map { fqn =>
+ val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
+ a.start
+ a
+ }
+ }
+ catch {
+ case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None
+ }
+ }
+
+ private[remote] def createSupervisor(actor : ClusterActor) : Option[Supervisor] = {
+ val sup = SupervisorFactory(
+ SupervisorConfig(
+ RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
+ Supervise(actor, LifeCycle(Permanent)) :: Nil)
+ ).newInstance
+ sup.start
+ Some(sup)
+ }
+
+
def name = clusterActor.map(_.name).getOrElse("No cluster")
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf))
@@ -82,6 +249,22 @@ object Cluster extends Cluster with Logging {
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
+
+ def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
- def shutdown = supervisor.foreach(_.stop)
+ def start : Unit = synchronized {
+ if(supervisor.isEmpty) {
+ for(actor <- createClusterActor;
+ sup <- createSupervisor(actor)) {
+ clusterActor = Some(actor)
+ supervisor = Some(sup)
+ }
+ }
+ }
+
+ def shutdown : Unit = synchronized {
+ supervisor.foreach(_.stop)
+ supervisor = None
+ clusterActor = None
+ }
}
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index e2f1e6d032..f97f014f06 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -4,8 +4,6 @@
package se.scalablesolutions.akka.remote
-import scala.collection.mutable.HashMap
-
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
@@ -13,6 +11,7 @@ import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
import org.jboss.netty.channel._
+import group.DefaultChannelGroup
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
@@ -25,12 +24,15 @@ import java.net.{SocketAddress, InetSocketAddress}
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable.{HashSet, HashMap}
+
/**
* @author Jonas Bonér
*/
object RemoteRequestIdFactory {
private val nodeId = UUID.newUuid
private val id = new AtomicLong
+
def nextId: Long = id.getAndIncrement + nodeId
}
@@ -41,27 +43,121 @@ object RemoteClient extends Logging {
val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
- private val clients = new HashMap[String, RemoteClient]
+ private val remoteClients = new HashMap[String, RemoteClient]
+ private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
+
+ // FIXME: simplify overloaded methods when we have Scala 2.8
+
+ def actorFor(className: String, hostname: String, port: Int): Actor =
+ actorFor(className, className, 5000L, hostname, port)
+
+ def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor =
+ actorFor(actorId, className, 5000L, hostname, port)
+
+ def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor =
+ actorFor(className, className, timeout, hostname, port)
+
+ def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = {
+ new Actor {
+ start
+ val remoteClient = RemoteClient.clientFor(hostname, port)
+
+ override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+ val requestBuilder = RemoteRequest.newBuilder
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(className)
+ .setTimeout(timeout)
+ .setUuid(actorId)
+ .setIsActor(true)
+ .setIsOneWay(true)
+ .setIsEscaped(false)
+ if (sender.isDefined) {
+ val s = sender.get
+ requestBuilder.setSourceTarget(s.getClass.getName)
+ requestBuilder.setSourceUuid(s.uuid)
+ val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
+ requestBuilder.setSourceHostname(host)
+ requestBuilder.setSourcePort(port)
+ }
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ remoteClient.send(requestBuilder.build, None)
+ }
+
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ message: Any,
+ timeout: Long,
+ senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ val requestBuilder = RemoteRequest.newBuilder
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(className)
+ .setTimeout(timeout)
+ .setUuid(actorId)
+ .setIsActor(true)
+ .setIsOneWay(false)
+ .setIsEscaped(false)
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ val future = remoteClient.send(requestBuilder.build, senderFuture)
+ if (future.isDefined) future.get
+ else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
+ }
+
+ def receive = {case _ => {}}
+ }
+ }
+
+ def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
- if (clients.contains(hash)) clients(hash)
+ if (remoteClients.contains(hash)) remoteClients(hash)
else {
val client = new RemoteClient(hostname, port)
client.connect
- clients += hash -> client
+ remoteClients += hash -> client
client
}
}
+ def shutdownClientFor(address: InetSocketAddress) = synchronized {
+ val hostname = address.getHostName
+ val port = address.getPort
+ val hash = hostname + ':' + port
+ if (remoteClients.contains(hash)) {
+ val client = remoteClients(hash)
+ client.shutdown
+ remoteClients - hash
+ }
+ }
+
/**
* Clean-up all open connections.
*/
- def shutdownAll() = synchronized {
- clients.foreach({case (addr, client) => client.shutdown})
- clients.clear
+ def shutdownAll = synchronized {
+ remoteClients.foreach({case (addr, client) => client.shutdown})
+ remoteClients.clear
+ }
+
+ private[akka] def register(hostname: String, port: Int, uuid: String) = synchronized {
+ actorsFor(RemoteServer.Address(hostname, port)) + uuid
+ }
+
+ // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
+ private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized {
+ val set = actorsFor(RemoteServer.Address(hostname, port))
+ set - uuid
+ if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
+ }
+
+ private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[String] = {
+ val set = remoteActors.get(remoteServerAddress)
+ if (set.isDefined && (set.get ne null)) set.get
+ else {
+ val remoteActorSet = new HashSet[String]
+ remoteActors.put(remoteServerAddress, remoteActorSet)
+ remoteActorSet
+ }
}
}
@@ -69,9 +165,9 @@ object RemoteClient extends Logging {
* @author Jonas Bonér
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
- val name = "RemoteClient@" + hostname
-
- @volatile private var isRunning = false
+ val name = "RemoteClient@" + hostname + "::" + port
+
+ @volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@@ -80,6 +176,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
Executors.newCachedThreadPool)
private val bootstrap = new ClientBootstrap(channelFactory)
+ private val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName);
private val timer = new HashedWheelTimer
private val remoteAddress = new InetSocketAddress(hostname, port)
@@ -93,20 +190,22 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
if (!isRunning) {
connection = bootstrap.connect(remoteAddress)
log.info("Starting remote client connection to [%s:%s]", hostname, port)
-
// Wait until the connection attempt succeeds or fails.
- connection.awaitUninterruptibly
- if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
+ val channel = connection.awaitUninterruptibly.getChannel
+ openChannels.add(channel)
+ if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
isRunning = true
}
}
def shutdown = synchronized {
- if (!isRunning) {
- connection.getChannel.getCloseFuture.awaitUninterruptibly
- channelFactory.releaseExternalResources
+ if (isRunning) {
+ isRunning = false
+ openChannels.close.awaitUninterruptibly
+ bootstrap.releaseExternalResources
+ timer.stop
+ log.info("%s has been shut down", name)
}
- timer.stop
}
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
@@ -116,11 +215,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFutureResult(request.getTimeout)
+ else new DefaultCompletableFutureResult(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
- }
+ }
}
} else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
@@ -131,14 +230,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
def deregisterSupervisorForActor(actor: Actor) =
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
else supervisors.remove(actor._supervisor.get.uuid)
-
+
def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)
}
/**
* @author Jonas Bonér
*/
-class RemoteClientPipelineFactory(name: String,
+class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
@@ -146,21 +245,21 @@ class RemoteClientPipelineFactory(name: String,
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
- val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
- val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
- val lenPrep = new LengthFieldPrepender(4)
- val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance)
- val protobufEnc = new ProtobufEncoder
+ val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
+ val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
+ val lenPrep = new LengthFieldPrepender(4)
+ val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance)
+ val protobufEnc = new ProtobufEncoder
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
- case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
+ case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
case _ => None
}
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
- val stages: Array[ChannelHandler] =
- zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
- .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
+ val stages: Array[ChannelHandler] =
+ zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
+ .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
new StaticChannelPipeline(stages: _*)
}
}
@@ -168,7 +267,7 @@ class RemoteClientPipelineFactory(name: String,
/**
* @author Jonas Bonér
*/
-@ChannelPipelineCoverage { val value = "all" }
+@ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor],
@@ -176,7 +275,7 @@ class RemoteClientHandler(val name: String,
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer,
val client: RemoteClient)
- extends SimpleChannelUpstreamHandler with Logging {
+ extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@@ -214,9 +313,9 @@ class RemoteClientHandler(val name: String,
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
- }
+ }
- override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
+ override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", remoteAddress)
@@ -245,7 +344,7 @@ class RemoteClientHandler(val name: String,
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
exceptionType
- .getConstructor(Array[Class[_]](classOf[String]): _*)
- .newInstance(exceptionMessage).asInstanceOf[Throwable]
+ .getConstructor(Array[Class[_]](classOf[String]): _*)
+ .newInstance(exceptionMessage).asInstanceOf[Throwable]
}
}
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 580cbc48cd..5288bd7bd4 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -33,6 +33,11 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
*
* RemoteNode.start(hostname, port)
*
+ *
+ * You can specify the class loader to use to load the remote actors.
+ *
+ * RemoteNode.start(hostname, port, classLoader)
+ *
*
* If you need to create more than one, then you can use the RemoteServer:
*
@@ -112,8 +117,6 @@ object RemoteServer {
private[remote] def unregister(hostname: String, port: Int) =
remoteServers.remove(Address(hostname, port))
-
- private[remote] def canShutDownCluster: Boolean = remoteServers.isEmpty
}
/**
@@ -179,14 +182,30 @@ class RemoteServer extends Logging {
}
}
- def shutdown = {
+ def shutdown = if (isRunning) {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
- openChannels.unbind
- openChannels.close.awaitUninterruptibly(1000)
+ openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
- if (RemoteServer.canShutDownCluster) Cluster.shutdown
+ }
+
+ // TODO: register active object in RemoteServer as well
+
+ /**
+ * Register Remote Actor by the Actor's 'id' field.
+ */
+ def register(actor: Actor) = if (isRunning) {
+ log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
+ RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
+ }
+
+ /**
+ * Register Remote Actor by a specific 'id' passed as argument.
+ */
+ def register(id: String, actor: Actor) = if (isRunning) {
+ log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
+ RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
}
}
@@ -255,8 +274,7 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
- if (message eq null) throw new IllegalStateException(
- "Message in remote MessageEvent is null: " + event)
+ if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) {
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
}
diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala
index 3d1c05f423..3eb9315126 100644
--- a/akka-core/src/main/scala/serialization/Serializer.scala
+++ b/akka-core/src/main/scala/serialization/Serializer.scala
@@ -152,7 +152,14 @@ object Serializer {
// FIXME set ClassLoader on SJSONSerializer.SJSON
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes)
- def in(json: String): AnyRef = SJSONSerializer.SJSON.in(json)
+ import scala.reflect.Manifest
+ def in[T](json: String)(implicit m: Manifest[T]): AnyRef = {
+ SJSONSerializer.SJSON.in(json)(m)
+ }
+
+ def in[T](bytes: Array[Byte])(implicit m: Manifest[T]): AnyRef = {
+ SJSONSerializer.SJSON.in(bytes)(m)
+ }
}
/**
diff --git a/akka-core/src/main/scala/stm/HashTrie.scala b/akka-core/src/main/scala/stm/HashTrie.scala
index c41ad85006..02b7ad2145 100644
--- a/akka-core/src/main/scala/stm/HashTrie.scala
+++ b/akka-core/src/main/scala/stm/HashTrie.scala
@@ -323,12 +323,12 @@ private[collection] class FullNode[K, +V](shift: Int)(table: Array[Node[K, V]])
val node = (table(i)(shift + 5, key, hash) = value)
if (node == table(i)) this else {
- val newTable = new Array[Node[K, A]](32)
- Array.copy(table, 0, newTable, 0, 32)
-
- newTable(i) = node
-
- new FullNode(shift)(newTable)
+ val newTable = new Array[Node[K, A]](32)
+ Array.copy(table, 0, newTable, 0, 32)
+
+ newTable(i) = node
+
+ new FullNode(shift)(newTable)
}
}
diff --git a/akka-core/src/main/scala/stm/ResultOrFailure.scala b/akka-core/src/main/scala/stm/ResultOrFailure.scala
index 99d88c0c5a..51ce6ddf68 100644
--- a/akka-core/src/main/scala/stm/ResultOrFailure.scala
+++ b/akka-core/src/main/scala/stm/ResultOrFailure.scala
@@ -32,10 +32,10 @@ import stm.Transaction
*
* scala> res0()
* java.lang.RuntimeException: Lets see what happens here...
- * at ResultOrFailure.apply(RefExcept.scala:11)
- * at .(:6)
- * at .()
- * at Re...
+ * at ResultOrFailure.apply(RefExcept.scala:11)
+ * at .(:6)
+ * at .()
+ * at Re...
*
*
* @author Jonas Bonér
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index b2e69c7d63..6003a89f89 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -5,6 +5,7 @@
package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.stm.Transaction.atomic
+import se.scalablesolutions.akka.stm.NoTransactionInScopeException
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.UUID
@@ -72,45 +73,87 @@ object TransactionalRef {
* @author Jonas Bonér
*/
class TransactionalRef[T] extends Transactional {
- implicit val txInitName = "TransactionalRef:Init"
import org.multiverse.api.ThreadLocalTransaction._
+
+ implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
private[this] val ref: Ref[T] = atomic { new Ref }
- def swap(elem: T) = ref.set(elem)
-
+ def swap(elem: T) = {
+ ensureIsInTransaction
+ ref.set(elem)
+ }
+
def get: Option[T] = {
+ ensureIsInTransaction
if (ref.isNull) None
else Some(ref.get)
}
- def getOrWait: T = ref.getOrAwait
+ def getOrWait: T = {
+ ensureIsInTransaction
+ ref.getOrAwait
+ }
def getOrElse(default: => T): T = {
+ ensureIsInTransaction
if (ref.isNull) default
else ref.get
}
- def isDefined: Boolean = !ref.isNull
+ def isDefined: Boolean = {
+ ensureIsInTransaction
+ !ref.isNull
+ }
- def isEmpty: Boolean = ref.isNull
+ def isEmpty: Boolean = {
+ ensureIsInTransaction
+ ref.isNull
+ }
- def map[B](f: T => B): Option[B] = if (isEmpty) None else Some(f(ref.get))
+ def map[B](f: T => B): Option[B] = {
+ ensureIsInTransaction
+ if (isEmpty) None else Some(f(ref.get))
+ }
- def flatMap[B](f: T => Option[B]): Option[B] = if (isEmpty) None else f(ref.get)
+ def flatMap[B](f: T => Option[B]): Option[B] = {
+ ensureIsInTransaction
+ if (isEmpty) None else f(ref.get)
+ }
- def filter(p: T => Boolean): Option[T] = if (isEmpty || p(ref.get)) Some(ref.get) else None
+ def filter(p: T => Boolean): Option[T] = {
+ ensureIsInTransaction
+ if (isEmpty || p(ref.get)) Some(ref.get) else None
+ }
- def foreach(f: T => Unit) { if (!isEmpty) f(ref.get) }
+ def foreach(f: T => Unit) {
+ ensureIsInTransaction
+ if (!isEmpty) f(ref.get)
+ }
- def elements: Iterator[T] = if (isEmpty) Iterator.empty else Iterator.fromValues(ref.get)
+ def elements: Iterator[T] = {
+ ensureIsInTransaction
+ if (isEmpty) Iterator.empty else Iterator.fromValues(ref.get)
+ }
- def toList: List[T] = if (isEmpty) List() else List(ref.get)
+ def toList: List[T] = {
+ ensureIsInTransaction
+ if (isEmpty) List() else List(ref.get)
+ }
- def toRight[X](left: => X) = if (isEmpty) Left(left) else Right(ref.get)
+ def toRight[X](left: => X) = {
+ ensureIsInTransaction
+ if (isEmpty) Left(left) else Right(ref.get)
+ }
- def toLeft[X](right: => X) = if (isEmpty) Right(right) else Left(ref.get)
+ def toLeft[X](right: => X) = {
+ ensureIsInTransaction
+ if (isEmpty) Right(right) else Left(ref.get)
+ }
+
+ private def ensureIsInTransaction =
+ if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
}
object TransactionalMap {
diff --git a/akka-core/src/test/scala/AllTest.scala b/akka-core/src/test/scala/AllTest.scala
index 37604e2e7a..fdf3351298 100644
--- a/akka-core/src/test/scala/AllTest.scala
+++ b/akka-core/src/test/scala/AllTest.scala
@@ -4,7 +4,7 @@ import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
-import se.scalablesolutions.akka.actor.{RemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest}
+import se.scalablesolutions.akka.actor.{ClientInitiatedRemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest}
object AllTest extends TestCase {
def suite(): Test = {
@@ -16,7 +16,7 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[ThreadBasedActorTest])
suite.addTestSuite(classOf[ReactorBasedSingleThreadEventDrivenDispatcherTest])
suite.addTestSuite(classOf[ReactorBasedThreadPoolEventDrivenDispatcherTest])
- suite.addTestSuite(classOf[RemoteActorTest])
+ suite.addTestSuite(classOf[ClientInitiatedRemoteActorTest])
suite.addTestSuite(classOf[InMemoryActorTest])
suite.addTestSuite(classOf[SchedulerTest])
//suite.addTestSuite(classOf[TransactionClasherTest])
diff --git a/akka-core/src/test/scala/RemoteActorTest.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
similarity index 81%
rename from akka-core/src/test/scala/RemoteActorTest.scala
rename to akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
index 6bb5d8e689..81fb4780da 100644
--- a/akka-core/src/test/scala/RemoteActorTest.scala
+++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
@@ -46,7 +46,7 @@ class RemoteActorSpecActorAsyncSender extends Actor {
}
}
-class RemoteActorTest extends JUnitSuite {
+class ClientInitiatedRemoteActorTest extends JUnitSuite {
import Actor.Sender.Self
akka.Config.config
@@ -108,7 +108,7 @@ class RemoteActorTest extends JUnitSuite {
actor.start
val sender = new RemoteActorSpecActorAsyncSender
- sender.setContactAddress(HOSTNAME, PORT1)
+ sender.setReplyToAddress(HOSTNAME, PORT1)
sender.start
sender.send(actor)
Thread.sleep(1000)
@@ -116,26 +116,6 @@ class RemoteActorTest extends JUnitSuite {
actor.stop
}
- /*
- This test does not throw an exception since the
- _contactAddress is always defined via the
- global configuration if not set explicitly.
-
- @Test
- def shouldSendRemoteReplyException = {
- implicit val timeout = 500000000L
- val actor = new RemoteActorSpecActorBidirectional
- actor.makeRemote(HOSTNAME, PORT1)
- actor.start
-
- val sender = new RemoteActorSpecActorAsyncSender
- sender.start
- sender.send(actor)
- Thread.sleep(500)
- assert("exception" === Global.remoteReply)
- actor.stop
- }
- */
@Test
def shouldSendReceiveException = {
implicit val timeout = 500000000L
diff --git a/akka-core/src/test/scala/RemoteClientShutdownTest.scala b/akka-core/src/test/scala/RemoteClientShutdownTest.scala
new file mode 100644
index 0000000000..f6fbea1bb9
--- /dev/null
+++ b/akka-core/src/test/scala/RemoteClientShutdownTest.scala
@@ -0,0 +1,30 @@
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.remote.RemoteNode
+import se.scalablesolutions.akka.actor._
+import Actor.Sender.Self
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+class RemoteClientShutdownTest extends JUnitSuite {
+ @Test def shouldShutdownRemoteClient = {
+ RemoteNode.start("localhost", 9999)
+
+ var remote = new TravelingActor
+ remote.start
+ remote ! "sending a remote message"
+ remote.stop
+
+ Thread.sleep(1000)
+ RemoteNode.shutdown
+ println("======= REMOTE CLIENT SHUT DOWN FINE =======")
+ assert(true)
+ }
+}
+
+class TravelingActor extends RemoteActor("localhost", 9999) {
+ def receive = {
+ case _ => log.info("message received")
+ }
+}
diff --git a/akka-core/src/test/scala/SerializerTest.scala b/akka-core/src/test/scala/SerializerTest.scala
new file mode 100644
index 0000000000..e11e83a2f5
--- /dev/null
+++ b/akka-core/src/test/scala/SerializerTest.scala
@@ -0,0 +1,40 @@
+package se.scalablesolutions.akka.serialization
+
+import junit.framework.TestCase
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.{Test, Before, After}
+
+import scala.reflect.BeanInfo
+@BeanInfo
+case class Foo(foo: String) {
+ def this() = this(null)
+}
+
+@BeanInfo
+case class MyMessage(val id: String, val value: Tuple2[String, Int]) {
+ private def this() = this(null, null)
+}
+
+
+class SerializerTest extends JUnitSuite {
+ @Test
+ def shouldSerializeString = {
+ val f = Foo("debasish")
+ val json = Serializer.ScalaJSON.out(f)
+ assert(new String(json) == """{"foo":"debasish"}""")
+ val fo = Serializer.ScalaJSON.in[Foo](new String(json)).asInstanceOf[Foo]
+ assert(fo == f)
+ }
+
+ @Test
+ def shouldSerializeTuple2 = {
+ val message = MyMessage("id", ("hello", 34))
+ val json = Serializer.ScalaJSON.out(message)
+ assert(new String(json) == """{"id":"id","value":{"hello":34}}""")
+ val f = Serializer.ScalaJSON.in[MyMessage](new String(json)).asInstanceOf[MyMessage]
+ assert(f == message)
+ val g = Serializer.ScalaJSON.in[MyMessage](json).asInstanceOf[MyMessage]
+ assert(f == message)
+ }
+}
diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala
new file mode 100644
index 0000000000..f53f706f3b
--- /dev/null
+++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSample.scala
@@ -0,0 +1,34 @@
+package sample
+
+import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.remote.{RemoteClient, RemoteNode}
+import se.scalablesolutions.akka.util.Logging
+
+class HelloWorldActor extends Actor {
+ start
+ def receive = {
+ case "Hello" => reply("World")
+ }
+}
+
+object ServerInitiatedRemoteActorServer {
+
+ def run = {
+ RemoteNode.start("localhost", 9999)
+ RemoteNode.register("hello-service", new HelloWorldActor)
+ }
+
+ def main(args: Array[String]) = run
+}
+
+object ServerInitiatedRemoteActorClient extends Logging {
+
+ def run = {
+ val actor = RemoteClient.actorFor("hello-service", "localhost", 9999)
+ val result = actor !! "Hello"
+ log.info("Result from Remote Actor: %s", result)
+ }
+
+ def main(args: Array[String]) = run
+}
+
diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala
new file mode 100644
index 0000000000..2f1ef161c8
--- /dev/null
+++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala
@@ -0,0 +1,145 @@
+package se.scalablesolutions.akka.actor
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.{Test, Before, After}
+
+import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
+import se.scalablesolutions.akka.dispatch.Dispatchers
+
+object ServerInitiatedRemoteActorTest {
+ val HOSTNAME = "localhost"
+ val PORT = 9990
+ var server: RemoteServer = null
+
+ object Global {
+ var oneWay = "nada"
+ var remoteReply = "nada"
+ }
+
+ class RemoteActorSpecActorUnidirectional extends Actor {
+ dispatcher = Dispatchers.newThreadBasedDispatcher(this)
+ start
+
+ def receive = {
+ case "OneWay" =>
+ println("================== ONEWAY")
+ Global.oneWay = "received"
+ }
+ }
+
+ class RemoteActorSpecActorBidirectional extends Actor {
+ start
+ def receive = {
+ case "Hello" =>
+ reply("World")
+ case "Failure" =>
+ throw new RuntimeException("expected")
+ }
+ }
+
+ case class Send(actor: Actor)
+
+ class RemoteActorSpecActorAsyncSender extends Actor {
+ start
+ def receive = {
+ case Send(actor: Actor) =>
+ actor ! "Hello"
+ case "World" =>
+ Global.remoteReply = "replied"
+ }
+
+ def send(actor: Actor) {
+ this ! Send(actor)
+ }
+ }
+}
+
+class ServerInitiatedRemoteActorTest extends JUnitSuite {
+ import ServerInitiatedRemoteActorTest._
+
+ import Actor.Sender.Self
+ akka.Config.config
+
+ private val unit = TimeUnit.MILLISECONDS
+
+ @Before
+ def init() {
+ server = new RemoteServer()
+
+ server.start(HOSTNAME, PORT)
+
+ server.register(new RemoteActorSpecActorUnidirectional)
+ server.register(new RemoteActorSpecActorBidirectional)
+ server.register(new RemoteActorSpecActorAsyncSender)
+
+ Thread.sleep(1000)
+ }
+
+ // make sure the servers shutdown cleanly after the test has finished
+ @After
+ def finished() {
+ server.shutdown
+ RemoteClient.shutdownAll
+ Thread.sleep(1000)
+ }
+
+ @Test
+ def shouldSendOneWay = {
+ val actor = RemoteClient.actorFor(
+ "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorUnidirectional",
+ 5000L,
+ HOSTNAME, PORT)
+ val result = actor ! "OneWay"
+ Thread.sleep(1000)
+ assert("received" === Global.oneWay)
+ actor.stop
+ }
+
+ @Test
+ def shouldSendReplyAsync = {
+ val actor = RemoteClient.actorFor(
+ "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
+ 5000L,
+ HOSTNAME, PORT)
+ val result = actor !! "Hello"
+ assert("World" === result.get.asInstanceOf[String])
+ actor.stop
+ }
+
+ @Test
+ def shouldSendRemoteReply = {
+ implicit val timeout = 500000000L
+ val actor = RemoteClient.actorFor(
+ "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
+ timeout,
+ HOSTNAME, PORT)
+
+ val sender = new RemoteActorSpecActorAsyncSender
+ sender.setReplyToAddress(HOSTNAME, PORT)
+ sender.start
+ sender.send(actor)
+ Thread.sleep(1000)
+ assert("replied" === Global.remoteReply)
+ actor.stop
+ }
+
+ @Test
+ def shouldSendReceiveException = {
+ implicit val timeout = 500000000L
+ val actor = RemoteClient.actorFor(
+ "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
+ timeout,
+ HOSTNAME, PORT)
+ try {
+ actor !! "Failure"
+ fail("Should have thrown an exception")
+ } catch {
+ case e =>
+ assert("expected" === e.getMessage())
+ }
+ actor.stop
+ }
+}
+
\ No newline at end of file
diff --git a/akka-fun-test-java/pom.xml b/akka-fun-test-java/pom.xml
index 24868c06e4..beb19f25c5 100644
--- a/akka-fun-test-java/pom.xml
+++ b/akka-fun-test-java/pom.xml
@@ -11,7 +11,7 @@
akka
se.scalablesolutions.akka
- 0.6
+ 0.7-SNAPSHOT
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
index 70aa10b9b1..7fd3a65dfb 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -8,7 +8,7 @@ import se.scalablesolutions.akka.config.*;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
- import se.scalablesolutions.akka.Kernel;
+ import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
diff --git a/akka-fun-test-java/testng.xml b/akka-fun-test-java/testng.xml
old mode 100755
new mode 100644
diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml
old mode 100755
new mode 100644
index 5aab4575c5..4b1d114d45
--- a/akka-kernel/pom.xml
+++ b/akka-kernel/pom.xml
@@ -11,7 +11,7 @@
akka
se.scalablesolutions.akka
- 0.6
+ 0.7-SNAPSHOT
@@ -56,6 +56,11 @@
${project.groupId}
${project.version}
+