diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index 7f2d2f8fa3..abefe853ce 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -67,49 +67,49 @@ class ActiveObjectFactory { def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(target, actor, None, timeout) } def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, None, timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @@ -173,49 +173,49 @@ object ActiveObject { def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(None) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(restartCallbacks) - actor.dispatcher = dispatcher + actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index d18092dd83..bd5276bc01 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -7,15 +7,16 @@ package se.scalablesolutions.akka.actor import com.google.protobuf.ByteString import java.net.InetSocketAddress import java.util.concurrent.CopyOnWriteArraySet - -import reactor._ -import config.ScalaConfig._ -import stm.TransactionManagement -import util.Helpers.ReadWriteLock -import nio.protobuf.RemoteProtocol.RemoteRequest -import util.Logging -import serialization.{Serializer, Serializable, SerializationProtocol} -import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} + +import se.scalablesolutions.akka.reactor._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.util.Helpers.ReadWriteLock +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest +import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.Config._ sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage @@ -42,7 +43,6 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { * @author Jonas Bonér */ object Actor { - import Config._ val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) } @@ -100,7 +100,7 @@ trait Actor extends Logging with TransactionManagement { * .buildThreadPool * */ - protected[akka] var dispatcher: MessageDispatcher = { + protected[akka] var messageDispatcher: MessageDispatcher = { val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName) mailbox = dispatcher.messageQueue dispatcher.registerHandler(this, new ActorMessageInvoker(this)) @@ -198,7 +198,7 @@ trait Actor extends Logging with TransactionManagement { */ def start = synchronized { if (!isRunning) { - dispatcher.start + messageDispatcher.start isRunning = true } } @@ -208,7 +208,7 @@ trait Actor extends Logging with TransactionManagement { */ def stop = synchronized { if (isRunning) { - dispatcher.unregisterHandler(this) + messageDispatcher.unregisterHandler(this) isRunning = false shutdown } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") @@ -265,14 +265,16 @@ trait Actor extends Logging with TransactionManagement { case Some(future) => future.completeWithResult(message) } + def dispatcher = messageDispatcher + /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def setDispatcher(disp: MessageDispatcher) = synchronized { + def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized { if (!isRunning) { - dispatcher = disp - mailbox = dispatcher.messageQueue - dispatcher.registerHandler(this, new ActorMessageInvoker(this)) + messageDispatcher = dispatcher + mailbox = messageDispatcher.messageQueue + messageDispatcher.registerHandler(this, new ActorMessageInvoker(this)) } else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started") } @@ -361,7 +363,7 @@ trait Actor extends Logging with TransactionManagement { */ protected[this] def spawn[T <: Actor](actorClass: Class[T]): T = { val actor = actorClass.newInstance.asInstanceOf[T] - actor.dispatcher = dispatcher + actor.messageDispatcher = messageDispatcher actor.mailbox = mailbox actor.start actor @@ -375,7 +377,7 @@ trait Actor extends Logging with TransactionManagement { protected[this] def spawnRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = { val actor = actorClass.newInstance.asInstanceOf[T] actor.makeRemote(hostname, port) - actor.dispatcher = dispatcher + actor.messageDispatcher = messageDispatcher actor.mailbox = mailbox actor.start actor @@ -587,9 +589,9 @@ trait Actor extends Logging with TransactionManagement { private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized { - dispatcher = disp - mailbox = dispatcher.messageQueue - dispatcher.registerHandler(this, new ActorMessageInvoker(this)) + messageDispatcher = disp + mailbox = messageDispatcher.messageQueue + messageDispatcher.registerHandler(this, new ActorMessageInvoker(this)) } private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { diff --git a/akka-actors/src/main/scala/reactor/Dispatchers.scala b/akka-actors/src/main/scala/reactor/Dispatchers.scala index 30846752b6..ad6c94139b 100644 --- a/akka-actors/src/main/scala/reactor/Dispatchers.scala +++ b/akka-actors/src/main/scala/reactor/Dispatchers.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.reactor -import actor.Actor +import se.scalablesolutions.akka.actor.Actor /** * Scala API. Dispatcher factory. diff --git a/akka-actors/src/main/scala/reactor/ThreadBasedDispatcher.scala b/akka-actors/src/main/scala/reactor/ThreadBasedDispatcher.scala index aa04414169..8c70446dba 100644 --- a/akka-actors/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/akka-actors/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.reactor import java.util.concurrent.LinkedBlockingQueue import java.util.Queue -import actor.{Actor, ActorMessageInvoker} +import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -41,8 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: selectorThread.interrupt } - def registerHandler(key: AnyRef, handler: MessageInvoker) = throw new UnsupportedOperationException - def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException + def registerHandler(key: AnyRef, handler: MessageInvoker) = {} + def unregisterHandler(key: AnyRef) = {} } class BlockingMessageQueue(name: String) extends MessageQueue { diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index ddc18a61ef..b328684edf 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -42,7 +42,7 @@ class TransactionalState { @serializable trait Transactional { // FIXME: won't work across the cluster - val uuid = Uuid.newUuid.toString + var uuid = Uuid.newUuid.toString private[akka] def begin private[akka] def commit diff --git a/akka-actors/src/test/scala/AllTest.scala b/akka-actors/src/test/scala/AllTest.scala index 6e782232e7..60f18d7d39 100644 --- a/akka-actors/src/test/scala/AllTest.scala +++ b/akka-actors/src/test/scala/AllTest.scala @@ -4,7 +4,7 @@ import junit.framework.Test import junit.framework.TestCase import junit.framework.TestSuite -import actor.{ActorSpec, RemoteActorSpec, InMemoryActorSpec, SupervisorSpec, RemoteSupervisorSpec,SchedulerSpec} +import actor.{ThreadBasedActorSpec, RemoteActorSpec, InMemoryActorSpec, SupervisorSpec, RemoteSupervisorSpec,SchedulerSpec} import reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest} object AllTest extends TestCase { @@ -14,7 +14,9 @@ object AllTest extends TestCase { suite.addTestSuite(classOf[RemoteSupervisorSpec]) suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest]) suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest]) - suite.addTestSuite(classOf[ActorSpec]) + suite.addTestSuite(classOf[ThreadBasedActorSpec]) + suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest]) + suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest]) suite.addTestSuite(classOf[RemoteActorSpec]) suite.addTestSuite(classOf[InMemoryActorSpec]) suite.addTestSuite(classOf[SchedulerSpec]) diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadActorSpec.scala b/akka-actors/src/test/scala/EventBasedSingleThreadActorSpec.scala new file mode 100644 index 0000000000..97da0d6a58 --- /dev/null +++ b/akka-actors/src/test/scala/EventBasedSingleThreadActorSpec.scala @@ -0,0 +1,69 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.TimeUnit + +import junit.framework.Assert._ + +import se.scalablesolutions.akka.reactor.Dispatchers + +class EventBasedSingleThreadActorSpec extends junit.framework.TestCase { + private val unit = TimeUnit.MILLISECONDS + + class TestActor extends Actor { + dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(name) + + def receive: PartialFunction[Any, Unit] = { + case "Hello" => + reply("World") + case "Failure" => + throw new RuntimeException("expected") + } + } + + def testSendOneWay = { + implicit val timeout = 5000L + var oneWay = "nada" + val actor = new Actor { + def receive: PartialFunction[Any, Unit] = { + case "OneWay" => oneWay = "received" + } + } + actor.start + val result = actor ! "OneWay" + Thread.sleep(100) + assertEquals("received", oneWay) + actor.stop + } + + def testSendReplySync = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + val result: String = actor !? "Hello" + assertEquals("World", result) + actor.stop + } + + def testSendReplyAsync = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + val result = actor !! "Hello" + assertEquals("World", result.get.asInstanceOf[String]) + actor.stop + } + + def testSendReceiveException = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + try { + actor !! "Failure" + fail("Should have thrown an exception") + } catch { + case e => + assertEquals("expected", e.getMessage()) + } + actor.stop + } +} diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala index 758f9d6cd0..489b20b737 100644 --- a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala @@ -108,8 +108,8 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) diff --git a/akka-actors/src/test/scala/ActorSpec.scala b/akka-actors/src/test/scala/EventBasedThreadPoolActorSpec.scala similarity index 93% rename from akka-actors/src/test/scala/ActorSpec.scala rename to akka-actors/src/test/scala/EventBasedThreadPoolActorSpec.scala index 74ebd13f25..767f30574d 100644 --- a/akka-actors/src/test/scala/ActorSpec.scala +++ b/akka-actors/src/test/scala/EventBasedThreadPoolActorSpec.scala @@ -2,9 +2,9 @@ package se.scalablesolutions.akka.actor import java.util.concurrent.TimeUnit -import org.junit.Assert._ +import junit.framework.Assert._ -class ActorSpec extends junit.framework.TestCase { +class EventBasedThreadPoolActorSpec extends junit.framework.TestCase { private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { @@ -15,7 +15,7 @@ class ActorSpec extends junit.framework.TestCase { throw new RuntimeException("expected") } } - + def testSendOneWay = { implicit val timeout = 5000L var oneWay = "nada" diff --git a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala index a57ad0b825..9e5db36751 100644 --- a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala @@ -152,8 +152,8 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) diff --git a/akka-actors/src/test/scala/Messages.scala b/akka-actors/src/test/scala/Messages.scala index 7e4d5ca66f..59e884121e 100644 --- a/akka-actors/src/test/scala/Messages.scala +++ b/akka-actors/src/test/scala/Messages.scala @@ -34,7 +34,7 @@ case class User(val usernamePassword: Tuple2[String, String], def toBytes: Array[Byte] = toByteArray(this) } -case class RemotePing extends TestMessage +case object RemotePing extends TestMessage case object RemotePong extends TestMessage case object RemoteOneWay extends TestMessage case object RemoteDie extends TestMessage diff --git a/akka-actors/src/test/scala/ThreadBasedActorSpec.scala b/akka-actors/src/test/scala/ThreadBasedActorSpec.scala new file mode 100644 index 0000000000..412e0cfae5 --- /dev/null +++ b/akka-actors/src/test/scala/ThreadBasedActorSpec.scala @@ -0,0 +1,69 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.TimeUnit + +import junit.framework.Assert._ + +import se.scalablesolutions.akka.reactor.Dispatchers + +class ThreadBasedActorSpec extends junit.framework.TestCase { + private val unit = TimeUnit.MILLISECONDS + + class TestActor extends Actor { + dispatcher = Dispatchers.newThreadBasedDispatcher(this) + + def receive: PartialFunction[Any, Unit] = { + case "Hello" => + reply("World") + case "Failure" => + throw new RuntimeException("expected") + } + } + + def testSendOneWay = { + implicit val timeout = 5000L + var oneWay = "nada" + val actor = new Actor { + def receive: PartialFunction[Any, Unit] = { + case "OneWay" => oneWay = "received" + } + } + actor.start + val result = actor ! "OneWay" + Thread.sleep(100) + assertEquals("received", oneWay) + actor.stop + } + + def testSendReplySync = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + val result: String = actor !? "Hello" + assertEquals("World", result) + actor.stop + } + + def testSendReplyAsync = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + val result = actor !! "Hello" + assertEquals("World", result.get.asInstanceOf[String]) + actor.stop + } + + def testSendReceiveException = { + implicit val timeout = 5000L + val actor = new TestActor + actor.start + try { + actor !! "Failure" + fail("Should have thrown an exception") + } catch { + case e => + assertEquals("expected", e.getMessage()) + } + actor.stop + } +} diff --git a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala index a76d02815d..308191412b 100644 --- a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala @@ -73,7 +73,7 @@ class ThreadBasedDispatcherTest extends TestCase { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation("id", new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation("id", new java.lang.Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 3acf773644..2c166cfcdd 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -5,11 +5,9 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class PersistentStateful { - private PersistentState factory = new PersistentState(); - private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig()); - private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());; - private TransactionalRef refState = factory.newRef(new CassandraStorageConfig()); - + private PersistentMap mapState = PersistentState.newMap(new CassandraStorageConfig()); + private PersistentVector vectorState = PersistentState.newVector(new CassandraStorageConfig());; + private PersistentRef refState = PersistentState.newRef(new CassandraStorageConfig()); public String getMapState(String key) { return (String) mapState.get(key).get(); diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index 6f26427118..ced4bc1ac1 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -5,11 +5,9 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class PersistentStatefulNested { - private PersistentState factory = new PersistentState(); - private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig()); - private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());; - private TransactionalRef refState = factory.newRef(new CassandraStorageConfig()); - + private PersistentMap mapState = PersistentState.newMap(new CassandraStorageConfig()); + private PersistentVector vectorState = PersistentState.newVector(new CassandraStorageConfig()); + private PersistentRef refState = PersistentState.newRef(new CassandraStorageConfig()); public String getMapState(String key) { return (String) mapState.get(key).get(); diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala index 0b8bfb8fd1..c4d45ebd89 100644 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ b/akka-persistence/src/main/scala/PersistentState.scala @@ -13,10 +13,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} sealed abstract class PersistentStateConfig abstract class PersistentStorageConfig extends PersistentStateConfig -case class CassandraStorageConfig extends PersistentStorageConfig -case class TerracottaStorageConfig extends PersistentStorageConfig -case class TokyoCabinetStorageConfig extends PersistentStorageConfig -case class MongoStorageConfig extends PersistentStorageConfig +case class CassandraStorageConfig() extends PersistentStorageConfig +case class TerracottaStorageConfig() extends PersistentStorageConfig +case class TokyoCabinetStorageConfig() extends PersistentStorageConfig +case class MongoStorageConfig() extends PersistentStorageConfig /** * Scala API. @@ -25,34 +25,29 @@ case class MongoStorageConfig extends PersistentStorageConfig *
* val myMap = PersistentState.newMap(CassandraStorageConfig) *- */ -object PersistentState extends PersistentState - -/** * Java API. * * Example Java usage: *
- * PersistentState state = new PersistentState(); - * TransactionalMap myMap = state.newMap(new CassandraStorageConfig()); + * TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig()); **/ -class PersistentState { - def newMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match { +object PersistentState { + def newMap(config: PersistentStorageConfig): PersistentMap = config match { case CassandraStorageConfig() => new CassandraPersistentMap case MongoStorageConfig() => new MongoPersistentMap case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - def newVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { + def newVector(config: PersistentStorageConfig): PersistentVector = config match { case CassandraStorageConfig() => new CassandraPersistentVector case MongoStorageConfig() => new MongoPersistentVector case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - def newRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { + def newRef(config: PersistentStorageConfig): PersistentRef = config match { case CassandraStorageConfig() => new CassandraPersistentRef case MongoStorageConfig() => new MongoPersistentRef case TerracottaStorageConfig() => throw new UnsupportedOperationException @@ -68,10 +63,10 @@ class PersistentState { * * @author Jonas Bonér */ -abstract class PersistentMap[K, V] extends TransactionalMap[K, V] { +abstract class PersistentMap extends TransactionalMap[AnyRef, AnyRef] { // FIXME: need to handle remove in another changeSet - protected[akka] val changeSet = new HashMap[K, V] + protected[akka] val changeSet = new HashMap[AnyRef, AnyRef] def getRange(start: Option[AnyRef], count: Int) @@ -81,15 +76,15 @@ abstract class PersistentMap[K, V] extends TransactionalMap[K, V] { override def rollback = changeSet.clear // ---- For scala.collection.mutable.Map ---- - override def put(key: K, value: V): Option[V] = { + override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = { verifyTransaction changeSet += key -> value None // always return None to speed up writes (else need to go to DB to get } - override def -=(key: K) = remove(key) + override def -=(key: AnyRef) = remove(key) - override def update(key: K, value: V) = put(key, value) + override def update(key: AnyRef, value: AnyRef) = put(key, value) } /** @@ -101,7 +96,7 @@ abstract class PersistentMap[K, V] extends TransactionalMap[K, V] { * * @author Jonas Bonér */ -abstract class TemplatePersistentMap extends PersistentMap[AnyRef, AnyRef] { +abstract class TemplatePersistentMap extends PersistentMap { // to be concretized in subclasses val storage: MapStorage @@ -212,10 +207,10 @@ class MongoPersistentMap extends TemplatePersistentMap { * * @author Jonas Bonér */ -abstract class PersistentVector[T] extends TransactionalVector[T] { +abstract class PersistentVector extends TransactionalVector[AnyRef] { // FIXME: need to handle remove in another changeSet - protected[akka] val changeSet = new ArrayBuffer[T] + protected[akka] val changeSet = new ArrayBuffer[AnyRef] // ---- For Transactional ---- override def begin = {} @@ -223,7 +218,7 @@ abstract class PersistentVector[T] extends TransactionalVector[T] { override def rollback = changeSet.clear // ---- For TransactionalVector ---- - override def add(value: T) = { + override def add(value: AnyRef) = { verifyTransaction changeSet += value } @@ -234,7 +229,7 @@ abstract class PersistentVector[T] extends TransactionalVector[T] { * * @author Debasish Ghosh */ -abstract class TemplatePersistentVector extends PersistentVector[AnyRef] { +abstract class TemplatePersistentVector extends PersistentVector { val storage: VectorStorage @@ -286,7 +281,7 @@ class CassandraPersistentVector extends TemplatePersistentVector { val storage = CassandraStorage } -/** +/** * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage. * * @author Debaissh Ghosh @@ -295,7 +290,7 @@ class MongoPersistentVector extends TemplatePersistentVector { val storage = MongoStorage } -abstract class TemplatePersistentRef extends TransactionalRef[AnyRef] { +abstract class PersistentRef extends TransactionalRef[AnyRef] { val storage: RefStorage override def commit = if (ref.isDefined) { @@ -319,10 +314,10 @@ abstract class TemplatePersistentRef extends TransactionalRef[AnyRef] { } } -class CassandraPersistentRef extends TemplatePersistentRef { +class CassandraPersistentRef extends PersistentRef { val storage = CassandraStorage } -class MongoPersistentRef extends TemplatePersistentRef { +class MongoPersistentRef extends PersistentRef { val storage = MongoStorage } diff --git a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java index 947a0f8b3f..b995972188 100644 --- a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; import se.scalablesolutions.akka.state.TransactionalState; import se.scalablesolutions.akka.state.PersistentState; -import se.scalablesolutions.akka.state.TransactionalMap; +import se.scalablesolutions.akka.state.PersistentMap; import se.scalablesolutions.akka.state.CassandraStorageConfig; /** @@ -26,11 +26,10 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig; @Path("/persistentjavacount") @transactionrequired public class PersistentSimpleService { - private String KEY = "COUNTER"; + private Object KEY = "COUNTER"; private boolean hasStartedTicking = false; - private PersistentState factory = new PersistentState(); - private TransactionalMap