diff --git a/.gitignore b/.gitignore index 75ce285c98..f8bfa0a524 100755 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ tm*.log tm*.lck tm.out *.tm.epoch +_dump +WEB-INF diff --git a/akka.iml b/akka.iml index f9d299bab7..c9548a55ff 100644 --- a/akka.iml +++ b/akka.iml @@ -9,11 +9,11 @@ - - - + + + diff --git a/akka.ipr b/akka.ipr index ea473d9965..baba1e69ea 100644 --- a/akka.ipr +++ b/akka.ipr @@ -2,14 +2,6 @@ - - - - - - - - - - - - - - - - - - - @@ -31,7 +20,7 @@ - + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java index ae7bf0ebf2..e4608a3064 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java @@ -8,7 +8,6 @@ import org.junit.*; import static org.junit.Assert.*; import junit.framework.TestSuite; -import se.scalablesolutions.akka.kernel.nio.ProxyServer; public class NioTest extends TestSuite { @@ -22,10 +21,7 @@ public class NioTest extends TestSuite { @Test public void simpleRequestReply() { - ProxyServer server = new ProxyServer(); - server.start(); - } } \ No newline at end of file diff --git a/kernel/.classpath b/kernel/.classpath index 26295f336f..9df0afc992 100644 --- a/kernel/.classpath +++ b/kernel/.classpath @@ -1,8 +1,8 @@ - - - + + + @@ -41,14 +41,13 @@ - - - + + diff --git a/kernel/.project b/kernel/.project index 5ff7c8795e..244cacc18b 100644 --- a/kernel/.project +++ b/kernel/.project @@ -1,16 +1,19 @@ + - akka-kernel - - - akka-util-java - - - - ch.epfl.lamp.sdt.core.scalabuilder - - - - org.eclipse.jdt.core.javanature - ch.epfl.lamp.sdt.core.scalanature - - \ No newline at end of file + akka-kernel + + + akka-util-java + + + + ch.epfl.lamp.sdt.core.scalabuilder + + + + + + ch.epfl.lamp.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml index ef5eb7281d..89a3c0e1e7 100644 --- a/kernel/akka-kernel.iml +++ b/kernel/akka-kernel.iml @@ -1,29 +1,6 @@ - - - - - - - - - - - - - - - - - - - - - - - @@ -35,61 +12,9 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -100,7 +25,7 @@ - + @@ -151,6 +76,15 @@ + + + + + + + + + diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index 1d7cc675b4..b437652c8a 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -145,7 +145,7 @@ object ActiveObject { private def localDispatch(joinpoint: JoinPoint): AnyRef = { val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] - if (isOneWay(rtti)) actor !! Invocation(joinpoint) // FIXME investigate why ! causes TX to race + if (isOneWay(rtti)) actor ! Invocation(joinpoint) else { val result = actor !! Invocation(joinpoint) if (result.isDefined) result.get @@ -160,10 +160,12 @@ object ActiveObject { new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, None, oneWay, false)) if (oneWay) null // for void methods else { - future.await - val result = getResultOrThrowException(future) - if (result.isDefined) result.get - else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]") + if (future.isDefined) { + future.get.await + val result = getResultOrThrowException(future.get) + if (result.isDefined) result.get + else throw new IllegalStateException("No result returned from call to [" + joinpoint + "]") + } else throw new IllegalStateException("No future returned from call to [" + joinpoint + "]") } } @@ -173,7 +175,9 @@ object ActiveObject { throw cause } else future.result.asInstanceOf[Option[T]] - private def isOneWay(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE + private def isOneWay(rtti: MethodRtti) = + rtti.getMethod.isAnnotationPresent(Annotations.oneway) // FIXME investigate why @oneway causes TX to race + //rtti.getMethod.getReturnType == java.lang.Void.TYPE } /** diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index 35f96c6a4c..d232be5c02 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -125,9 +125,9 @@ trait Actor extends Logging with TransactionManagement { /** * TODO: document */ - def !(message: AnyRef) = if (isRunning) { - if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false)) - else mailbox.append(new MessageHandle(this, message, new NullFutureResult, activeTx)) + def !(message: AnyRef): Unit = if (isRunning) { + if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(message, timeout, false, true) + else postMessageToMailbox(message) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") /** @@ -135,7 +135,7 @@ trait Actor extends Logging with TransactionManagement { */ def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { if (TransactionManagement.isTransactionalityEnabled) { - transactionalDispatch(message, timeout, false) + transactionalDispatch(message, timeout, false, false) } else { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) future.await @@ -153,7 +153,7 @@ trait Actor extends Logging with TransactionManagement { */ def !?[T](message: AnyRef): T = if (isRunning) { if (TransactionManagement.isTransactionalityEnabled) { - transactionalDispatch(message, 0, true).get + transactionalDispatch(message, 0, true, false).get } else { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0) future.awaitBlocking @@ -165,7 +165,7 @@ trait Actor extends Logging with TransactionManagement { * TODO: document */ protected[this] def reply(message: AnyRef) = senderFuture match { - case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." ) + case None => throw new IllegalStateException("No sender in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." ) case Some(future) => future.completeWithResult(message) } @@ -293,34 +293,50 @@ trait Actor extends Logging with TransactionManagement { try { if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx) - senderFuture = Some(future) + senderFuture = future if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]") + else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) } catch { case e => if (supervisor.isDefined) supervisor.get ! Exit(this, e) - future.completeWithException(this, e) + if (future.isDefined) future.get.completeWithException(this, e) + else e.printStackTrace + } finally { + TransactionManagement.threadBoundTx.set(None) } } + private def postMessageToMailbox(message: AnyRef): Unit = + if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false)) + else mailbox.append(new MessageHandle(this, message, None, activeTx)) + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = - if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false)) + if (isRemote) { + val future = NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false)) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } else { val future = new DefaultCompletableFutureResult(timeout) - mailbox.append(new MessageHandle(this, message, future, TransactionManagement.threadBoundTx.get)) + mailbox.append(new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get)) future } - private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean): Option[T] = { + private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean, oneWay: Boolean): Option[T] = { tryToCommitTransaction if (isInExistingTransaction) joinExistingTransaction else if (isTransactional) startNewTransaction incrementTransaction try { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) - if (blocking) future.awaitBlocking - else future.await - getResultOrThrowException(future) + if (oneWay) { + postMessageToMailbox(message) + None + } else { + val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) + if (blocking) future.awaitBlocking + else future.await + getResultOrThrowException(future) + } } catch { case e: TransactionAwareWrapperException => e.cause.printStackTrace @@ -408,4 +424,4 @@ trait Actor extends Logging with TransactionManagement { } override def toString(): String = "Actor[" + id + "]" -} \ No newline at end of file +} diff --git a/kernel/src/main/scala/nio/NettyClient.scala b/kernel/src/main/scala/nio/NettyClient.scala index fe2a17a0e9..e33a363bdf 100644 --- a/kernel/src/main/scala/nio/NettyClient.scala +++ b/kernel/src/main/scala/nio/NettyClient.scala @@ -5,11 +5,10 @@ package se.scalablesolutions.akka.kernel.nio import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap} -import kernel.reactor.{NullFutureResult, DefaultCompletableFutureResult, CompletableFutureResult} -import kernel.util.{HashCode, Logging}; +import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} +import kernel.util.Logging import org.jboss.netty.handler.codec.serialization.{ObjectEncoder, ObjectDecoder} import org.jboss.netty.bootstrap.ClientBootstrap @@ -58,17 +57,17 @@ object NettyClient extends Logging { } } - def send(request: RemoteRequest): CompletableFutureResult = if (isRunning) { + def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) { val escapedRequest = escapeRequest(request) if (escapedRequest.isOneWay) { connection.getChannel.write(escapedRequest) - new NullFutureResult + None } else { futures.synchronized { val futureResult = new DefaultCompletableFutureResult(100000) futures.put(request.id, futureResult) connection.getChannel.write(escapedRequest) - futureResult + Some(futureResult) } } } else throw new IllegalStateException("Netty client is not running, make sure you have invoked 'connect' before using the client") @@ -121,6 +120,7 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu //if (reply.successful) future.completeWithResult((reply.message, tx)) if (reply.successful) future.completeWithResult(reply.message) else future.completeWithException(null, reply.exception) + futures.remove(reply.id) } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) } catch { case e: Exception => log.error("Unexpected exception in NIO client handler: %s", e); throw e diff --git a/kernel/src/main/scala/nio/NettyServer.scala b/kernel/src/main/scala/nio/NettyServer.scala index a18fd4b970..40273b2297 100644 --- a/kernel/src/main/scala/nio/NettyServer.scala +++ b/kernel/src/main/scala/nio/NettyServer.scala @@ -60,7 +60,7 @@ object NettyServer extends Logging { } } -@ChannelPipelineCoverage {val value = "all"} +@ChannelPipelineCoverage { val value = "all" } class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { private val activeObjectFactory = new ActiveObjectFactory private val activeObjects = new ConcurrentHashMap[String, AnyRef] diff --git a/kernel/src/main/scala/reactor/Future.scala b/kernel/src/main/scala/reactor/Future.scala index 25ab4a8fcf..c81a88b264 100644 --- a/kernel/src/main/scala/reactor/Future.scala +++ b/kernel/src/main/scala/reactor/Future.scala @@ -117,15 +117,3 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) } - -class NullFutureResult extends CompletableFutureResult { - def completeWithResult(result: AnyRef) = {} - def completeWithException(toBlame: AnyRef, exception: Throwable) = {} - def await = throw new UnsupportedOperationException("Not implemented for NullFutureResult") - def awaitBlocking = throw new UnsupportedOperationException("Not implemented for NullFutureResult") - def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") - def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") - def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult") - def result: Option[AnyRef] = None - def exception: Option[Tuple2[AnyRef, Throwable]] = None -} diff --git a/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala b/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala index 1a320ffed9..bc86677173 100644 --- a/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala +++ b/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala @@ -33,18 +33,20 @@ class ProxyMessageDispatcher extends MessageDispatcherBase { guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] try { messageDemultiplexer.select - } catch {case e: InterruptedException => active = false} + } catch { case e: InterruptedException => active = false } val queue = messageDemultiplexer.acquireSelectedQueue for (index <- 0 until queue.size) { val handle = queue.remove handlerExecutor.execute(new Runnable { val invocation = handle.message.asInstanceOf[Invocation] override def run = { + val future = handle.future try { val result = invocation.joinpoint.proceed - handle.future.completeWithResult(result) + if (future.isDefined) future.get.completeWithResult(result) } catch { - case e: Exception => handle.future.completeWithException(invocation.joinpoint.getTarget, e) + case e: Exception => + if (future.isDefined) future.get.completeWithException(invocation.joinpoint.getTarget, e) } messageDemultiplexer.wakeUp } diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala index ab409a941c..07890d4ae2 100644 --- a/kernel/src/main/scala/reactor/Reactor.scala +++ b/kernel/src/main/scala/reactor/Reactor.scala @@ -36,14 +36,14 @@ trait MessageDemultiplexer { class MessageHandle(val sender: AnyRef, val message: AnyRef, - val future: CompletableFutureResult, + val future: Option[CompletableFutureResult], val tx: Option[Transaction]) { override def hashCode(): Int = { var result = HashCode.SEED result = HashCode.hash(result, sender) result = HashCode.hash(result, message) - result = HashCode.hash(result, future) + result = if (future.isDefined) HashCode.hash(result, future.get) else result result = if (tx.isDefined) HashCode.hash(result, tx.get.id) else result result } @@ -53,7 +53,8 @@ class MessageHandle(val sender: AnyRef, that.isInstanceOf[MessageHandle] && that.asInstanceOf[MessageHandle].sender == sender && that.asInstanceOf[MessageHandle].message == message && - that.asInstanceOf[MessageHandle].future == future && + that.asInstanceOf[MessageHandle].future.isDefined == future.isDefined && + that.asInstanceOf[MessageHandle].future.get == future.get && that.asInstanceOf[MessageHandle].tx.isDefined == tx.isDefined && that.asInstanceOf[MessageHandle].tx.get.id == tx.get.id } diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala index c02138d854..ebc3fa74c4 100644 --- a/kernel/src/main/scala/stm/Transaction.scala +++ b/kernel/src/main/scala/stm/Transaction.scala @@ -59,7 +59,7 @@ object TransactionIdFactory { def begin(participant: String) = synchronized { ensureIsActiveOrNew - if (status == TransactionStatus.New) log.debug("TX BEGIN - Server [%s] is starting NEW transaction [%s]", participant, this) + if (status == TransactionStatus.New) log.debug("TX BEGIN - Server [%s] is starting NEW transaction [%s]", participant, toString) else log.debug("Server [%s] is participating in transaction", participant) participants ::= participant status = TransactionStatus.Active @@ -67,14 +67,14 @@ object TransactionIdFactory { def precommit(participant: String) = synchronized { if (status == TransactionStatus.Active) { - log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server [%s]", this, participant) + log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server [%s]", toString, participant) precommitted ::= participant } } def commit(participant: String) = synchronized { if (status == TransactionStatus.Active) { - log.debug("TX COMMIT - Committing transaction [%s] for server [%s]", this, participant) + log.debug("TX COMMIT - Committing transaction [%s] for server [%s]", toString, participant) val haveAllPreCommitted = if (participants.size == precommitted.size) {{ for (part <- participants) yield { @@ -92,7 +92,7 @@ object TransactionIdFactory { def rollback(participant: String) = synchronized { ensureIsActiveOrAborted - log.debug("TX ROLLBACK - Server [%s] has initiated transaction rollback for [%s]", participant, this) + log.debug("TX ROLLBACK - Server [%s] has initiated transaction rollback for [%s]", participant, toString) transactionals.items.foreach(_.rollback) status = TransactionStatus.Aborted reset @@ -100,7 +100,7 @@ object TransactionIdFactory { def join(participant: String) = synchronized { ensureIsActive - log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , participant, this) + log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , participant, toString) participants ::= participant } @@ -116,13 +116,13 @@ object TransactionIdFactory { } private def ensureIsActive = if (status != TransactionStatus.Active) - throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]") + throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString) private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]") + throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]") + throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) // For reinitialize transaction after sending it over the wire private[kernel] def reinit = { diff --git a/kernel/src/test/scala/EventBasedDispatcherTest.scala b/kernel/src/test/scala/EventBasedDispatcherTest.scala index 58eda77b50..e8047a414b 100644 --- a/kernel/src/test/scala/EventBasedDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedDispatcherTest.scala @@ -1,9 +1,7 @@ package se.scalablesolutions.akka.kernel.reactor -import java.util.concurrent.BrokenBarrierException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock @@ -60,7 +58,7 @@ class EventBasedDispatcherTest { dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) @@ -74,8 +72,8 @@ class EventBasedDispatcherTest { dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.start - dispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult, None)) - dispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key1, new Object, None, None)) + dispatcher.messageQueue.append(new MessageHandle(key2, new Object, None, None)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) } @@ -109,8 +107,8 @@ class EventBasedDispatcherTest { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None)) - dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala index 2397f42e13..abf7781b49 100644 --- a/kernel/src/test/scala/InMemoryActorSpec.scala +++ b/kernel/src/test/scala/InMemoryActorSpec.scala @@ -1,24 +1,29 @@ package se.scalablesolutions.akka.kernel.actor -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit - +import junit.framework.TestCase import kernel.state.TransactionalState -import kernel.reactor._ import org.junit.{Test, Before} import org.junit.Assert._ -case class SetMapState(key: String, value: String) -case class SetVectorState(key: String) -case class SetRefState(key: String) case class GetMapState(key: String) case object GetVectorState case object GetRefState + +case class SetMapState(key: String, value: String) +case class SetVectorState(key: String) +case class SetRefState(key: String) case class Success(key: String, value: String) case class Failure(key: String, value: String, failer: Actor) +case class SetMapStateOneWay(key: String, value: String) +case class SetVectorStateOneWay(key: String) +case class SetRefStateOneWay(key: String) +case class SuccessOneWay(key: String, value: String) +case class FailureOneWay(key: String, value: String, failer: Actor) + class InMemStatefulActor extends Actor { + timeout = 100000 makeTransactional private val mapState = TransactionalState.newInMemoryMap[String, String] private val vectorState = TransactionalState.newInMemoryVector[String] @@ -51,6 +56,22 @@ class InMemStatefulActor extends Actor { refState.swap(msg) failer !! "Failure" reply(msg) + + case SetMapStateOneWay(key, msg) => + mapState.put(key, msg) + case SetVectorStateOneWay(msg) => + vectorState.add(msg) + case SetRefStateOneWay(msg) => + refState.swap(msg) + case SuccessOneWay(key, msg) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + case FailureOneWay(key, msg, failer) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + failer ! "Failure" } } @@ -61,8 +82,19 @@ class InMemFailerActor extends Actor { throw new RuntimeException("expected") } } + +class InMemoryActorSpec extends TestCase { + @Test + def testOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = new InMemStatefulActor + stateful.start + stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + Thread.sleep(100) + stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional + Thread.sleep(100) + assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) + } -class InMemoryActorSpec { @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new InMemStatefulActor @@ -72,6 +104,19 @@ class InMemoryActorSpec { assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) } + @Test + def testOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = new InMemStatefulActor + stateful.start + val failer = new InMemFailerActor + failer.start + stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + Thread.sleep(100) + stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method + Thread.sleep(100) + assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state + } + @Test def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = { val stateful = new InMemStatefulActor @@ -86,16 +131,39 @@ class InMemoryActorSpec { assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state } + @Test + def testOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = new InMemStatefulActor + stateful.start + stateful ! SetVectorStateOneWay("init") // set init state + Thread.sleep(100) + stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional + Thread.sleep(100) + assertEquals("new state", (stateful !! GetVectorState).get) + } + @Test def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new InMemStatefulActor stateful.start stateful !! SetVectorState("init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional - stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit assertEquals("new state", (stateful !! GetVectorState).get) } + @Test + def testOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = new InMemStatefulActor + stateful.start + stateful ! SetVectorStateOneWay("init") // set init state + Thread.sleep(100) + val failer = new InMemFailerActor + failer.start + stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method + Thread.sleep(100) + assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state + } + @Test def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { val stateful = new InMemStatefulActor @@ -110,16 +178,39 @@ class InMemoryActorSpec { assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state } + @Test + def testOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = new InMemStatefulActor + stateful.start + stateful ! SetRefStateOneWay("init") // set init state + Thread.sleep(100) + stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional + Thread.sleep(100) + assertEquals("new state", (stateful !! GetRefState).get) + } + @Test def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new InMemStatefulActor stateful.start stateful !! SetRefState("init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional - stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit assertEquals("new state", (stateful !! GetRefState).get) } + @Test + def testOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = new InMemStatefulActor + stateful.start + stateful ! SetRefStateOneWay("init") // set init state + Thread.sleep(100) + val failer = new InMemFailerActor + failer.start + stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method + Thread.sleep(100) + assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state + } + @Test def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = { val stateful = new InMemStatefulActor diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala index f12c72e731..204a96edaa 100644 --- a/kernel/src/test/scala/PersistentActorSpec.scala +++ b/kernel/src/test/scala/PersistentActorSpec.scala @@ -3,6 +3,9 @@ package se.scalablesolutions.akka.kernel.actor import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit +import junit.framework.TestCase +import kernel.Kernel + import kernel.reactor._ import kernel.state.{CassandraStorageConfig, TransactionalState} @@ -61,7 +64,7 @@ object PersistenceManager { isRunning = true } } -class PersistentActorSpec { +class PersistentActorSpec extends TestCase { PersistenceManager.init @Test diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala index 09f46dd128..96aa9616a0 100644 --- a/kernel/src/test/scala/RemoteActorSpec.scala +++ b/kernel/src/test/scala/RemoteActorSpec.scala @@ -3,6 +3,7 @@ package se.scalablesolutions.akka.kernel.actor import concurrent.Lock import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit +import junit.framework.TestCase import kernel.nio.{NettyClient, NettyServer} import reactor._ @@ -28,7 +29,7 @@ class RemoteActorSpecActorBidirectional extends Actor { } } -class RemoteActorSpec { +class RemoteActorSpec extends TestCase { new Thread(new Runnable() { def run = { @@ -42,7 +43,7 @@ class RemoteActorSpec { private val unit = TimeUnit.MILLISECONDS @Test - def sendOneWay = { + def testSendOneWay = { implicit val timeout = 5000L val actor = new RemoteActorSpecActorUnidirectional actor.makeRemote @@ -54,7 +55,7 @@ class RemoteActorSpec { } @Test - def sendReplySync = { + def testSendReplySync = { implicit val timeout = 5000L val actor = new RemoteActorSpecActorBidirectional actor.makeRemote @@ -65,7 +66,7 @@ class RemoteActorSpec { } @Test - def sendReplyAsync = { + def testSendReplyAsync = { implicit val timeout = 5000L val actor = new RemoteActorSpecActorBidirectional actor.makeRemote @@ -75,8 +76,8 @@ class RemoteActorSpec { actor.stop } - @Test - def sendReceiveException = { + @Test + def testSendReceiveException = { implicit val timeout = 5000L val actor = new RemoteActorSpecActorBidirectional actor.makeRemote diff --git a/kernel/src/test/scala/SupervisorSpec.scala b/kernel/src/test/scala/SupervisorSpec.scala index 30bf162e0e..52a2e9c6ad 100644 --- a/kernel/src/test/scala/SupervisorSpec.scala +++ b/kernel/src/test/scala/SupervisorSpec.scala @@ -7,9 +7,6 @@ package se.scalablesolutions.akka.kernel import kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} import kernel.config.ScalaConfig._ -import scala.collection.Map -import scala.collection.mutable.HashMap - import com.jteigen.scalatest.JUnit4Runner import org.junit.runner.RunWith import org.scalatest._ @@ -21,6 +18,8 @@ import org.scalatest._ class SupervisorSpec extends Suite { var messageLog: String = "" + var oneWayLog: String = "" + var pingpong1: PingPong1Actor = _ var pingpong2: PingPong2Actor = _ var pingpong3: PingPong3Actor = _ @@ -39,11 +38,11 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getSingleActorOneForOneSupervisor sup ! StartSupervisor - + Thread.sleep(500) intercept(classOf[RuntimeException]) { pingpong1 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("DIE") { messageLog } @@ -53,49 +52,7 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getSingleActorOneForOneSupervisor sup ! StartSupervisor - - expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("ping") { - messageLog - } - intercept(classOf[RuntimeException]) { - pingpong1 !! Die - } - Thread.sleep(100) - expect("pingDIE") { - messageLog - } - expect("pong") { - (pingpong1 !! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingDIEping") { - messageLog - } - } - - def testKillSingleActorAllForOne = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor - intercept(classOf[RuntimeException]) { - pingpong1 !! Die - } - Thread.sleep(100) - expect("DIE") { - messageLog - } - } - - def testCallKillCallSingleActorAllForOne = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - pingpong1.timeout = 10000000 - sup.timeout = 10000000 - sup ! StartSupervisor + Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } @@ -106,7 +63,49 @@ class SupervisorSpec extends Suite { intercept(classOf[RuntimeException]) { pingpong1 !! Die } - Thread.sleep(1100) + Thread.sleep(500) + expect("pingDIE") { + messageLog + } + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingDIEping") { + messageLog + } + } + + def testKillSingleActorAllForOne = { + messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("DIE") { + messageLog + } + } + + def testCallKillCallSingleActorAllForOne = { + messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("ping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) expect("pingDIE") { messageLog } @@ -123,10 +122,11 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getMultipleActorsOneForOneConf sup ! StartSupervisor + Thread.sleep(500) intercept(classOf[RuntimeException]) { pingpong3 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("DIE") { messageLog } @@ -136,40 +136,41 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getMultipleActorsOneForOneConf sup ! StartSupervisor + Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { pingpong2 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingpingDIE") { messageLog } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingpingDIEpingpingping") { messageLog } @@ -179,10 +180,11 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getMultipleActorsAllForOneConf sup ! StartSupervisor + Thread.sleep(500) intercept(classOf[RuntimeException]) { pingpong2 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("DIEDIEDIE") { messageLog } @@ -192,45 +194,240 @@ class SupervisorSpec extends Suite { messageLog = "" val sup = getMultipleActorsAllForOneConf sup ! StartSupervisor + Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { pingpong2 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingpingDIEDIEDIE") { messageLog } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("pingpingpingDIEDIEDIEpingpingping") { messageLog } } + def testOneWayKillSingleActorOneForOne = { + messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + pingpong1 ! Die + Thread.sleep(500) + expect("DIE") { + messageLog + } + } + + def testOneWayCallKillCallSingleActorOneForOne = { + messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + pingpong1 ! OneWay + Thread.sleep(500) + expect("oneway") { + oneWayLog + } + pingpong1 ! Die + Thread.sleep(500) + expect("DIE") { + messageLog + } + pingpong1 ! OneWay + Thread.sleep(500) + expect("onewayoneway") { + oneWayLog + } + } + + /* + def testOneWayKillSingleActorAllForOne = { + messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong1 ! Die + } + Thread.sleep(500) + expect("DIE") { + messageLog + } + } + + def testOneWayCallKillCallSingleActorAllForOne = { + messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("ping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 ! Die + } + Thread.sleep(500) + expect("pingDIE") { + messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingDIEping") { + messageLog + } + } + + def testOneWayKillMultipleActorsOneForOne = { + messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong3 ! Die + } + Thread.sleep(500) + expect("DIE") { + messageLog + } + } + + def tesOneWayCallKillCallMultipleActorsOneForOne = { + messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("pingpingpingDIE") { + messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEpingpingping") { + messageLog + } + } + + def testOneWayKillMultipleActorsAllForOne = { + messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("DIEDIEDIE") { + messageLog + } + } + + def tesOneWayCallKillCallMultipleActorsAllForOne = { + messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + pingpong1 ! Ping + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIE") { + messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIEpingpingping") { + messageLog + } + } + */ + /* def testNestedSupervisorsTerminateFirstLevelActorAllForOne = { messageLog = "" @@ -239,7 +436,7 @@ class SupervisorSpec extends Suite { intercept(classOf[RuntimeException]) { pingpong1 !! Die } - Thread.sleep(100) + Thread.sleep(500) expect("DIEDIEDIE") { messageLog } @@ -374,6 +571,9 @@ class SupervisorSpec extends Suite { case Ping => messageLog += "ping" reply("pong") + + case OneWay => + oneWayLog += "oneway" case Die => throw new RuntimeException("DIE") diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala index 58020e6511..effe74ea1c 100644 --- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala @@ -1,10 +1,8 @@ package se.scalablesolutions.akka.kernel.reactor -import java.util.concurrent.BrokenBarrierException import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock @@ -62,7 +60,7 @@ class ThreadBasedDispatcherTest { dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) @@ -86,8 +84,8 @@ class ThreadBasedDispatcherTest { } }) dispatcher.start - dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult, None)) - dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", None, None)) + dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", None, None)) handlersBarrier.await(5, TimeUnit.SECONDS) assertFalse(threadingIssueDetected.get) //dispatcher.shutdown @@ -122,8 +120,8 @@ class ThreadBasedDispatcherTest { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None)) - dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None)) + dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get)