diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala
index bb3a6e740a..7fd3d76abc 100644
--- a/akka-amqp/src/main/scala/AMQP.scala
+++ b/akka-amqp/src/main/scala/AMQP.scala
@@ -53,7 +53,7 @@ object AMQP {
      initReconnectDelay: Long) = supervisor.newProducer(
    config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)
-
+
  def newConsumer(
      config: ConnectionParameters,
      hostname: String,
@@ -76,9 +76,9 @@ object AMQP {
  /**
   * @author Jonas Bonér
   */
- class AMQPSupervisor extends Actor {
+ class AMQPSupervisor extends Actor with Logging {
    import scala.collection.JavaConversions._
-
+
    private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]

    faultHandler = Some(OneForOneStrategy(5, 5000))
@@ -114,7 +114,7 @@ object AMQP {
      initReconnectDelay: Long,
      passive: Boolean,
      durable: Boolean,
-     autoDelete: Boolean,
+     autoDelete: Boolean,
      configurationArguments: Map[String, AnyRef]): Consumer = {
    val consumer = new Consumer(
      new ConnectionFactory(config),
@@ -185,10 +185,10 @@ object AMQP {
   */
  class MessageConsumerListener(val queueName: String,
                                val routingKey: String,
-                               val exclusive: Boolean,
-                               val autoDelete: Boolean,
+                               val exclusive: Boolean,
+                               val autoDelete: Boolean,
                                val isUsingExistingQueue: Boolean,
-                               val actor: Actor) extends AMQPMessage {
+                               val actor: Actor) extends AMQPMessage {
    /**
     * Creates a non-exclusive, non-autodelete message listener.
     */
@@ -238,8 +238,8 @@ object AMQP {
  object MessageConsumerListener {
    def apply(queueName: String,
              routingKey: String,
-             exclusive: Boolean,
-             autoDelete: Boolean,
+             exclusive: Boolean,
+             autoDelete: Boolean,
              isUsingExistingQueue: Boolean,
              actor: Actor) = new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor)
@@ -356,7 +356,7 @@ object AMQP {
    val initReconnectDelay: Long,
    val passive: Boolean,
    val durable: Boolean,
-   val autoDelete: Boolean,
+   val autoDelete: Boolean,
    val configurationArguments: Map[java.lang.String, Object]) extends FaultTolerantConnectionActor {
    consumer: Consumer =>
@@ -365,7 +365,7 @@ object AMQP {
    faultHandler = Some(OneForOneStrategy(5, 5000))
    trapExit = List(classOf[Throwable])
-
+
    //FIXME use better strategy to convert scala.immutable.Map to java.util.Map
    private val jConfigMap = configurationArguments.foldLeft(new java.util.HashMap[String,Object]){ (m,kv) => { m.put(kv._1,kv._2); m } }
@@ -424,13 +424,13 @@ object AMQP {
    private def registerListener(listener: MessageConsumerListener) = {
      log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
      listeners.put(listener, listener)
-
+
      if (!listener.isUsingExistingQueue) {
        log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName)
        channel.queueDeclare(
-         listener.queueName,
-         passive, durable,
-         listener.exclusive, listener.autoDelete,
+         listener.queueName,
+         passive, durable,
+         listener.exclusive, listener.autoDelete,
          jConfigMap)
      }
@@ -516,7 +516,7 @@ object AMQP {
  /**
   * @author Jonas Bonér
   */
- trait FaultTolerantConnectionActor extends Actor {
+ trait FaultTolerantConnectionActor extends Actor with Logging {
    val reconnectionTimer = new Timer

    var connection: Connection = _
@@ -548,7 +548,7 @@ object AMQP {
    def bindQueue(name: String) {
      channel.queueBind(name, exchangeName, name)
    }
-
+
    def createBindQueue: String = {
      val name = createQueue
      channel.queueBind(name, exchangeName, name)
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index f466333388..5862fb835b 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
-import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
@@ -29,7 +29,7 @@ import java.util.concurrent.locks.{Lock, ReentrantLock}
* Implements the Transactor abstraction. E.g. a transactional actor.
*
* Equivalent to invoking the 'makeTransactionRequired' method in the body of the Actor.
*
* @author Jonas Bonér
*/
trait Transactor extends Actor {
@@ -40,7 +40,7 @@ trait Transactor extends Actor {
* Extend this abstract class to create a remote actor.
*
* Equivalent to invoking the 'makeRemote(..)' method in the body of the Actor.
*
* @author Jonas Bonér
*/
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
@@ -72,7 +72,7 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
/**
* Utility class with factory methods for creating Actors.
- *
+ *
* @author Jonas Bonér
*/
object Actor extends Logging {
@@ -82,14 +82,14 @@ object Actor extends Logging {
val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender {
- @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
+ @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
object Self
}
/**
* Use to create an anonymous event-driven actor.
*
- * The actor is created with a 'permanent' life-cycle configuration, which means that
+ * The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
*
* The actor is started when created.
@@ -111,7 +111,7 @@ object Actor extends Logging {
/**
* Use to create an anonymous transactional event-driven actor.
*
- * The actor is created with a 'permanent' life-cycle configuration, which means that
+ * The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
*
* The actor is started when created.
@@ -131,7 +131,7 @@ object Actor extends Logging {
}
/**
- * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
+ * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
* which means that if the actor is supervised and dies it will *not* be restarted.
*
* The actor is started when created.
@@ -153,7 +153,7 @@ object Actor extends Logging {
/**
* Use to create an anonymous event-driven remote actor.
*
- * The actor is created with a 'permanent' life-cycle configuration, which means that
+ * The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
*
* The actor is started when created.
@@ -176,7 +176,7 @@ object Actor extends Logging {
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
*
- * The actor is created with a 'permanent' life-cycle configuration, which means that
+ * The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
*
* The actor is started when created.
@@ -203,10 +203,10 @@ object Actor extends Logging {
}
/**
- * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
+ * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
*
- * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
+ * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
*
@@ -242,7 +242,7 @@ object Actor extends Logging {
*
* @author Jonas Bonér
*/
-trait Actor extends TransactionManagement with Logging {
+trait Actor extends TransactionManagement {
implicit protected val self: Option[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@@ -264,7 +264,7 @@ trait Actor extends TransactionManagement with Logging {
private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
/**
- * This lock ensures thread safety in the dispatching: only one message can
+ * This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor.
*/
private[akka] val _dispatcherLock: Lock = new ReentrantLock
@@ -495,7 +495,7 @@ trait Actor extends TransactionManagement with Logging {
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
*
*
- * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
+ * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
* if invoked from within an Actor. If not then no sender is available.
*
* actor ! message
@@ -567,7 +567,7 @@ trait Actor extends TransactionManagement with Logging {
/**
* Forwards the message and passes the original sender actor as the sender.
*
- * Works with both '!' and '!!'.
+ * Works with both '!' and '!!'.
*/
def forward(message: Any)(implicit sender: Option[Actor] = None) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
@@ -807,7 +807,7 @@ trait Actor extends TransactionManagement with Logging {
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
joinTransaction(message)
-
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -829,11 +829,17 @@ trait Actor extends TransactionManagement with Logging {
requestBuilder.setSourceUuid(s.uuid)
val (host, port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
-
+
Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
+
+ if (RemoteServer.serverFor(host, port).isEmpty) {
+ val server = new RemoteServer
+ server.start(host, port)
+ }
+ RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
@@ -846,13 +852,13 @@ trait Actor extends TransactionManagement with Logging {
else invocation.send
}
}
-
+
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
joinTransaction(message)
-
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -882,7 +888,7 @@ trait Actor extends TransactionManagement with Logging {
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
// FIXME test to run bench without this trace call
- Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
+ Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
getTransactionSetInScope, toString, message)
getTransactionSetInScope.incParties
}
@@ -930,7 +936,7 @@ trait Actor extends TransactionManagement with Logging {
else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) {
- Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
+ Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
toString, messageHandle)
Some(createNewTransactionSet)
} else None
@@ -959,13 +965,13 @@ trait Actor extends TransactionManagement with Logging {
case e: IllegalStateException => {}
case e =>
// abort transaction set
- if (isTransactionSetInScope) try {
- getTransactionSetInScope.abort
+ if (isTransactionSetInScope) try {
+ getTransactionSetInScope.abort
} catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
-
+
clearTransaction
if (topLevelTransaction) clearTransactionSet
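
The Actor.scala hunks above are the heart of this patch: when a local actor fires a one-way '!' at a remote actor, the client now lazily starts a RemoteServer on the sender's reply-to address (if none is registered for that host/port) and registers the sender in it, so the receiving node has somewhere to route 'reply' results back to. A minimal client-side sketch of the resulting usage — the actor names and host/port values are illustrative placeholders, not part of the patch:

    class EchoActor extends Actor {
      def receive = {
        case "ping" => reply("pong")
      }
    }

    class PingSender(target: Actor) extends Actor {
      // '!' from inside an actor picks up the implicit 'self' as sender
      def kick = target ! "ping"
      def receive = {
        case "pong" => println("got the reply back on the client node")
      }
    }

    val echo = new EchoActor
    echo.makeRemote("localhost", 9999)            // proxy to the server node
    echo.start

    val pinger = new PingSender(echo)
    pinger.setReplyToAddress("localhost", 9998)   // where replies should come back
    pinger.start
    pinger.kick                                   // postMessageToMailbox now starts a RemoteServer
                                                  // on the reply-to address and registers 'pinger' there
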
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index bba8e36623..be0e881eb6 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -63,9 +63,9 @@ trait Cluster {
*/
trait ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
-
+
@volatile protected var serializer : Serializer = _
-
+
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
}
@@ -95,7 +95,7 @@ private[akka] object ClusterActor {
* Provides most of the behavior out of the box
* only needs to be gives hooks into the underlaying cluster impl.
*/
-abstract class BasicClusterActor extends ClusterActor {
+abstract class BasicClusterActor extends ClusterActor with Logging {
import ClusterActor._
type ADDR_T
@@ -235,13 +235,13 @@ abstract class BasicClusterActor extends ClusterActor {
object Cluster extends Cluster with Logging {
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
- @volatile private[remote] var clusterActor: Option[ClusterActor] = None
+ @volatile private[remote] var clusterActor: Option[ClusterActor] = None
private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = {
val name = config.getString("akka.remote.cluster.actor")
if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
-
+
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
serializer.classLoader = Some(loader)
try {
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 29dbc18b2a..1fcbbe8551 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -33,7 +33,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
*
* RemoteNode.start(hostname, port)
*
- *
+ *
* You can specify the class loader to use to load the remote actors.
*
* RemoteNode.start(hostname, port, classLoader)
@@ -87,15 +87,15 @@ object RemoteServer {
that.asInstanceOf[Address].port == port
}
}
-
+
class RemoteActorSet {
val actors = new ConcurrentHashMap[String, Actor]
- val activeObjects = new ConcurrentHashMap[String, AnyRef]
+ val activeObjects = new ConcurrentHashMap[String, AnyRef]
}
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
private val remoteServers = new ConcurrentHashMap[Address, RemoteServer]
-
+
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
val set = remoteActorSets.get(remoteServerAddress)
if (set ne null) set
@@ -106,7 +106,7 @@ object RemoteServer {
}
}
- private[remote] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
+ private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
val server = remoteServers.get(Address(hostname, port))
if (server eq null) None
else Some(server)
@@ -114,7 +114,7 @@ object RemoteServer {
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
remoteServers.put(Address(hostname, port), server)
-
+
private[remote] def unregister(hostname: String, port: Int) =
remoteServers.remove(Address(hostname, port))
}
@@ -141,8 +141,7 @@ class RemoteServer extends Logging {
private var hostname = RemoteServer.HOSTNAME
private var port = RemoteServer.PORT
- @volatile private var isRunning = false
- @volatile private var isConfigured = false
+ @volatile private var _isRunning = false
private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,
@@ -153,6 +152,8 @@ class RemoteServer extends Logging {
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
+ def isRunning = _isRunning
+
def start: Unit = start(None)
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
@@ -161,7 +162,7 @@ class RemoteServer extends Logging {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized {
try {
- if (!isRunning) {
+ if (!_isRunning) {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
@@ -174,20 +175,22 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
- isRunning = true
+ _isRunning = true
Cluster.registerLocalNode(hostname, port)
- }
+ }
} catch {
case e => log.error(e, "Could not start up remote server")
}
}
- def shutdown = if (isRunning) {
- RemoteServer.unregister(hostname, port)
- openChannels.disconnect
- openChannels.close.awaitUninterruptibly
- bootstrap.releaseExternalResources
- Cluster.deregisterLocalNode(hostname, port)
+ def shutdown = synchronized {
+ if (_isRunning) {
+ RemoteServer.unregister(hostname, port)
+ openChannels.disconnect
+ openChannels.close.awaitUninterruptibly
+ bootstrap.releaseExternalResources
+ Cluster.deregisterLocalNode(hostname, port)
+ }
}
// TODO: register active object in RemoteServer as well
@@ -195,17 +198,21 @@ class RemoteServer extends Logging {
/**
* Register Remote Actor by the Actor's 'id' field.
*/
- def register(actor: Actor) = if (isRunning) {
- log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
- RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
+ def register(actor: Actor) = synchronized {
+ if (_isRunning) {
+ log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
+ RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
+ }
}
/**
- * Register Remote Actor by a specific 'id' passed as argument.
+ * Register Remote Actor by a specific 'id' passed as argument.
*/
- def register(id: String, actor: Actor) = if (isRunning) {
- log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
- RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
+ def register(id: String, actor: Actor) = synchronized {
+ if (_isRunning) {
+ log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
+ RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
+ }
}
}
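
On the server-initiated side, registration is now guarded: 'register' is synchronized and only has an effect while the server is running. A minimal sketch of registering a server-side actor, either under its own id or under an explicit one — the actor class and the host/port are placeholders, not from the patch:

    val server = new RemoteServer
    server.start("localhost", 9999)         // placeholder address

    class HelloWorldActor extends Actor {
      def receive = {
        case "Hello" => reply("World")
      }
    }

    val hello = new HelloWorldActor
    hello.start
    server.register(hello)                  // keyed by the actor's own id
    server.register("greeter", hello)       // or under an explicit id
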
@@ -232,9 +239,9 @@ class RemoteServerPipelineFactory(
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
case _ => None
}
- val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
+ val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
- val stages: Array[ChannelHandler] =
+ val stages: Array[ChannelHandler] =
zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer))
.getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer))
new StaticChannelPipeline(stages: _*)
@@ -294,7 +301,8 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
-
+ actor.start
+
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
if (request.hasSourceHostname && request.hasSourcePort) {
@@ -389,19 +397,6 @@ class RemoteServerHandler(
}
}
- /*
- private def continueTransaction(request: RemoteRequest) = {
- val tx = request.tx
- if (tx.isDefined) {
- tx.get.reinit
- TransactionManagement.threadBoundTx.set(tx)
- setThreadLocalTransaction(tx.transaction)
- } else {
- TransactionManagement.threadBoundTx.set(None)
- setThreadLocalTransaction(null)
- }
- }
- */
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
val unescapedArgs = new Array[AnyRef](args.size)
val unescapedArgClasses = new Array[Class[_]](args.size)
@@ -410,7 +405,7 @@ class RemoteServerHandler(
val arg = args(i)
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
val argString = arg.asInstanceOf[String]
- val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
+ val proxyName = argString.replace(AW_PROXY_PREFIX, "")
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName)
@@ -440,6 +435,11 @@ class RemoteServerHandler(
} else activeObjectOrNull
}
+ /**
+ * Creates a new instance of the actor with name, uuid and timeout specified as arguments.
+ * If actor already created then just return it from the registry.
+ * Does not start the actor.
+ */
private def createActor(name: String, uuid: String, timeout: Long): Actor = {
val actorOrNull = actors.get(uuid)
if (actorOrNull eq null) {
@@ -452,7 +452,6 @@ class RemoteServerHandler(
newInstance.timeout = timeout
newInstance._remoteAddress = None
actors.put(uuid, newInstance)
- newInstance.start
newInstance
} catch {
case e =>
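
With this hunk, creating a server-side actor and starting it become two separate steps: createActor only looks the instance up in (or adds it to) the uuid-keyed registry, and dispatchToActor calls 'start' at the dispatch site. A rough sketch of that get-or-create-then-start split, using hypothetical names ('registry', 'actorFor', 'SomeActor'):

    import java.util.concurrent.ConcurrentHashMap

    class SomeActor extends Actor {
      def receive = { case _ => () }
    }

    val registry = new ConcurrentHashMap[String, Actor]

    def actorFor(uuid: String, create: => Actor): Actor = {
      val existing = registry.get(uuid)
      if (existing ne null) existing      // already known: reuse the registered instance
      else {
        val fresh = create                // create, but do not start, mirroring createActor
        registry.put(uuid, fresh)
        fresh
      }
    }

    // the dispatch site decides when to start, as dispatchToActor now does
    val actor = actorFor("some-uuid", new SomeActor)
    actor.start
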
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 401c2379ee..371d57ad88 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -51,10 +51,9 @@ object TransactionManagement extends TransactionManagement {
}
}
-trait TransactionManagement extends Logging {
+trait TransactionManagement {
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
- log.trace("Creating new transaction set")
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
TransactionManagement.transactionSet.set(Some(txSet))
txSet
@@ -67,12 +66,10 @@ trait TransactionManagement extends Logging {
if (tx.isDefined) TransactionManagement.transaction.set(tx)
private[akka] def clearTransactionSet = {
- log.trace("Clearing transaction set")
TransactionManagement.transactionSet.set(None)
}
private[akka] def clearTransaction = {
- log.trace("Clearing transaction")
TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
@@ -90,4 +87,4 @@ trait TransactionManagement extends Logging {
val option = TransactionManagement.transaction.get
(option ne null) && option.isDefined
}
-}
\ No newline at end of file
+}
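
The TransactionManagement and Actor traits no longer pull in Logging (and the thread-local trace calls went with it), so any class that relied on the inherited 'log' member now has to mix in se.scalablesolutions.akka.util.Logging itself — which is exactly what the cluster, sample, REST and security changes below do. A minimal sketch of the new shape (the actor name is illustrative only):

    import se.scalablesolutions.akka.util.Logging

    class AuditedActor extends Actor with Logging {
      def receive = {
        case msg => log.debug("received [%s]", msg)   // 'log' comes from the explicit Logging mixin
      }
    }
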
diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala
index d58eed5a16..71d8c0b0e2 100644
--- a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala
+++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala
@@ -46,6 +46,27 @@ class RemoteActorSpecActorAsyncSender extends Actor {
}
}
+class SendOneWayAndReplyReceiverActor extends Actor {
+ def receive = {
+ case "Hello" =>
+ reply("World")
+ }
+}
+
+class SendOneWayAndReplySenderActor extends Actor {
+ var state: Option[AnyRef] = None
+ var sendTo: Actor = _
+ var latch: CountDownLatch = _
+
+ def sendOff = sendTo ! "Hello"
+
+ def receive = {
+ case msg: AnyRef =>
+ state = Some(msg)
+ latch.countDown
+ }
+}
+
class ClientInitiatedRemoteActorSpec extends JUnitSuite {
se.scalablesolutions.akka.config.Config.config
@@ -82,11 +103,30 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
val actor = new RemoteActorSpecActorUnidirectional
actor.makeRemote(HOSTNAME, PORT1)
actor.start
- val result = actor ! "OneWay"
+ actor ! "OneWay"
assert(Global.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
+ @Test
+ def shouldSendOneWayAndReceiveReply = {
+ val actor = new SendOneWayAndReplyReceiverActor
+ actor.makeRemote(HOSTNAME, PORT1)
+ actor.start
+ val latch = new CountDownLatch(1)
+ val sender = new SendOneWayAndReplySenderActor
+ sender.setReplyToAddress(HOSTNAME, PORT2)
+ sender.sendTo = actor
+ sender.latch = latch
+ sender.start
+ sender.sendOff
+ assert(latch.await(1, TimeUnit.SECONDS))
+ assert(sender.state.isDefined === true)
+ assert("World" === sender.state.get.asInstanceOf[String])
+ actor.stop
+ sender.stop
+ }
+
@Test
def shouldSendReplyAsync = {
val actor = new RemoteActorSpecActorBidirectional
@@ -101,6 +141,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
def shouldSendRemoteReply = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
+ actor.setReplyToAddress(HOSTNAME, PORT2)
actor.makeRemote(HOSTNAME, PORT2)
actor.start
@@ -128,3 +169,4 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop
}
}
+
diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala
index ae3cfce6df..3790a88268 100644
--- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala
+++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala
@@ -77,7 +77,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
new Thread(new Runnable() {
def run = {
- RemoteNode.start
+ RemoteNode.start(RemoteServer.HOSTNAME, 9988)
}
}).start
Thread.sleep(1000)
@@ -335,7 +335,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
// implementation of the Actors we want to use.
pingpong1 = new RemotePingPong1Actor
- pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
SupervisorConfig(
@@ -350,7 +350,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = new RemotePingPong1Actor
- pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
SupervisorConfig(
@@ -364,11 +364,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
- pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
- pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
- pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
SupervisorConfig(
@@ -390,11 +390,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
- pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
- pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
- pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
SupervisorConfig(
@@ -416,11 +416,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
- pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
- pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
- pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
SupervisorConfig(
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
index c82b29afc9..ba39a108b3 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
@@ -65,7 +65,7 @@ class Transformer(producer: Actor) extends Actor {
}
}
-class Subscriber(name:String, uri: String) extends Actor with Consumer {
+class Subscriber(name:String, uri: String) extends Actor with Consumer with Logging {
def endpointUri = uri
protected def receive = {
@@ -89,4 +89,4 @@ class PublisherBridge(uri: String, publisher: Actor) extends Actor with Consumer
reply("message published")
}
}
-}
\ No newline at end of file
+}
diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
index 7f1070ec7a..c57075125a 100644
--- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
@@ -50,7 +50,7 @@ class Boot {
*/
@Path("/scalacount")
class SimpleService extends Transactor {
-
+
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
@@ -127,7 +127,7 @@ class PersistentSimpleService extends Transactor {
}
@Path("/chat")
-class Chat extends Actor {
+class Chat extends Actor with Logging {
case class Chat(val who: String, val what: String, val msg: String)
@Suspend
diff --git a/akka-security/src/main/scala/Security.scala b/akka-security/src/main/scala/Security.scala
index 3552a2e77d..6b47eaacb3 100644
--- a/akka-security/src/main/scala/Security.scala
+++ b/akka-security/src/main/scala/Security.scala
@@ -87,7 +87,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
override def filter(request: ContainerRequest): ContainerRequest =
rolesAllowed match {
case Some(roles) => {
- val result : Option[AnyRef] = authenticator !! Authenticate(request, roles)
+ val result : Option[AnyRef] = authenticator !! Authenticate(request, roles)
result match {
case Some(OK) => request
case Some(r) if r.isInstanceOf[Response] =>
@@ -245,7 +245,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] {
* class to create an authenticator. Don't forget to set the authenticator FQN in the
* rest-part of the akka config
*/
-trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] {
+trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging {
import Enc._
private object InvalidateNonces
@@ -346,7 +346,7 @@ import org.ietf.jgss.GSSContext
import org.ietf.jgss.GSSCredential
import org.ietf.jgss.GSSManager
-trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] {
+trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] with Logging {
override def unauthorized =
Response.status(401).header("WWW-Authenticate", "Negotiate").build
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 6a916dc441..12ad4f5a51 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -10,12 +10,12 @@
* Simple and high-level abstractions for concurrency and parallelism.
* Asynchronous, non-blocking and highly performant event-driven programming model.
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
- * Supervision hierarchies with let-it-crash semantics. For writing highly
+ * Supervision hierarchies with let-it-crash semantics. For writing highly
fault-tolerant systems that never stop, systems that self-heal.
* Software Transactional Memory (STM). (Distributed transactions coming soon).
- * Transactors: combine actors and STM into transactional actors. Allows you to
+ * Transactors: combine actors and STM into transactional actors. Allows you to
compose atomic message flows with automatic rollback and retry.
- * Remoting: highly performant distributed actors with remote supervision and
+ * Remoting: highly performant distributed actors with remote supervision and
error management.
* Cluster membership management.
@@ -31,7 +31,7 @@
* Spring: Spring integration
* Guice: Guice integration
* Microkernel: Run Akka as a stand-alone kernel.
-
+
-------------------------------------------------------------------------------*/
import sbt._
@@ -50,7 +50,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val CASSANDRA_VERSION = "0.5.0"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.0.1-for-scala-2.8.0.Beta1-with-test-interfaces-0.3-SNAPSHOT"
-
+
// ------------------------------------------------------------
lazy val akkaHome = {
val home = System.getenv("AKKA_HOME")
@@ -97,10 +97,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_))
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_))
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core)
- lazy val akka_servlet = project("akka-servlet", "akka-servlet", new AkkaServletProject(_),
+ lazy val akka_servlet = project("akka-servlet", "akka-servlet", new AkkaServletProject(_),
akka_core, akka_rest, akka_camel)
- lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
- akka_core, akka_rest, akka_spring, akka_camel, akka_persistence,
+ lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
+ akka_core, akka_rest, akka_spring, akka_camel, akka_persistence,
akka_cluster, akka_amqp, akka_security, akka_comet, akka_patterns, akka_servlet)
// functional tests in java
@@ -113,11 +113,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// create executable jar
override def mainClass = Some("se.scalablesolutions.akka.kernel.Main")
- override def packageOptions =
+ override def packageOptions =
manifestClassPath.map(cp => ManifestAttributes(
- (Attributes.Name.CLASS_PATH, cp),
- (IMPLEMENTATION_TITLE, "Akka"),
- (IMPLEMENTATION_URL, "http://akkasource.org"),
+ (Attributes.Name.CLASS_PATH, cp),
+ (IMPLEMENTATION_TITLE, "Akka"),
+ (IMPLEMENTATION_URL, "http://akkasource.org"),
(IMPLEMENTATION_VENDOR, "The Akka Project")
)).toList :::
getMainClass(false).map(MainClass(_)).toList
@@ -126,7 +126,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
override def manifestClassPath = Some(allArtifacts.getFiles
.filter(_.getName.endsWith(".jar"))
.map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName)
- .mkString(" ") +
+ .mkString(" ") +
" scala-library.jar" +
" dist/akka-util_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-util-java_%s-%s.jar".format(buildScalaVersion, version) +
@@ -146,7 +146,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-servlet_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version)
- )
+ )
// ------------------------------------------------------------
// publishing
@@ -159,7 +159,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// Credentials(Path.userHome / ".akka_publish_credentials", log)
- //override def documentOptions = encodingUtf8.map(SimpleDocOption(_))
+ //override def documentOptions = encodingUtf8.map(SimpleDocOption(_))
override def packageDocsJar = defaultJarPath("-doc.jar")
override def packageSrcJar= defaultJarPath("-src.jar")
override def packageToPublishActions = super.packageToPublishActions ++ Seq(packageDocs, packageSrc)
@@ -291,19 +291,19 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val google_coll = "com.google.collections" % "google-collections" % "1.0" % "test"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.8" % "test"
val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % "1.5.8" % "test"
- val log4j = "log4j" % "log4j" % "1.2.15" % "test"
+ val log4j = "log4j" % "log4j" % "1.2.15" % "test"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
lazy val dist = deployTask(info, distPath, true, true, true) dependsOn(`package`, packageDocs, packageSrc) describedAs("Deploying")
}
class AkkaPersistenceParentProject(info: ProjectInfo) extends ParentProject(info) {
- lazy val akka_persistence_common = project("akka-persistence-common", "akka-persistence-common",
+ lazy val akka_persistence_common = project("akka-persistence-common", "akka-persistence-common",
new AkkaPersistenceCommonProject(_), akka_core)
- lazy val akka_persistence_redis = project("akka-persistence-redis", "akka-persistence-redis",
+ lazy val akka_persistence_redis = project("akka-persistence-redis", "akka-persistence-redis",
new AkkaRedisProject(_), akka_persistence_common)
- lazy val akka_persistence_mongo = project("akka-persistence-mongo", "akka-persistence-mongo",
+ lazy val akka_persistence_mongo = project("akka-persistence-mongo", "akka-persistence-mongo",
new AkkaMongoProject(_), akka_persistence_common)
- lazy val akka_persistence_cassandra = project("akka-persistence-cassandra", "akka-persistence-cassandra",
+ lazy val akka_persistence_cassandra = project("akka-persistence-cassandra", "akka-persistence-cassandra",
new AkkaCassandraProject(_), akka_persistence_common)
}
@@ -319,9 +319,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaClusterParentProject(info: ProjectInfo) extends ParentProject(info) {
- lazy val akka_cluster_jgroups = project("akka-cluster-jgroups", "akka-cluster-jgroups",
+ lazy val akka_cluster_jgroups = project("akka-cluster-jgroups", "akka-cluster-jgroups",
new AkkaJgroupsProject(_), akka_core)
- lazy val akka_cluster_shoal = project("akka-cluster-shoal", "akka-cluster-shoal",
+ lazy val akka_cluster_shoal = project("akka-cluster-shoal", "akka-cluster-shoal",
new AkkaShoalProject(_), akka_core)
}
@@ -398,18 +398,18 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {
- lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
+ lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
new AkkaSampleChatProject(_), akka_kernel)
- lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
+ lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
new AkkaSampleLiftProject(_), akka_kernel)
- lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",
+ lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",
new AkkaSampleRestJavaProject(_), akka_kernel)
- lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala",
+ lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala",
new AkkaSampleRestScalaProject(_), akka_kernel)
- lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel",
+ lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel",
new AkkaSampleCamelProject(_), akka_kernel)
- lazy val akka_sample_security = project("akka-sample-security", "akka-sample-security",
- new AkkaSampleSecurityProject(_), akka_kernel)
+ lazy val akka_sample_security = project("akka-sample-security", "akka-sample-security",
+ new AkkaSampleSecurityProject(_), akka_kernel)
}
// ------------------------------------------------------------
@@ -432,9 +432,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
descendents(path("lib") ##, "*.jar") +++
descendents(configurationPath(Configurations.Compile) ##, "*.jar"))
.filter(jar => // remove redundant libs
- !jar.toString.endsWith("stax-api-1.0.1.jar") ||
+ !jar.toString.endsWith("stax-api-1.0.1.jar") ||
!jar.toString.endsWith("scala-library-2.7.7.jar")
- )
+ )
}
def deployTask(info: ProjectInfo, toDir: Path, genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task {
@@ -444,7 +444,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// FIXME need to find out a way to grab these paths from the sbt system
- // binary
+ // binary
if (genJar) {
val JAR_FILE_NAME = moduleName + "_%s-%s.jar".format(buildScalaVersion, version)
val JAR_FILE_PATH = projectPath + "/target/scala_%s/".format(buildScalaVersion) + JAR_FILE_NAME
@@ -463,7 +463,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
log.info("Deploying docs " + toDoc)
FileUtilities.copyFile(fromDoc, toDoc, log)
}
-
+
// sources
if (genSource) {
val SRC_FILE_NAME = moduleName + "_%s-%s-%s.jar".format(buildScalaVersion, version, "src")