diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala
index 3167b02099..b0f90c3e31 100644
--- a/akka-actors/src/main/scala/actor/ActiveObject.scala
+++ b/akka-actors/src/main/scala/actor/ActiveObject.scala
@@ -7,11 +7,11 @@ package se.scalablesolutions.akka.actor
import java.lang.reflect.{InvocationTargetException, Method}
import java.net.InetSocketAddress
-import reactor.{MessageDispatcher, FutureResult}
-import nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
-import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
-import config.ScalaConfig._
-import util._
+import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
+import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
+import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util._
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index f89edefc8a..3a0aed800e 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -8,7 +8,7 @@ import java.net.InetSocketAddress
import java.util.HashSet
import se.scalablesolutions.akka.Config._
-import se.scalablesolutions.akka.reactor._
+import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
@@ -514,10 +514,10 @@ trait Actor extends Logging with TransactionManagement {
try {
tryToCommitTransactions
- if (currentTransaction.get.isDefined && !currentTransaction.get.get.isActive) {
- currentTransaction.set(None) // need to clear currentTransaction before call to supervisor
- setThreadLocalTransaction(null)
- }
+ //if (currentTransaction.get.isDefined && !currentTransaction.get.get.isActive) {
+ // currentTransaction.set(None) // need to clear currentTransaction before call to supervisor
+ // setThreadLocalTransaction(null)
+ //}
if (isInExistingTransaction) joinExistingTransaction
else if (isTransactional) startNewTransaction(messageHandle)
@@ -535,16 +535,15 @@ trait Actor extends Logging with TransactionManagement {
val tx = currentTransaction.get
rollback(tx)
- if (!(tx.isDefined && tx.get.isTopLevel)) {
+ if (tx.isDefined && tx.get.isTopLevel) {
val done = tx.get.retry
if (done) {
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
}
}
- currentTransaction.set(None) // need to clear currentTransaction before call to supervisor
- setThreadLocalTransaction(null)
-
+ clearTransaction
+
case e =>
e.printStackTrace
decrementTransaction
@@ -555,8 +554,7 @@ trait Actor extends Logging with TransactionManagement {
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
- currentTransaction.set(None) // need to clear currentTransaction before call to supervisor
- setThreadLocalTransaction(null)
+ clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
@@ -564,8 +562,7 @@ trait Actor extends Logging with TransactionManagement {
} finally {
if (currentTransaction.get.isDefined && currentTransaction.get.get.isAborted) removeTransactionIfTopLevel(currentTransaction.get.get)
else tryToPrecommitTransactions
- currentTransaction.set(None)
- setThreadLocalTransaction(null)
+ clearTransaction
}
}
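// Illustrative sketch, not part of this patch: the hunks above replace the repeated pair
//   currentTransaction.set(None); setThreadLocalTransaction(null)
// with a single clearTransaction call. The helper itself is not shown in this diff (it
// presumably lives in TransactionManagement), so this is only an assumption of what it does,
// with AnyRef standing in for the actual transaction type:
trait TransactionClearingSketch {
  protected val currentTransaction = new ThreadLocal[Option[AnyRef]] {
    override protected def initialValue: Option[AnyRef] = None
  }
  // assumed to reset the underlying STM's thread-local transaction slot
  protected def setThreadLocalTransaction(tx: AnyRef): Unit

  protected def clearTransaction: Unit = {
    currentTransaction.set(None)      // clear before any call to the supervisor
    setThreadLocalTransaction(null)   // drop the STM thread-local binding as well
  }
}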
diff --git a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 9c8e57578d..060b1234a0 100644
--- a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -7,8 +7,8 @@ package se.scalablesolutions.akka.config
import com.google.inject._
import ScalaConfig._
-import akka.actor.{Supervisor, ActiveObjectFactory, Dispatcher}
-import akka.util.Logging
+import se.scalablesolutions.akka.actor.{Supervisor, ActiveObjectFactory, Dispatcher}
+import se.scalablesolutions.akka.util.Logging
//import org.apache.camel.impl.{DefaultCamelContext}
//import org.apache.camel.{CamelContext, Endpoint, Routes}
diff --git a/akka-actors/src/main/scala/config/Config.scala b/akka-actors/src/main/scala/config/Config.scala
index f9a6f25d1f..bb548efef9 100644
--- a/akka-actors/src/main/scala/config/Config.scala
+++ b/akka-actors/src/main/scala/config/Config.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config
import reflect.BeanProperty
import actor.Actor
-import reactor.MessageDispatcher
+import dispatch.MessageDispatcher
/**
* Configuration classes - not to be used as messages.
diff --git a/akka-actors/src/main/scala/reactor/Dispatchers.scala b/akka-actors/src/main/scala/dispatch/Dispatchers.scala
similarity index 97%
rename from akka-actors/src/main/scala/reactor/Dispatchers.scala
rename to akka-actors/src/main/scala/dispatch/Dispatchers.scala
index 30846752b6..4c65957954 100644
--- a/akka-actors/src/main/scala/reactor/Dispatchers.scala
+++ b/akka-actors/src/main/scala/dispatch/Dispatchers.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.reactor
+package se.scalablesolutions.akka.dispatch
import actor.Actor
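// Illustrative sketch, not part of this patch: the reactor package is renamed to dispatch, so
// user code only needs its imports updated. MessageDispatcher is one of the moved types (see
// the ActiveObject.scala and Config.scala hunks above); MyService is a made-up class for the
// example.
//
//   before: import se.scalablesolutions.akka.reactor.MessageDispatcher
//   after:
import se.scalablesolutions.akka.dispatch.MessageDispatcher

class MyService(val dispatcher: MessageDispatcher) // compiles unchanged once the import is updated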
diff --git a/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/akka-actors/src/main/scala/dispatch/EventBasedSingleThreadDispatcher.scala
similarity index 97%
rename from akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
rename to akka-actors/src/main/scala/dispatch/EventBasedSingleThreadDispatcher.scala
index de200a007d..39feb82603 100644
--- a/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/EventBasedSingleThreadDispatcher.scala
@@ -8,7 +8,7 @@
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
-package se.scalablesolutions.akka.reactor
+package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, List}
diff --git a/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
similarity index 97%
rename from akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
rename to akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
index 18a9286402..4de755be52 100644
--- a/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
@@ -2,19 +2,21 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.reactor
+package se.scalablesolutions.akka.dispatch
import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
+
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
 *
diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala
--- a/akka-amqp/src/main/scala/AMQP.scala
+++ b/akka-amqp/src/main/scala/AMQP.scala
- * val endpoint = AMQP.newEndpoint(CONFIG, HOSTNAME, PORT, EXCHANGE, ExchangeType.Direct, Serializer.Java, None, 100)
+ * val endpoint = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, EXCHANGE, ExchangeType.Direct, Serializer.Java, None, 100)
*
- * endpoint ! MessageConsumer(QUEUE, ROUTING_KEY, new Actor() {
+ * endpoint ! MessageConsumerListener(QUEUE, ROUTING_KEY, new Actor() {
* def receive: PartialFunction[Any, Unit] = {
* case Message(payload, _, _, _, _) => log.debug("Received message: %s", payload)
* }
* })
*
- * val client = AMQP.newClient(CONFIG, HOSTNAME, PORT, EXCHANGE, Serializer.Java, None, None, 100)
- * client ! Message("Hi", ROUTING_KEY)
+ * val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, EXCHANGE, Serializer.Java, None, None, 100)
+ * producer ! Message("Hi", ROUTING_KEY)
*
*
* @author Jonas Bonér
@@ -57,10 +57,10 @@ object AMQP extends Actor {
new Message(payload, routingKey, false, false, null)
}
- case class MessageConsumer(queueName: String, routingKey: String, actor: Actor) {
+ case class MessageConsumerListener(queueName: String, routingKey: String, actor: Actor) {
var tag: Option[String] = None
- override def toString(): String = "MessageConsumer[actor=" + actor + ", queue=" + queueName + ", routingKey=" + routingKey + "]"
+ override def toString(): String = "MessageConsumerListener[actor=" + actor + ", queue=" + queueName + ", routingKey=" + routingKey + "]"
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
@@ -71,13 +71,13 @@ object AMQP extends Actor {
override def equals(that: Any): Boolean = synchronized {
that != null &&
- that.isInstanceOf[MessageConsumer] &&
- that.asInstanceOf[MessageConsumer].queueName== queueName &&
- that.asInstanceOf[MessageConsumer].routingKey == routingKey
+ that.isInstanceOf[MessageConsumerListener] &&
+ that.asInstanceOf[MessageConsumerListener].queueName == queueName &&
+ that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
}
}
- case class CancelMessageConsumer(consumer: MessageConsumer)
+ case class CancelMessageConsumerListener(consumer: MessageConsumerListener)
case class Reconnect(delay: Long)
case class Failure(cause: Throwable)
case object Stop
@@ -108,7 +108,7 @@ object AMQP extends Actor {
}
}
- def newClient(
+ def newProducer(
config: ConnectionParameters,
hostname: String,
port: Int,
@@ -116,8 +116,8 @@ object AMQP extends Actor {
serializer: Serializer,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
- initReconnectDelay: Long): Client = {
- val client = new Client(
+ initReconnectDelay: Long): Producer = {
+ val producer = new Producer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
@@ -125,11 +125,11 @@ object AMQP extends Actor {
returnListener,
shutdownListener,
initReconnectDelay)
- startLink(client)
- client
+ startLink(producer)
+ producer
}
- def newEndpoint(
+ def newConsumer(
config: ConnectionParameters,
hostname: String,
port: Int,
@@ -137,15 +137,21 @@ object AMQP extends Actor {
exchangeType: ExchangeType,
serializer: Serializer,
shutdownListener: Option[ShutdownListener],
- initReconnectDelay: Long): Endpoint = {
- val endpoint = new Endpoint(
+ initReconnectDelay: Long,
+ passive: Boolean,
+ durable: Boolean,
+ configurationArguments: Map[String, AnyRef]): Consumer = {
+ val endpoint = new Consumer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
exchangeType,
serializer,
shutdownListener,
- initReconnectDelay)
+ initReconnectDelay,
+ passive,
+ durable,
+ configurationArguments)
startLink(endpoint)
endpoint
}
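// Illustrative sketch, not part of this patch: how callers use the renamed factory methods.
// The arguments mirror the ExampleSession.scala hunks further down in this diff; CONFIG,
// HOSTNAME, PORT, IM and SERIALIZER are placeholders defined there, not here.
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER,
                                None, 100, false, false, Map[String, AnyRef]())
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, SERIALIZER, None, None, 100)
producer ! Message("hello from the producer", "direct")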
@@ -162,7 +168,7 @@ object AMQP extends Actor {
}
/**
- * AMQP client actor.
+ * AMQP producer actor.
* Usage:
*
* val params = new ConnectionParameters
@@ -170,13 +176,13 @@ object AMQP extends Actor {
* params.setPassword("obama")
* params.setVirtualHost("/")
* params.setRequestedHeartbeat(0)
- * val client = AMQP.newClient(params, "localhost", 5672, "exchangeName", Serializer.Java, None, None, 100)
- * client ! Message("hi")
+ * val producer = AMQP.newProducer(params, "localhost", 5672, "exchangeName", Serializer.Java, None, None, 100)
+ * producer ! Message("hi")
*
*
* @author Jonas Bonér
*/
- class Client private[amqp] (
+ class Producer private[amqp] (
val connectionFactory: ConnectionFactory,
val hostname: String,
val port: Int,
@@ -189,14 +195,15 @@ object AMQP extends Actor {
setupChannel
- log.info("AMQP.Client [%s] is started", toString)
+ log.info("AMQP.Producer [%s] is started", toString)
def receive: PartialFunction[Any, Unit] = {
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload))
case Stop =>
- disconnect; stop
+ disconnect
+ stop
}
def setupChannel = {
@@ -225,96 +232,109 @@ object AMQP extends Actor {
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
- override def toString(): String = "AMQP.Client[hostname=" + hostname + ", port=" + port + ", exchange=" + exchangeName + "]"
+ override def toString(): String =
+ "AMQP.Producer[hostname=" + hostname +
+ ", port=" + port +
+ ", exchange=" + exchangeName + "]"
}
/**
* @author Jonas Bonér
*/
- class Endpoint private[amqp] (
+ class Consumer private[amqp] (
val connectionFactory: ConnectionFactory,
val hostname: String,
val port: Int,
- exchangeName: String,
- exchangeType: ExchangeType,
- serializer: Serializer,
- shutdownListener: Option[ShutdownListener],
- val initReconnectDelay: Long)
- extends FaultTolerantConnectionActor {
+ val exchangeName: String,
+ val exchangeType: ExchangeType,
+ val serializer: Serializer,
+ val shutdownListener: Option[ShutdownListener],
+ val initReconnectDelay: Long,
+ val passive: Boolean,
+ val durable: Boolean,
+ val configurationArguments: Map[java.lang.String, Object])
+ extends FaultTolerantConnectionActor { self: Consumer =>
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = true
- val consumers = new HashMap[MessageConsumer, MessageConsumer]
- val endpoint = this
+ val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener]
setupChannel
- log.info("AMQP.Endpoint [%s] is started", toString)
+ log.info("AMQP.Consumer [%s] is started", toString)
def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
- channel.exchangeDeclare(exchangeName, exchangeType.toString)
- consumers.elements.toList.map(_._2).foreach(setupConsumer)
+ channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
+ passive, durable,
+ configurationArguments.asJava)
+ listeners.elements.toList.map(_._2).foreach(setupConsumer)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
- def setupConsumer(consumer: MessageConsumer) = {
- channel.queueDeclare(consumer.queueName)
- channel.queueBind(consumer.queueName, exchangeName, consumer.routingKey)
+ def setupConsumer(listener: MessageConsumerListener) = {
+ channel.queueDeclare(listener.queueName)
+ channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
- val consumerTag = channel.basicConsume(consumer.queueName, false, new DefaultConsumer(channel) with Logging {
+ val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
try {
- consumer.actor ! Message(serializer.in(payload, None), envelope.getRoutingKey)
+ listener.actor ! Message(serializer.in(payload, None), envelope.getRoutingKey)
channel.basicAck(envelope.getDeliveryTag, false)
} catch {
- case cause => endpoint ! Failure(cause) // pass on and rethrow exception in endpoint actor to trigger restart and reconnect
+ case cause => self ! Failure(cause) // pass on and re-throw exception in the consumer actor to trigger restart and reconnect
}
}
- override def handleShutdownSignal(consumerTag: String, signal: ShutdownSignalException) = {
- consumers.elements.toList.map(_._2).find(_.tag == consumerTag) match {
- case None => log.warning("Could not find message consumer for tag [%s]; can't shut consumer down", consumerTag)
- case Some(consumer) =>
- log.warning("Message consumer [%s] is being shutdown by [%s] due to [%s]", consumer, signal.getReference, signal.getReason)
- endpoint ! CancelMessageConsumer(consumer)
+ override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = {
+ listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match {
+ case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
+ case Some(listener) =>
+ log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener, signal.getReference, signal.getReason)
+ self ! CancelMessageConsumerListener(listener)
}
}
})
- consumer.tag = Some(consumerTag)
+ listener.tag = Some(listenerTag)
}
def receive: PartialFunction[Any, Unit] = {
- case consumer: MessageConsumer =>
- startLink(consumer.actor)
- consumers.put(consumer, consumer)
- setupConsumer(consumer)
- log.info("Message consumer is registered [%s]", consumer)
+ case listener: MessageConsumerListener =>
+ startLink(listener.actor)
+ listeners.put(listener, listener)
+ setupConsumer(listener)
+ log.info("Message consumer listener is registered [%s]", listener)
- case CancelMessageConsumer(hash) =>
- consumers.get(hash) match {
- case None => log.warning("Can't unregister message consumer [%s]; no such consumer", hash)
- case Some(consumer) =>
- consumers - consumer
- consumer.tag match {
- case None => log.warning("Can't unregister message consumer [%s]; no consumer tag", consumer)
+ case CancelMessageConsumerListener(hash) =>
+ listeners.get(hash) match {
+ case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", hash)
+ case Some(listener) =>
+ listeners - listener
+ listener.tag match {
+ case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener)
case Some(tag) =>
channel.basicCancel(tag)
- unlink(consumer.actor)
- consumer.actor.stop
- log.info("Message consumer is cancelled and shut down [%s]", consumer)
+ unlink(listener.actor)
+ listener.actor.stop
+ log.info("Message consumer is cancelled and shut down [%s]", listener)
}
}
case Reconnect(delay) => reconnect(delay)
case Failure(cause) => log.error(cause, ""); throw cause
case Stop => disconnect; stop
- case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Endpoint [" + this + "]")
+ case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
}
- override def toString(): String = "AMQP.Endpoint[hostname=" + hostname + ", port=" + port + ", exchange=" + exchangeName + ", type=" + exchangeType + "]"
+ override def toString(): String =
+ "AMQP.Consumer[hostname=" + hostname +
+ ", port=" + port +
+ ", exchange=" + exchangeName +
+ ", type=" + exchangeType +
+ ", passive=" + passive +
+ ", durable=" + durable + "]"
}
trait FaultTolerantConnectionActor extends Actor {
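// Illustrative sketch, not part of this patch: registering a listener with the renamed
// Consumer actor (`consumer` is the actor returned by AMQP.newConsumer, as in the sketch
// above and in ExampleSession.scala below); the queue name and routing key are made up.
consumer ! MessageConsumerListener("example.queue", "direct", new Actor() {
  def receive: PartialFunction[Any, Unit] = {
    case Message(payload, _, _, _, _) => log.info("Received message: %s", payload)
  }
})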
diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala
index b3ec9a22c1..dedc6bf5ba 100644
--- a/akka-amqp/src/main/scala/ExampleSession.scala
+++ b/akka-amqp/src/main/scala/ExampleSession.scala
@@ -4,9 +4,10 @@
package se.scalablesolutions.akka.amqp
-import akka.serialization.Serializer
+import se.scalablesolutions.akka.serialization.Serializer
+import se.scalablesolutions.akka.actor.Actor
+
import com.rabbitmq.client.ConnectionParameters
-import actor.Actor
object ExampleSession {
import AMQP._
@@ -29,29 +30,29 @@ object ExampleSession {
}
def direct = {
- val endpoint = AMQP.newEndpoint(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100)
- endpoint ! MessageConsumer("@george_bush", "direct", new Actor() {
+ val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
+ consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
}
})
- val client = AMQP.newClient(CONFIG, HOSTNAME, PORT, IM, SERIALIZER, None, None, 100)
- client ! Message("@jonas_boner: You sucked!!", "direct")
+ val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, SERIALIZER, None, None, 100)
+ producer ! Message("@jonas_boner: You sucked!!", "direct")
}
def fanout = {
- val endpoint = AMQP.newEndpoint(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100)
- endpoint ! MessageConsumer("@george_bush", "", new Actor() {
+ val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
+ consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
}
})
- endpoint ! MessageConsumer("@barack_obama", "", new Actor() {
+ consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", payload)
}
})
- val client = AMQP.newClient(CONFIG, HOSTNAME, PORT, CHAT, SERIALIZER, None, None, 100)
- client ! Message("@jonas_boner: I'm going surfing", "")
+ val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, SERIALIZER, None, None, 100)
+ producer ! Message("@jonas_boner: I'm going surfing", "")
}
}
\ No newline at end of file
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index 46538d0c48..3f3f65bde3 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -10,7 +10,7 @@ import com.google.inject.Scopes;
import junit.framework.TestCase;
import se.scalablesolutions.akka.Config;
-import se.scalablesolutions.akka.reactor.EventBasedThreadPoolDispatcher;
+import se.scalablesolutions.akka.dispatch.EventBasedThreadPoolDispatcher;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import java.util.concurrent.ThreadPoolExecutor;
@@ -104,7 +104,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
try {
foo.longRunning();
fail("exception should have been thrown");
- } catch (se.scalablesolutions.akka.reactor.FutureTimeoutException e) {
+ } catch (se.scalablesolutions.akka.dispatch.FutureTimeoutException e) {
}
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 22ff23bcab..8fa2ffcd81 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -56,7 +56,8 @@ public class InMemStateful {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
- nested.success(key, msg);
+ nested.success(key, msg);
+ System.out.println("--- after success ");
}
public String failure(String key, String msg, InMemFailer failer) {
diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala
index d5aab7e1d3..7831fd2d01 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-kernel/src/main/scala/AkkaServlet.scala
@@ -4,8 +4,8 @@
package se.scalablesolutions.akka.rest
-import config.ConfiguratorRepository
-import util.Logging
+import se.scalablesolutions.akka.config.ConfiguratorRepository
+import se.scalablesolutions.akka.util.Logging
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorage.scala
index bae0570255..fcc5766ed4 100644
--- a/akka-persistence/src/main/scala/CassandraStorage.scala
+++ b/akka-persistence/src/main/scala/CassandraStorage.scala
@@ -29,6 +29,7 @@ object CassandraStorage extends MapStorage
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
val REF_COLUMN_PARENT = new ColumnParent("ref", null)
val REF_KEY = "item".getBytes("UTF-8")
+ val EMPTY_BYTE_ARRAY = new Array[Byte](0)
val CASSANDRA_SERVER_HOSTNAME = config.getString("akka.storage.cassandra.hostname", "127.0.0.1")
val CASSANDRA_SERVER_PORT = config.getInt("akka.storage.cassandra.port", 9160)
@@ -126,7 +127,7 @@ object CassandraStorage extends MapStorage
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
- if (column.isDefined) serializer.in(column.get.getColumn.value, None)
+ if (column.isDefined) serializer.in(column.get.column.value, None)
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
}
@@ -191,17 +192,17 @@ object CassandraStorage extends MapStorage
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
- throw new UnsupportedOperationException
- /*
- val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
- .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
- for {
- column <- columns
- col = (column.columnName, column.value)
- } yield col
- */
+ val size = getMapStorageSizeFor(name)
+ sessions.withSession { session =>
+ val columns = session / (name, MAP_COLUMN_PARENT, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, true, size, CONSISTENCY_LEVEL)
+ for {
+ columnOrSuperColumn <- columns
+ entry = (serializer.in(columnOrSuperColumn.column.name, None), serializer.in(columnOrSuperColumn.column.value, None))
+ } yield entry
+ }
}
+
def getMapStorageSizeFor(name: String): Int = {
sessions.withSession {
_ |# (name, MAP_COLUMN_PARENT)
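// Illustrative sketch, not part of this patch: getMapStorageFor no longer throws
// UnsupportedOperationException but returns the stored key/value pairs, so a caller can now
// drain a persisted map like this ("user-storage-uuid" is a made-up map name).
val entries: List[Tuple2[AnyRef, AnyRef]] = CassandraStorage.getMapStorageFor("user-storage-uuid")
for ((key, value) <- entries) println("key=" + key + ", value=" + value)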
diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala
index 7363da5be5..e32acff6c9 100644
--- a/akka-persistence/src/main/scala/PersistentState.scala
+++ b/akka-persistence/src/main/scala/PersistentState.scala
@@ -116,7 +116,7 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
} catch { case e: Exception => false }
override def size: Int = try {
- storage.getMapStorageSizeFor(uuid) + newAndUpdatedEntries.size
+ storage.getMapStorageSizeFor(uuid)
} catch { case e: Exception => 0 }
override def get(key: AnyRef): Option[AnyRef] = {
@@ -137,7 +137,7 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
- elements = elements.tail
+ elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
@@ -151,7 +151,9 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
}
/**
- * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author Debasish Ghosh
*/
diff --git a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala
index cfdff7bc3e..28fe92651b 100644
--- a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala
+++ b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import junit.framework.TestCase
-import reactor._
+import dispatch._
import org.junit.{Test, Before}
import org.junit.Assert._
diff --git a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
index 81eeb13261..4fc18e8967 100644
--- a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
+++ b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala
@@ -1,11 +1,11 @@
package se.scalablesolutions.akka.state
-import akka.actor.Actor
+import se.scalablesolutions.akka.actor.Actor
import junit.framework.TestCase
import org.junit.{Test, Before}
import org.junit.Assert._
-import dispatch.json._
-import dispatch.json.Js._
+import _root_.dispatch.json._
+import _root_.dispatch.json.Js._
/**
* A persistent actor based on MongoDB storage.
diff --git a/akka-persistence/src/test/scala/MongoStorageSpec.scala b/akka-persistence/src/test/scala/MongoStorageSpec.scala
index 2650b49506..32171be87a 100644
--- a/akka-persistence/src/test/scala/MongoStorageSpec.scala
+++ b/akka-persistence/src/test/scala/MongoStorageSpec.scala
@@ -4,8 +4,8 @@ import junit.framework.TestCase
import org.junit.{Test, Before}
import org.junit.Assert._
-import dispatch.json._
-import dispatch.json.Js._
+import _root_.dispatch.json._
+import _root_.dispatch.json.Js._
class MongoStorageSpec extends TestCase {
@@ -81,8 +81,6 @@ class MongoStorageSpec extends TestCase {
val JsString(str) = MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
assertEquals("debasish", str)
- import dispatch.json.Js._
-
val l = MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
val num_list = list ! num
val num_list(l0) = l
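// Illustrative sketch, not part of this patch: why the _root_ prefix is now needed. Inside
// package se.scalablesolutions.akka.state the bare name `dispatch` resolves to the renamed
// se.scalablesolutions.akka.dispatch package and shadows the top-level Databinder dispatch
// library, so the JSON imports must be anchored at the root package.
package se.scalablesolutions.akka.state

import _root_.dispatch.json._       // Databinder dispatch-json (the third-party library)
import dispatch.MessageDispatcher   // the relative form, as Config.scala uses above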
diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java
index 1c2f100f26..29152ddee4 100644
--- a/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java
+++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java
@@ -1,5 +1,6 @@
package se.scalablesolutions.akka.stm;
+import org.multiverse.api.Stm;
import static org.multiverse.api.StmUtils.retry;
import org.multiverse.api.Transaction;
import org.multiverse.api.exceptions.LoadUncommittedException;
@@ -7,6 +8,8 @@ import org.multiverse.api.exceptions.ReadonlyException;
import org.multiverse.datastructures.refs.ManagedRef;
import org.multiverse.stms.alpha.*;
import org.multiverse.stms.alpha.mixins.FastAtomicObjectMixin;
+import org.multiverse.templates.AtomicTemplate;
+import org.multiverse.utils.GlobalStmInstance;
import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction;
import static java.lang.String.format;
@@ -25,6 +28,63 @@ public final class Ref