org.jboss.netty
netty
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 79ae48a9a7..5cbe7688cd 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -14,9 +14,7 @@ import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
-import se.scalablesolutions.akka.util.{HashCode, Logging}
-
-import org.codehaus.aspectwerkz.proxy.Uuid
+import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
import org.multiverse.api.ThreadLocalTransaction._
@@ -238,7 +236,7 @@ trait Actor extends TransactionManagement {
implicit protected val transactionFamily: String = this.getClass.getName
// Only mutable for RemoteServer in order to maintain identity across nodes
- private[akka] var _uuid = Uuid.newUuid.toString
+ private[akka] var _uuid = UUID.newUuid.toString
def uuid = _uuid
@@ -291,7 +289,7 @@ trait Actor extends TransactionManagement {
* Identifier for actor, does not have to be a unique one.
* Default is the class name.
*
- * This field is used for logging, AspectRegistry.actorsFor etc.
+ * This field is used for logging, AspectRegistry.actorsFor, identifier for remote actor in RemoteServer etc.
* But also as the identifier for persistence, which means that you can
* use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
@@ -817,8 +815,8 @@ trait Actor extends TransactionManagement {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
- .setTimeout(timeout)
- .setUuid(uuid)
+ .setTimeout(this.timeout)
+ .setUuid(this.id)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
@@ -862,8 +860,8 @@ trait Actor extends TransactionManagement {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
- .setTimeout(timeout)
- .setUuid(uuid)
+ .setTimeout(this.timeout)
+ .setUuid(this.id)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
index 5cbfb39645..85ec2dc8ca 100644
--- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -44,7 +44,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
abstract override def onLoad = {
applicationLoader = runApplicationBootClasses
- super.onLoad
+ super.onLoad
}
abstract override def onUnload = {
diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala
index 5aa3ec2183..3618573465 100644
--- a/akka-core/src/main/scala/actor/Supervisor.scala
+++ b/akka-core/src/main/scala/actor/Supervisor.scala
@@ -9,6 +9,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.util.Helpers._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.remote.RemoteServer
import java.util.concurrent.ConcurrentHashMap
@@ -116,6 +117,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
actors.put(actor.getClass.getName, actor)
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
+ remoteAddress.foreach(address => println("----- ADDING actor for " + address.get.hostname + " - " + address.get.port))
+ remoteAddress.foreach(address => println("----- " + RemoteServer.Address(address.hostname, address.port).hashCode))
+ remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start
diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 46fbab8c30..9868edb98b 100644
--- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -6,8 +6,9 @@ package se.scalablesolutions.akka.config
import com.google.inject._
-import ScalaConfig._
+import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
+import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
//import org.apache.camel.impl.{DefaultCamelContext}
@@ -50,8 +51,8 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
val (proxy, targetInstance, component) =
activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException(
- "Class [" + clazz.getName + "] has not been put under supervision " +
- "(by passing in the config to the 'configure' and then invoking 'supervise') method"))
+ "Class [" + clazz.getName + "] has not been put under supervision" +
+ "\n(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
proxy.asInstanceOf[T]
}
@@ -104,11 +105,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
- if (component.remoteAddress.isDefined)
- Some(new InetSocketAddress(
- component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ if (remoteAddress.isDefined) {
+ RemoteServer
+ .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ .activeObjects.put(targetClass.getName, proxy)
+ }
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
@@ -121,11 +125,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
- if (component.remoteAddress.isDefined)
- Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = ActiveObject.newInstance(
- targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ if (remoteAddress.isDefined) {
+ RemoteServer
+ .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ .activeObjects.put(targetClass.getName, proxy)
+ }
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala
index 828b7cd516..e993573972 100644
--- a/akka-core/src/main/scala/config/Config.scala
+++ b/akka-core/src/main/scala/config/Config.scala
@@ -54,7 +54,7 @@ object ScalaConfig {
case object Permanent extends Scope
case object Temporary extends Scope
- case class RemoteAddress(hostname: String, port: Int) extends ConfigElement
+ case class RemoteAddress(val hostname: String, val port: Int) extends ConfigElement
class Component(_intf: Class[_],
val target: Class[_],
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index 995f53ef32..fa291d343b 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -85,13 +85,12 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val timer = new HashedWheelTimer
private val remoteAddress = new InetSocketAddress(hostname, port)
+ private[remote] var connection: ChannelFuture = _
- bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer))
+ bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
- private var connection: ChannelFuture = _
-
def connect = synchronized {
if (!isRunning) {
connection = bootstrap.connect(remoteAddress)
@@ -99,9 +98,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
// Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly
- if (!connection.isSuccess) {
- log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
- }
+ if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
isRunning = true
}
}
@@ -148,7 +145,8 @@ class RemoteClientPipelineFactory(name: String,
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
- timer: HashedWheelTimer) extends ChannelPipelineFactory {
+ timer: HashedWheelTimer,
+ client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
@@ -166,7 +164,7 @@ class RemoteClientPipelineFactory(name: String,
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
- pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer))
+ pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client))
pipeline
}
}
@@ -180,7 +178,8 @@ class RemoteClientHandler(val name: String,
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
- val timer: HashedWheelTimer)
+ val timer: HashedWheelTimer,
+ val client: RemoteClient)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
@@ -225,7 +224,11 @@ class RemoteClientHandler(val name: String,
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", remoteAddress)
- bootstrap.connect(remoteAddress)
+ client.connection = bootstrap.connect(remoteAddress)
+
+ // Wait until the connection attempt succeeds or fails.
+ client.connection.awaitUninterruptibly
+ if (!client.connection.isSuccess) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
}
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index a5ebad9af1..670bf52191 100755
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -7,6 +7,7 @@ package se.scalablesolutions.akka.remote
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
@@ -62,6 +63,41 @@ object RemoteServer {
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
level
}
+
+ object Address {
+ def apply(hostname: String, port: Int) = new Address(hostname, port)
+ }
+ class Address(val hostname: String, val port: Int) {
+ override def hashCode: Int = {
+ var result = HashCode.SEED
+ result = HashCode.hash(result, hostname)
+ result = HashCode.hash(result, port)
+ result
+ }
+ override def equals(that: Any): Boolean = {
+ that != null &&
+ that.isInstanceOf[Address] &&
+ that.asInstanceOf[Address].hostname == hostname &&
+ that.asInstanceOf[Address].port == port
+ }
+ }
+
+ class RemoteActorSet {
+ val actors = new ConcurrentHashMap[String, Actor]
+ val activeObjects = new ConcurrentHashMap[String, AnyRef]
+ }
+
+ private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
+
+ def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
+ val set = remoteActorSets.get(remoteServerAddress)
+ if (set ne null) set
+ else {
+ val remoteActorSet = new RemoteActorSet
+ remoteActorSets.put(remoteServerAddress, remoteActorSet)
+ remoteActorSet
+ }
+ }
}
/**
@@ -84,7 +120,7 @@ class RemoteServer extends Logging {
val name = "RemoteServer@" + hostname + ":" + port
private var hostname = RemoteServer.HOSTNAME
- private var port = RemoteServer.PORT
+ private var port = RemoteServer.PORT
@volatile private var isRunning = false
@volatile private var isConfigured = false
@@ -96,7 +132,7 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
- private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-server")
+ private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
def start: Unit = start(None)
@@ -110,7 +146,12 @@ class RemoteServer extends Logging {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
- bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader))
+ println("======= ADDING actor for " + hostname + " - " + port)
+ println("======= " + RemoteServer.Address(hostname, port).hashCode)
+ println("======= " + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.size)
+ val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
+ val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
+ bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
@@ -135,9 +176,11 @@ class RemoteServer extends Logging {
* @author Jonas Bonér
*/
class RemoteServerPipelineFactory(
- name: String,
- openChannels: ChannelGroup,
- loader: Option[ClassLoader]) extends ChannelPipelineFactory {
+ val name: String,
+ val openChannels: ChannelGroup,
+ val loader: Option[ClassLoader],
+ val actors: JMap[String, Actor],
+ val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@@ -156,7 +199,7 @@ class RemoteServerPipelineFactory(
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
- pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader))
+ pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader, actors, activeObjects))
pipeline
}
}
@@ -167,13 +210,12 @@ class RemoteServerPipelineFactory(
@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(
val name: String,
- openChannels: ChannelGroup,
- val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
+ val openChannels: ChannelGroup,
+ val applicationLoader: Option[ClassLoader],
+ val actors: JMap[String, Actor],
+ val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
- private val activeObjects = new ConcurrentHashMap[String, AnyRef]
- private val actors = new ConcurrentHashMap[String, Actor]
-
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 758717c88c..b2e69c7d63 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -6,11 +6,10 @@ package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.collection._
+import se.scalablesolutions.akka.util.UUID
import org.multiverse.datastructures.refs.manual.Ref;
-import org.codehaus.aspectwerkz.proxy.Uuid
-
/**
* Example Scala usage:
*
@@ -75,7 +74,7 @@ object TransactionalRef {
class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
import org.multiverse.api.ThreadLocalTransaction._
- val uuid = Uuid.newUuid.toString
+ val uuid = UUID.newUuid.toString
private[this] val ref: Ref[T] = atomic { new Ref }
@@ -127,7 +126,7 @@ object TransactionalMap {
*/
class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
protected[this] val ref = TransactionalRef[HashTrie[K, V]]
- val uuid = Uuid.newUuid.toString
+ val uuid = UUID.newUuid.toString
ref.swap(new HashTrie[K, V])
@@ -179,7 +178,7 @@ object TransactionalVector {
* @author Jonas Bonér
*/
class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
- val uuid = Uuid.newUuid.toString
+ val uuid = UUID.newUuid.toString
private[this] val ref = TransactionalRef[Vector[T]]
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index 0ff83eeece..d328f2452d 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -65,7 +65,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
String str = conf.getInstance(String.class);
fail("exception should have been thrown");
} catch (Exception e) {
- assertEquals("Class [java.lang.String] has not been put under supervision (by passing in the config to the 'configure' and then invoking 'supervise') method", e.getMessage());
+ assertEquals(IllegalStateException.class, e.getClass());
}
}
diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml
index c32504f5a1..e01b7272fd 100755
--- a/akka-kernel/pom.xml
+++ b/akka-kernel/pom.xml
@@ -47,6 +47,11 @@
${project.groupId}
${project.version}
+