fixed bug with using ThreadBasedDispatcher + added tests for dispatchers
This commit is contained in:
parent
2c517f2731
commit
61d0b4418e
21 changed files with 303 additions and 101 deletions
|
|
@ -67,49 +67,49 @@ class ActiveObjectFactory {
|
|||
|
||||
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(intf, target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(intf, target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
|
|
@ -173,49 +173,49 @@ object ActiveObject {
|
|||
|
||||
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(intf, target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(intf, target, actor, None, timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
|
||||
val actor = new Dispatcher(None)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
|
||||
val actor = new Dispatcher(restartCallbacks)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = dispatcher
|
||||
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,15 +7,16 @@ package se.scalablesolutions.akka.actor
|
|||
import com.google.protobuf.ByteString
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.CopyOnWriteArraySet
|
||||
|
||||
import reactor._
|
||||
import config.ScalaConfig._
|
||||
import stm.TransactionManagement
|
||||
import util.Helpers.ReadWriteLock
|
||||
import nio.protobuf.RemoteProtocol.RemoteRequest
|
||||
import util.Logging
|
||||
import serialization.{Serializer, Serializable, SerializationProtocol}
|
||||
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
|
||||
|
||||
import se.scalablesolutions.akka.reactor._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
|
||||
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.Config._
|
||||
|
||||
sealed abstract class LifecycleMessage
|
||||
case class Init(config: AnyRef) extends LifecycleMessage
|
||||
|
|
@ -42,7 +43,6 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Actor {
|
||||
import Config._
|
||||
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
|
||||
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
|
||||
}
|
||||
|
|
@ -100,7 +100,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* .buildThreadPool
|
||||
* </pre>
|
||||
*/
|
||||
protected[akka] var dispatcher: MessageDispatcher = {
|
||||
protected[akka] var messageDispatcher: MessageDispatcher = {
|
||||
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
|
||||
mailbox = dispatcher.messageQueue
|
||||
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
|
|
@ -198,7 +198,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
def start = synchronized {
|
||||
if (!isRunning) {
|
||||
dispatcher.start
|
||||
messageDispatcher.start
|
||||
isRunning = true
|
||||
}
|
||||
}
|
||||
|
|
@ -208,7 +208,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
def stop = synchronized {
|
||||
if (isRunning) {
|
||||
dispatcher.unregisterHandler(this)
|
||||
messageDispatcher.unregisterHandler(this)
|
||||
isRunning = false
|
||||
shutdown
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
|
@ -265,14 +265,16 @@ trait Actor extends Logging with TransactionManagement {
|
|||
case Some(future) => future.completeWithResult(message)
|
||||
}
|
||||
|
||||
def dispatcher = messageDispatcher
|
||||
|
||||
/**
|
||||
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def setDispatcher(disp: MessageDispatcher) = synchronized {
|
||||
def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
|
||||
if (!isRunning) {
|
||||
dispatcher = disp
|
||||
mailbox = dispatcher.messageQueue
|
||||
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
messageDispatcher = dispatcher
|
||||
mailbox = messageDispatcher.messageQueue
|
||||
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
} else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started")
|
||||
}
|
||||
|
||||
|
|
@ -361,7 +363,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
protected[this] def spawn[T <: Actor](actorClass: Class[T]): T = {
|
||||
val actor = actorClass.newInstance.asInstanceOf[T]
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = messageDispatcher
|
||||
actor.mailbox = mailbox
|
||||
actor.start
|
||||
actor
|
||||
|
|
@ -375,7 +377,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
protected[this] def spawnRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = {
|
||||
val actor = actorClass.newInstance.asInstanceOf[T]
|
||||
actor.makeRemote(hostname, port)
|
||||
actor.dispatcher = dispatcher
|
||||
actor.messageDispatcher = messageDispatcher
|
||||
actor.mailbox = mailbox
|
||||
actor.start
|
||||
actor
|
||||
|
|
@ -587,9 +589,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
|
||||
|
||||
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
|
||||
dispatcher = disp
|
||||
mailbox = dispatcher.messageQueue
|
||||
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
messageDispatcher = disp
|
||||
mailbox = messageDispatcher.messageQueue
|
||||
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
}
|
||||
|
||||
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.reactor
|
||||
|
||||
import actor.Actor
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
/**
|
||||
* Scala API. Dispatcher factory.
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package se.scalablesolutions.akka.reactor
|
|||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.Queue
|
||||
|
||||
import actor.{Actor, ActorMessageInvoker}
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker}
|
||||
|
||||
/**
|
||||
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
|
||||
|
|
@ -41,8 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
|
|||
selectorThread.interrupt
|
||||
}
|
||||
|
||||
def registerHandler(key: AnyRef, handler: MessageInvoker) = throw new UnsupportedOperationException
|
||||
def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
|
||||
def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
|
||||
def unregisterHandler(key: AnyRef) = {}
|
||||
}
|
||||
|
||||
class BlockingMessageQueue(name: String) extends MessageQueue {
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ class TransactionalState {
|
|||
@serializable
|
||||
trait Transactional {
|
||||
// FIXME: won't work across the cluster
|
||||
val uuid = Uuid.newUuid.toString
|
||||
var uuid = Uuid.newUuid.toString
|
||||
|
||||
private[akka] def begin
|
||||
private[akka] def commit
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import junit.framework.Test
|
|||
import junit.framework.TestCase
|
||||
import junit.framework.TestSuite
|
||||
|
||||
import actor.{ActorSpec, RemoteActorSpec, InMemoryActorSpec, SupervisorSpec, RemoteSupervisorSpec,SchedulerSpec}
|
||||
import actor.{ThreadBasedActorSpec, RemoteActorSpec, InMemoryActorSpec, SupervisorSpec, RemoteSupervisorSpec,SchedulerSpec}
|
||||
import reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest}
|
||||
|
||||
object AllTest extends TestCase {
|
||||
|
|
@ -14,7 +14,9 @@ object AllTest extends TestCase {
|
|||
suite.addTestSuite(classOf[RemoteSupervisorSpec])
|
||||
suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest])
|
||||
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
|
||||
suite.addTestSuite(classOf[ActorSpec])
|
||||
suite.addTestSuite(classOf[ThreadBasedActorSpec])
|
||||
suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest])
|
||||
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
|
||||
suite.addTestSuite(classOf[RemoteActorSpec])
|
||||
suite.addTestSuite(classOf[InMemoryActorSpec])
|
||||
suite.addTestSuite(classOf[SchedulerSpec])
|
||||
|
|
|
|||
|
|
@ -0,0 +1,69 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import junit.framework.Assert._
|
||||
|
||||
import se.scalablesolutions.akka.reactor.Dispatchers
|
||||
|
||||
class EventBasedSingleThreadActorSpec extends junit.framework.TestCase {
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(name)
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case "Hello" =>
|
||||
reply("World")
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("expected")
|
||||
}
|
||||
}
|
||||
|
||||
def testSendOneWay = {
|
||||
implicit val timeout = 5000L
|
||||
var oneWay = "nada"
|
||||
val actor = new Actor {
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(100)
|
||||
assertEquals("received", oneWay)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReplySync = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
val result: String = actor !? "Hello"
|
||||
assertEquals("World", result)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReplyAsync = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
val result = actor !! "Hello"
|
||||
assertEquals("World", result.get.asInstanceOf[String])
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReceiveException = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
try {
|
||||
actor !! "Failure"
|
||||
fail("Should have thrown an exception")
|
||||
} catch {
|
||||
case e =>
|
||||
assertEquals("expected", e.getMessage())
|
||||
}
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
|
|
@ -108,8 +108,8 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None))
|
||||
}
|
||||
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assertFalse(threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -2,9 +2,9 @@ package se.scalablesolutions.akka.actor
|
|||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.junit.Assert._
|
||||
import junit.framework.Assert._
|
||||
|
||||
class ActorSpec extends junit.framework.TestCase {
|
||||
class EventBasedThreadPoolActorSpec extends junit.framework.TestCase {
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
|
|
@ -15,7 +15,7 @@ class ActorSpec extends junit.framework.TestCase {
|
|||
throw new RuntimeException("expected")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def testSendOneWay = {
|
||||
implicit val timeout = 5000L
|
||||
var oneWay = "nada"
|
||||
|
|
@ -152,8 +152,8 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None))
|
||||
}
|
||||
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assertFalse(threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ case class User(val usernamePassword: Tuple2[String, String],
|
|||
def toBytes: Array[Byte] = toByteArray(this)
|
||||
}
|
||||
|
||||
case class RemotePing extends TestMessage
|
||||
case object RemotePing extends TestMessage
|
||||
case object RemotePong extends TestMessage
|
||||
case object RemoteOneWay extends TestMessage
|
||||
case object RemoteDie extends TestMessage
|
||||
|
|
|
|||
69
akka-actors/src/test/scala/ThreadBasedActorSpec.scala
Normal file
69
akka-actors/src/test/scala/ThreadBasedActorSpec.scala
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import junit.framework.Assert._
|
||||
|
||||
import se.scalablesolutions.akka.reactor.Dispatchers
|
||||
|
||||
class ThreadBasedActorSpec extends junit.framework.TestCase {
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case "Hello" =>
|
||||
reply("World")
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("expected")
|
||||
}
|
||||
}
|
||||
|
||||
def testSendOneWay = {
|
||||
implicit val timeout = 5000L
|
||||
var oneWay = "nada"
|
||||
val actor = new Actor {
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(100)
|
||||
assertEquals("received", oneWay)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReplySync = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
val result: String = actor !? "Hello"
|
||||
assertEquals("World", result)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReplyAsync = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
val result = actor !! "Hello"
|
||||
assertEquals("World", result.get.asInstanceOf[String])
|
||||
actor.stop
|
||||
}
|
||||
|
||||
def testSendReceiveException = {
|
||||
implicit val timeout = 5000L
|
||||
val actor = new TestActor
|
||||
actor.start
|
||||
try {
|
||||
actor !! "Failure"
|
||||
fail("Should have thrown an exception")
|
||||
} catch {
|
||||
case e =>
|
||||
assertEquals("expected", e.getMessage())
|
||||
}
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
|
|
@ -73,7 +73,7 @@ class ThreadBasedDispatcherTest extends TestCase {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation("id", new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation("id", new java.lang.Integer(i), None, None))
|
||||
}
|
||||
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assertFalse(threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -5,11 +5,9 @@ import se.scalablesolutions.akka.state.*;
|
|||
|
||||
@transactionrequired
|
||||
public class PersistentStateful {
|
||||
private PersistentState factory = new PersistentState();
|
||||
private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig());
|
||||
private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());;
|
||||
private TransactionalRef refState = factory.newRef(new CassandraStorageConfig());
|
||||
|
||||
private PersistentMap mapState = PersistentState.newMap(new CassandraStorageConfig());
|
||||
private PersistentVector vectorState = PersistentState.newVector(new CassandraStorageConfig());;
|
||||
private PersistentRef refState = PersistentState.newRef(new CassandraStorageConfig());
|
||||
|
||||
public String getMapState(String key) {
|
||||
return (String) mapState.get(key).get();
|
||||
|
|
|
|||
|
|
@ -5,11 +5,9 @@ import se.scalablesolutions.akka.state.*;
|
|||
|
||||
@transactionrequired
|
||||
public class PersistentStatefulNested {
|
||||
private PersistentState factory = new PersistentState();
|
||||
private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig());
|
||||
private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());;
|
||||
private TransactionalRef refState = factory.newRef(new CassandraStorageConfig());
|
||||
|
||||
private PersistentMap mapState = PersistentState.newMap(new CassandraStorageConfig());
|
||||
private PersistentVector vectorState = PersistentState.newVector(new CassandraStorageConfig());
|
||||
private PersistentRef refState = PersistentState.newRef(new CassandraStorageConfig());
|
||||
|
||||
public String getMapState(String key) {
|
||||
return (String) mapState.get(key).get();
|
||||
|
|
|
|||
|
|
@ -13,10 +13,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
|
|||
|
||||
sealed abstract class PersistentStateConfig
|
||||
abstract class PersistentStorageConfig extends PersistentStateConfig
|
||||
case class CassandraStorageConfig extends PersistentStorageConfig
|
||||
case class TerracottaStorageConfig extends PersistentStorageConfig
|
||||
case class TokyoCabinetStorageConfig extends PersistentStorageConfig
|
||||
case class MongoStorageConfig extends PersistentStorageConfig
|
||||
case class CassandraStorageConfig() extends PersistentStorageConfig
|
||||
case class TerracottaStorageConfig() extends PersistentStorageConfig
|
||||
case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
|
||||
case class MongoStorageConfig() extends PersistentStorageConfig
|
||||
|
||||
/**
|
||||
* Scala API.
|
||||
|
|
@ -25,34 +25,29 @@ case class MongoStorageConfig extends PersistentStorageConfig
|
|||
* <pre>
|
||||
* val myMap = PersistentState.newMap(CassandraStorageConfig)
|
||||
* </pre>
|
||||
*/
|
||||
object PersistentState extends PersistentState
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* <p/>
|
||||
* Example Java usage:
|
||||
* <pre>
|
||||
* PersistentState state = new PersistentState();
|
||||
* TransactionalMap myMap = state.newMap(new CassandraStorageConfig());
|
||||
* TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
|
||||
* </pre>
|
||||
*/
|
||||
class PersistentState {
|
||||
def newMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
|
||||
object PersistentState {
|
||||
def newMap(config: PersistentStorageConfig): PersistentMap = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentMap
|
||||
case MongoStorageConfig() => new MongoPersistentMap
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def newVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
|
||||
def newVector(config: PersistentStorageConfig): PersistentVector = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentVector
|
||||
case MongoStorageConfig() => new MongoPersistentVector
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def newRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
|
||||
def newRef(config: PersistentStorageConfig): PersistentRef = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentRef
|
||||
case MongoStorageConfig() => new MongoPersistentRef
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
|
|
@ -68,10 +63,10 @@ class PersistentState {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
abstract class PersistentMap[K, V] extends TransactionalMap[K, V] {
|
||||
abstract class PersistentMap extends TransactionalMap[AnyRef, AnyRef] {
|
||||
|
||||
// FIXME: need to handle remove in another changeSet
|
||||
protected[akka] val changeSet = new HashMap[K, V]
|
||||
protected[akka] val changeSet = new HashMap[AnyRef, AnyRef]
|
||||
|
||||
def getRange(start: Option[AnyRef], count: Int)
|
||||
|
||||
|
|
@ -81,15 +76,15 @@ abstract class PersistentMap[K, V] extends TransactionalMap[K, V] {
|
|||
override def rollback = changeSet.clear
|
||||
|
||||
// ---- For scala.collection.mutable.Map ----
|
||||
override def put(key: K, value: V): Option[V] = {
|
||||
override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = {
|
||||
verifyTransaction
|
||||
changeSet += key -> value
|
||||
None // always return None to speed up writes (else need to go to DB to get
|
||||
}
|
||||
|
||||
override def -=(key: K) = remove(key)
|
||||
override def -=(key: AnyRef) = remove(key)
|
||||
|
||||
override def update(key: K, value: V) = put(key, value)
|
||||
override def update(key: AnyRef, value: AnyRef) = put(key, value)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -101,7 +96,7 @@ abstract class PersistentMap[K, V] extends TransactionalMap[K, V] {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
abstract class TemplatePersistentMap extends PersistentMap[AnyRef, AnyRef] {
|
||||
abstract class TemplatePersistentMap extends PersistentMap {
|
||||
|
||||
// to be concretized in subclasses
|
||||
val storage: MapStorage
|
||||
|
|
@ -212,10 +207,10 @@ class MongoPersistentMap extends TemplatePersistentMap {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
abstract class PersistentVector[T] extends TransactionalVector[T] {
|
||||
abstract class PersistentVector extends TransactionalVector[AnyRef] {
|
||||
|
||||
// FIXME: need to handle remove in another changeSet
|
||||
protected[akka] val changeSet = new ArrayBuffer[T]
|
||||
protected[akka] val changeSet = new ArrayBuffer[AnyRef]
|
||||
|
||||
// ---- For Transactional ----
|
||||
override def begin = {}
|
||||
|
|
@ -223,7 +218,7 @@ abstract class PersistentVector[T] extends TransactionalVector[T] {
|
|||
override def rollback = changeSet.clear
|
||||
|
||||
// ---- For TransactionalVector ----
|
||||
override def add(value: T) = {
|
||||
override def add(value: AnyRef) = {
|
||||
verifyTransaction
|
||||
changeSet += value
|
||||
}
|
||||
|
|
@ -234,7 +229,7 @@ abstract class PersistentVector[T] extends TransactionalVector[T] {
|
|||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
abstract class TemplatePersistentVector extends PersistentVector[AnyRef] {
|
||||
abstract class TemplatePersistentVector extends PersistentVector {
|
||||
|
||||
val storage: VectorStorage
|
||||
|
||||
|
|
@ -286,7 +281,7 @@ class CassandraPersistentVector extends TemplatePersistentVector {
|
|||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
|
|
@ -295,7 +290,7 @@ class MongoPersistentVector extends TemplatePersistentVector {
|
|||
val storage = MongoStorage
|
||||
}
|
||||
|
||||
abstract class TemplatePersistentRef extends TransactionalRef[AnyRef] {
|
||||
abstract class PersistentRef extends TransactionalRef[AnyRef] {
|
||||
val storage: RefStorage
|
||||
|
||||
override def commit = if (ref.isDefined) {
|
||||
|
|
@ -319,10 +314,10 @@ abstract class TemplatePersistentRef extends TransactionalRef[AnyRef] {
|
|||
}
|
||||
}
|
||||
|
||||
class CassandraPersistentRef extends TemplatePersistentRef {
|
||||
class CassandraPersistentRef extends PersistentRef {
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
class MongoPersistentRef extends TemplatePersistentRef {
|
||||
class MongoPersistentRef extends PersistentRef {
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import se.scalablesolutions.akka.annotation.prerestart;
|
|||
import se.scalablesolutions.akka.annotation.postrestart;
|
||||
import se.scalablesolutions.akka.state.TransactionalState;
|
||||
import se.scalablesolutions.akka.state.PersistentState;
|
||||
import se.scalablesolutions.akka.state.TransactionalMap;
|
||||
import se.scalablesolutions.akka.state.PersistentMap;
|
||||
import se.scalablesolutions.akka.state.CassandraStorageConfig;
|
||||
|
||||
/**
|
||||
|
|
@ -26,11 +26,10 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig;
|
|||
@Path("/persistentjavacount")
|
||||
@transactionrequired
|
||||
public class PersistentSimpleService {
|
||||
private String KEY = "COUNTER";
|
||||
private Object KEY = "COUNTER";
|
||||
|
||||
private boolean hasStartedTicking = false;
|
||||
private PersistentState factory = new PersistentState();
|
||||
private TransactionalMap<Object, Object> storage = factory.newMap(new CassandraStorageConfig());
|
||||
private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig());
|
||||
|
||||
@GET
|
||||
@Produces({"application/html"})
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import java.lang.Integer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import java.lang.Integer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import _root_.se.scalablesolutions.akka.util.Logging
|
|||
import _root_.se.scalablesolutions.akka.security.{DigestAuthenticationActor, UserInfo}
|
||||
import _root_.javax.annotation.security.{DenyAll,PermitAll,RolesAllowed}
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, Consumes}
|
||||
import java.lang.Integer
|
||||
|
||||
class Boot {
|
||||
object factory extends SupervisorFactory {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,72 @@
|
|||
include "akka-reference.conf"
|
||||
|
||||
# This config import the Akka reference configuration.
|
||||
# In this file you can override any option defined in the 'akka-reference.conf' file.
|
||||
|
||||
####################
|
||||
# Akka Config File #
|
||||
####################
|
||||
|
||||
# This file has all the default settings, so all these could be remove with no visible effect.
|
||||
# Modify as needed.
|
||||
|
||||
<log>
|
||||
filename = "./logs/akka.log"
|
||||
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
|
||||
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
|
||||
console = on
|
||||
# syslog_host = ""
|
||||
# syslog_server_name = ""
|
||||
</log>
|
||||
|
||||
<akka>
|
||||
version = "0.6"
|
||||
|
||||
# FQN to the class doing initial active object/actor
|
||||
# supervisor bootstrap, should be defined in default constructor
|
||||
boot = ["training.ships.akka_rest.Boot", "training.ships.akka_persistence.Boot"]
|
||||
|
||||
<actor>
|
||||
timeout = 5000 # default timeout for future based invocations
|
||||
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
||||
</actor>
|
||||
|
||||
<stm>
|
||||
service = on
|
||||
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
|
||||
# if 'off' then throws an exception or rollback for user to handle
|
||||
wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
|
||||
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
|
||||
distributed = off # not implemented yet
|
||||
</stm>
|
||||
|
||||
<remote>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9999
|
||||
connection-timeout = 1000 # in millis
|
||||
</remote>
|
||||
|
||||
<rest>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
filters = "se.scalablesolutions.akka.security.AkkaSecurityFilterFactory;org.atmosphere.core.AtmosphereFilter"
|
||||
authenticator = "sample.secure.SimpleAuthenticationService"
|
||||
</rest>
|
||||
|
||||
<storage>
|
||||
system = "mongodb" # Options: cassandra, mongodb
|
||||
|
||||
<cassandra>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
|
||||
port = 9160
|
||||
storage-format = "java" # Options: java, scala-json, java-json, protobuf
|
||||
consistency-level = 1
|
||||
</cassandra>
|
||||
|
||||
<mongodb>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
|
||||
port = 27017
|
||||
dbname = "mydb"
|
||||
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
|
||||
</mongodb>
|
||||
</storage>
|
||||
</akka>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue