Fixed bug in fault handling of TEMPORARY Actors + ported all Active Object Java tests to Scala (using Java POJOs)
This commit is contained in:
parent
c6b7ba6a01
commit
e8581123da
17 changed files with 225 additions and 166 deletions
|
|
@ -13,6 +13,7 @@ public class InMemStateful {
|
|||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = new TransactionalMap();
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ public class InMemStatefulNested {
|
|||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = new TransactionalMap();
|
||||
|
|
|
|||
|
|
@ -1311,11 +1311,14 @@ sealed class LocalActorRef private[akka](
|
|||
}
|
||||
|
||||
protected[akka] def restart(reason: Throwable): Unit = {
|
||||
//_isBeingRestarted = true
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
restartLinkedActors(reason)
|
||||
val failedActor = actorInstance.get
|
||||
failedActor.synchronized {
|
||||
lifeCycle.get match {
|
||||
case LifeCycle(scope, _) => {
|
||||
scope match {
|
||||
case Permanent =>
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
restartLinkedActors(reason)
|
||||
Actor.log.debug("Restarting linked actors for actor [%s].", id)
|
||||
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
|
||||
failedActor.preRestart(reason)
|
||||
|
|
@ -1329,6 +1332,10 @@ sealed class LocalActorRef private[akka](
|
|||
freshActor.postRestart(reason)
|
||||
}
|
||||
_isBeingRestarted = false
|
||||
case Temporary => shutDownTemporaryActor(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1338,25 +1345,27 @@ sealed class LocalActorRef private[akka](
|
|||
actorRef.lifeCycle.get match {
|
||||
case LifeCycle(scope, _) => {
|
||||
scope match {
|
||||
case Permanent =>
|
||||
actorRef.restart(reason)
|
||||
case Temporary =>
|
||||
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id)
|
||||
actorRef.stop
|
||||
linkedActors.remove(actorRef.uuid) // remove the temporary actor
|
||||
case Permanent => actorRef.restart(reason)
|
||||
case Temporary => shutDownTemporaryActor(actorRef)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def shutDownTemporaryActor(temporaryActor: ActorRef) = {
|
||||
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id)
|
||||
temporaryActor.stop
|
||||
linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
|
||||
// if last temporary actor is gone, then unlink me from supervisor
|
||||
if (linkedActors.isEmpty) {
|
||||
Actor.log.info(
|
||||
"All linked actors have died permanently (they were all configured as TEMPORARY)" +
|
||||
"\n\tshutting down and unlinking supervisor actor as well [%s].",
|
||||
actorRef.id)
|
||||
temporaryActor.id)
|
||||
_supervisor.foreach(_ ! UnlinkAndStop(this))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard {
|
||||
if (_supervisor.isDefined) {
|
||||
|
|
|
|||
|
|
@ -425,14 +425,14 @@ class RemoteServerHandler(
|
|||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getCause.getClass.getName).setMessage(e.getCause.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
case e: Throwable =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
log.error(e, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
|
|
|
|||
|
|
@ -87,34 +87,6 @@ object Transaction {
|
|||
|
||||
// --- public methods ---------
|
||||
|
||||
def begin = synchronized {
|
||||
jta.foreach { txContainer =>
|
||||
txContainer.begin
|
||||
txContainer.registerSynchronization(new StmSynchronization(txContainer, this))
|
||||
}
|
||||
}
|
||||
|
||||
def commit = synchronized {
|
||||
log.trace("Committing transaction %s", toString)
|
||||
persistentStateMap.valuesIterator.foreach(_.commit)
|
||||
status = TransactionStatus.Completed
|
||||
jta.foreach(_.commit)
|
||||
}
|
||||
|
||||
def abort = synchronized {
|
||||
log.trace("Aborting transaction %s", toString)
|
||||
jta.foreach(_.rollback)
|
||||
persistentStateMap.valuesIterator.foreach(_.abort)
|
||||
persistentStateMap.clear
|
||||
}
|
||||
|
||||
def isNew = synchronized { status == TransactionStatus.New }
|
||||
|
||||
def isActive = synchronized { status == TransactionStatus.Active }
|
||||
|
||||
def isCompleted = synchronized { status == TransactionStatus.Completed }
|
||||
|
||||
def isAborted = synchronized { status == TransactionStatus.Aborted }
|
||||
|
||||
// --- internal methods ---------
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor;
|
||||
|
||||
public class InMemFailer implements java.io.Serializable {
|
||||
public class ActiveObjectFailer implements java.io.Serializable {
|
||||
public int fail() {
|
||||
throw new RuntimeException("expected");
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ public class Foo extends se.scalablesolutions.akka.serialization.Serializable.Ja
|
|||
}
|
||||
public String longRunning() {
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
return "test";
|
||||
|
|
|
|||
|
|
@ -5,12 +5,13 @@ import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
|
|||
import se.scalablesolutions.akka.stm.*;
|
||||
|
||||
@transactionrequired
|
||||
public class InMemStatefulNested {
|
||||
public class NestedTransactionalActiveObject {
|
||||
private TransactionalMap<String, String> mapState;
|
||||
private TransactionalVector<String> vectorState;
|
||||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = new TransactionalMap();
|
||||
|
|
@ -57,7 +58,7 @@ public class InMemStatefulNested {
|
|||
}
|
||||
|
||||
|
||||
public String failure(String key, String msg, InMemFailer failer) {
|
||||
public String failure(String key, String msg, ActiveObjectFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
|
|
@ -66,7 +67,7 @@ public class InMemStatefulNested {
|
|||
}
|
||||
|
||||
|
||||
public void thisMethodHangs(String key, String msg, InMemFailer failer) {
|
||||
public void thisMethodHangs(String key, String msg, ActiveObjectFailer failer) {
|
||||
setMapState(key, msg);
|
||||
}
|
||||
|
||||
|
|
@ -7,12 +7,13 @@ import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
|
|||
import se.scalablesolutions.akka.stm.*;
|
||||
|
||||
@transactionrequired
|
||||
public class InMemStateful {
|
||||
public class TransactionalActiveObject {
|
||||
private TransactionalMap<String, String> mapState;
|
||||
private TransactionalVector<String> vectorState;
|
||||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = new TransactionalMap();
|
||||
|
|
@ -52,14 +53,14 @@ public class InMemStateful {
|
|||
refState.swap(msg);
|
||||
}
|
||||
|
||||
public void success(String key, String msg, InMemStatefulNested nested) {
|
||||
public void success(String key, String msg, NestedTransactionalActiveObject nested) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
nested.success(key, msg);
|
||||
}
|
||||
|
||||
public String failure(String key, String msg, InMemFailer failer) {
|
||||
public String failure(String key, String msg, ActiveObjectFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
|
|
@ -67,7 +68,7 @@ public class InMemStateful {
|
|||
return msg;
|
||||
}
|
||||
|
||||
public String failure(String key, String msg, InMemStatefulNested nested, InMemFailer failer) {
|
||||
public String failure(String key, String msg, NestedTransactionalActiveObject nested, ActiveObjectFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
|
|
@ -75,7 +76,7 @@ public class InMemStateful {
|
|||
return msg;
|
||||
}
|
||||
|
||||
public void thisMethodHangs(String key, String msg, InMemFailer failer) {
|
||||
public void thisMethodHangs(String key, String msg, ActiveObjectFailer failer) {
|
||||
setMapState(key, msg);
|
||||
}
|
||||
|
||||
|
|
@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._
|
|||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class InMemoryNestedStateSpec extends
|
||||
class NestedTransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
|
@ -31,13 +31,13 @@ class InMemoryNestedStateSpec extends
|
|||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(classOf[InMemStateful],
|
||||
new Component(classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[InMemStatefulNested],
|
||||
new Component(classOf[NestedTransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[InMemFailer],
|
||||
new Component(classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Permanent),
|
||||
10000)
|
||||
).toArray).supervise
|
||||
|
|
@ -50,11 +50,11 @@ class InMemoryNestedStateSpec extends
|
|||
describe("Transactional nested in-memory Active Object") {
|
||||
|
||||
it("map should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
|
|
@ -65,15 +65,15 @@ class InMemoryNestedStateSpec extends
|
|||
}
|
||||
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
|
|
@ -85,11 +85,11 @@ class InMemoryNestedStateSpec extends
|
|||
}
|
||||
|
||||
it("vector should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
Thread.sleep(100)
|
||||
nested.setVectorState("init") // set init state
|
||||
|
|
@ -102,15 +102,15 @@ class InMemoryNestedStateSpec extends
|
|||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
|
|
@ -122,9 +122,9 @@ class InMemoryNestedStateSpec extends
|
|||
}
|
||||
|
||||
it("ref should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
stateful.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
|
|
@ -138,15 +138,15 @@ class InMemoryNestedStateSpec extends
|
|||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
val nested = conf.getInstance(classOf[InMemStatefulNested])
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
stateful.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
nested.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
|
|
@ -7,6 +7,7 @@ package se.scalablesolutions.akka.actor
|
|||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import se.scalablesolutions.akka.serialization.BinaryString
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer}
|
||||
import se.scalablesolutions.akka.OneWay
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
|
@ -71,19 +72,21 @@ object Log {
|
|||
}
|
||||
}
|
||||
|
||||
object RemoteSupervisorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9988
|
||||
var server: RemoteServer = null
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteSupervisorSpec extends JUnitSuite {
|
||||
import RemoteSupervisorSpec._
|
||||
Config.config
|
||||
|
||||
se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
new Thread(new Runnable() {
|
||||
def run = {
|
||||
RemoteNode.start(RemoteServer.HOSTNAME, 9988)
|
||||
}
|
||||
}).start
|
||||
Thread.sleep(1000)
|
||||
server = new RemoteServer()
|
||||
server.start(HOSTNAME, PORT)
|
||||
|
||||
var pingpong1: ActorRef = _
|
||||
var pingpong2: ActorRef = _
|
||||
|
|
@ -460,7 +463,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
|||
// implementation of the Actors we want to use.
|
||||
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1.start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -476,7 +479,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
|||
|
||||
def getSingleActorOneForOneSupervisor: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1.start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -491,13 +494,13 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
|||
|
||||
def getMultipleActorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1.start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor]
|
||||
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong2.makeRemote(HOSTNAME, PORT)
|
||||
pingpong2.start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor]
|
||||
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong3.makeRemote(HOSTNAME, PORT)
|
||||
pingpong3.start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -520,15 +523,15 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
|||
|
||||
def getMultipleActorsOneForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1.start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor]
|
||||
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong2.makeRemote(HOSTNAME, PORT)
|
||||
pingpong2.start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor]
|
||||
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong3.makeRemote(HOSTNAME, PORT)
|
||||
pingpong3.start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -551,13 +554,13 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
|||
|
||||
def getNestedSupervisorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = actorOf[RemotePingPong1Actor]
|
||||
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong1.makeRemote(HOSTNAME, PORT)
|
||||
pingpong1.start
|
||||
pingpong2 = actorOf[RemotePingPong2Actor]
|
||||
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong2.makeRemote(HOSTNAME, PORT)
|
||||
pingpong2.start
|
||||
pingpong3 = actorOf[RemotePingPong3Actor]
|
||||
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
|
||||
pingpong3.makeRemote(HOSTNAME, PORT)
|
||||
pingpong3.start
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
|
|
|
|||
|
|
@ -13,40 +13,44 @@ import org.junit.runner.RunWith
|
|||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer}
|
||||
|
||||
object RemoteTransactionalActiveObjectSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9988
|
||||
var server: RemoteServer = null
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RemoteInMemoryStateSpec extends
|
||||
class RemoteTransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
import RemoteTransactionalActiveObjectSpec._
|
||||
Config.config
|
||||
|
||||
server = new RemoteServer()
|
||||
server.start(HOSTNAME, PORT)
|
||||
|
||||
private val conf = new ActiveObjectConfigurator
|
||||
private var messageLog = ""
|
||||
|
||||
override def beforeAll = {
|
||||
Config.config
|
||||
new Thread(new Runnable {
|
||||
def run = RemoteNode.start
|
||||
}).start
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
override def afterAll = conf.stop
|
||||
|
||||
describe("Remote transactional in-memory Active Object ") {
|
||||
|
||||
it("map should not rollback state for stateful server in case of success") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state")
|
||||
}
|
||||
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[InMemFailer], 1000, "localhost", 9999) //conf.getInstance(classOf[InMemFailer])
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[ActiveObjectFailer], 1000, HOSTNAME, PORT) //conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -55,16 +59,16 @@ class RemoteInMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("vector should not rollback state for stateful server in case of success") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setVectorState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
stateful.getVectorState should equal("new state")
|
||||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setVectorState("init") // set init state
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[InMemFailer], 1000, "localhost", 9999) //conf.getInstance(classOf[InMemFailer])
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[ActiveObjectFailer], 1000, HOSTNAME, PORT) //conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -73,16 +77,16 @@ class RemoteInMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("ref should not rollback state for stateful server in case of success") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setRefState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
stateful.getRefState should equal("new state")
|
||||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[InMemStateful], 1000, "localhost", 9999)
|
||||
val stateful = ActiveObject.newRemoteInstance(classOf[TransactionalActiveObject], 1000, HOSTNAME, PORT)
|
||||
stateful.setRefState("init") // set init state
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[InMemFailer], 1000, "localhost", 9999) //conf.getInstance(classOf[InMemFailer])
|
||||
val failer =ActiveObject.newRemoteInstance(classOf[ActiveObjectFailer], 1000, HOSTNAME, PORT) //conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -100,7 +100,7 @@ class StmSpec extends
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
describe("Transactor") {
|
||||
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
|
||||
import GlobalTransactionVectorTestActor._
|
||||
|
|
@ -121,6 +121,7 @@ class StmSpec extends
|
|||
size4 should equal(3)
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
object GlobalTransactionVectorTestActor {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.{OneWay, Die, Ping}
|
||||
import Actor._
|
||||
|
|
@ -75,6 +76,33 @@ object SupervisorSpec {
|
|||
messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class TemporaryActor extends Actor {
|
||||
import self._
|
||||
lifeCycle = Some(LifeCycle(Temporary))
|
||||
def receive = {
|
||||
case Ping =>
|
||||
messageLog.put("ping")
|
||||
reply("pong")
|
||||
case Die =>
|
||||
println("******************** GOT DIE 3")
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
println("******************** restart temporary")
|
||||
messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class Master extends Actor {
|
||||
self.trapExit = classOf[Exception] :: Nil
|
||||
self.faultHandler = Some(OneForOneStrategy(5, 1000))
|
||||
val temp = self.spawnLink[TemporaryActor]
|
||||
override def receive = {
|
||||
case Die => temp !! (Die, 5000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -86,7 +114,9 @@ class SupervisorSpec extends JUnitSuite {
|
|||
var pingpong1: ActorRef = _
|
||||
var pingpong2: ActorRef = _
|
||||
var pingpong3: ActorRef = _
|
||||
var temporaryActor: ActorRef = _
|
||||
|
||||
/*
|
||||
@Test def shouldStartServer = {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
|
|
@ -96,6 +126,30 @@ class SupervisorSpec extends JUnitSuite {
|
|||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
||||
}
|
||||
}
|
||||
*/
|
||||
@Test def shoulNotRestartProgrammaticallyLinkedTemporaryActor = {
|
||||
clearMessageLogs
|
||||
val master = actorOf[Master].start
|
||||
|
||||
intercept[RuntimeException] {
|
||||
master !! (Die, 5000)
|
||||
}
|
||||
|
||||
Thread.sleep(1000)
|
||||
assert(messageLog.size === 0)
|
||||
}
|
||||
|
||||
@Test def shoulNotRestartTemporaryActor = {
|
||||
clearMessageLogs
|
||||
val sup = getTemporaryActorAllForOneSupervisor
|
||||
|
||||
intercept[RuntimeException] {
|
||||
temporaryActor !! (Die, 5000)
|
||||
}
|
||||
|
||||
Thread.sleep(1000)
|
||||
assert(messageLog.size === 0)
|
||||
}
|
||||
|
||||
@Test def shouldStartServerForNestedSupervisorHierarchy = {
|
||||
clearMessageLogs
|
||||
|
|
@ -445,6 +499,18 @@ class SupervisorSpec extends JUnitSuite {
|
|||
// =============================================
|
||||
// Create some supervisors with different configurations
|
||||
|
||||
def getTemporaryActorAllForOneSupervisor: Supervisor = {
|
||||
temporaryActor = actorOf[TemporaryActor].start
|
||||
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
|
||||
Supervise(
|
||||
temporaryActor,
|
||||
LifeCycle(Temporary))
|
||||
:: Nil))
|
||||
}
|
||||
|
||||
def getSingleActorAllForOneSupervisor: Supervisor = {
|
||||
pingpong1 = actorOf[PingPong1Actor].start
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._
|
|||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class InMemoryStateSpec extends
|
||||
class TransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
|
@ -31,11 +31,11 @@ class InMemoryStateSpec extends
|
|||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(classOf[InMemStateful],
|
||||
new Component(classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
//new RestartCallbacks("preRestart", "postRestart")),
|
||||
10000),
|
||||
new Component(classOf[InMemFailer],
|
||||
new Component(classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Permanent),
|
||||
10000)).toArray
|
||||
).supervise
|
||||
|
|
@ -48,7 +48,7 @@ class InMemoryStateSpec extends
|
|||
describe("Transactional in-memory Active Object ") {
|
||||
|
||||
it("map should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init")
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -56,10 +56,10 @@ class InMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init")
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -68,10 +68,10 @@ class InMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -80,7 +80,7 @@ class InMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("vector should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -88,10 +88,10 @@ class InMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setRefState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[InMemFailer])
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -100,7 +100,7 @@ class InMemoryStateSpec extends
|
|||
}
|
||||
|
||||
it("ref should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[InMemStateful])
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setRefState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -7,7 +7,7 @@ import org.junit.Test
|
|||
import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
|
||||
import Actor._
|
||||
|
||||
object InMemoryActorSpec {
|
||||
object TransactorSpec {
|
||||
case class GetMapState(key: String)
|
||||
case object GetVectorState
|
||||
case object GetVectorSize
|
||||
|
|
@ -27,9 +27,9 @@ object InMemoryActorSpec {
|
|||
|
||||
case object GetNotifier
|
||||
}
|
||||
import InMemoryActorSpec._
|
||||
import TransactorSpec._
|
||||
|
||||
class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor {
|
||||
class StatefulTransactor(expectedInvocationCount: Int) extends Transactor {
|
||||
def this() = this(0)
|
||||
self.timeout = 5000
|
||||
|
||||
|
|
@ -101,7 +101,7 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor {
|
|||
}
|
||||
|
||||
@serializable
|
||||
class InMemFailerActor extends Transactor {
|
||||
class FailerTransactor extends Transactor {
|
||||
|
||||
def receive = {
|
||||
case "Failure" =>
|
||||
|
|
@ -109,10 +109,10 @@ class InMemFailerActor extends Transactor {
|
|||
}
|
||||
}
|
||||
|
||||
class InMemoryActorSpec extends JUnitSuite {
|
||||
class TransactorSpec extends JUnitSuite {
|
||||
@Test
|
||||
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -123,7 +123,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -132,9 +132,9 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
|
|
@ -145,10 +145,10 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
try {
|
||||
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
|
|
@ -159,7 +159,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
stateful ! SetVectorStateOneWay("init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -170,7 +170,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetVectorState("init") // set init state
|
||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -179,11 +179,11 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
stateful ! SetVectorStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier = (stateful !! GetNotifier).as[CountDownLatch]
|
||||
|
|
@ -193,10 +193,10 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetVectorState("init") // set init state
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
try {
|
||||
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
|
|
@ -207,7 +207,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
stateful ! SetRefStateOneWay("init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -218,7 +218,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetRefState("init") // set init state
|
||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
|
|
@ -227,11 +227,11 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf(new InMemStatefulActor(2))
|
||||
val stateful = actorOf(new StatefulTransactor(2))
|
||||
stateful.start
|
||||
stateful ! SetRefStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier = (stateful !! GetNotifier).as[CountDownLatch]
|
||||
|
|
@ -241,10 +241,10 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = actorOf[InMemStatefulActor]
|
||||
val stateful = actorOf[StatefulTransactor]
|
||||
stateful.start
|
||||
stateful !! SetRefState("init") // set init state
|
||||
val failer = actorOf[InMemFailerActor]
|
||||
val failer = actorOf[FailerTransactor]
|
||||
failer.start
|
||||
try {
|
||||
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
Binary file not shown.
Loading…
Add table
Add a link
Reference in a new issue