fixed async bug in active object + added AllTests for scala tests

This commit is contained in:
Jonas Boner 2009-07-04 06:38:47 +02:00
parent 800f3bc917
commit d75d769351
14 changed files with 757 additions and 380 deletions

View file

@ -192,9 +192,9 @@ sealed class ActorAroundAdvice(val target: Class[_],
private def localDispatch(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
if (isOneWay(rtti)) actor ! Invocation(joinpoint)
if (isOneWay(rtti)) actor ! Invocation(joinpoint, true)
else {
val result = actor !! Invocation(joinpoint)
val result = actor !! Invocation(joinpoint, false)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
}
@ -233,22 +233,24 @@ sealed class ActorAroundAdvice(val target: Class[_],
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable private[kernel] case class Invocation(val joinpoint: JoinPoint) {
@serializable private[kernel] case class Invocation(val joinpoint: JoinPoint, val isOneWay: Boolean) {
override def toString: String = synchronized {
"Invocation [joinpoint: " + joinpoint.toString + "]"
"Invocation [joinpoint: " + joinpoint.toString + ", isOneWay: " + isOneWay + "]"
}
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, joinpoint)
result = HashCode.hash(result, isOneWay)
result
}
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[Invocation] &&
that.asInstanceOf[Invocation].joinpoint == joinpoint
that.asInstanceOf[Invocation].joinpoint == joinpoint &&
that.asInstanceOf[Invocation].isOneWay == isOneWay
}
}
@ -283,8 +285,9 @@ private[kernel] class Dispatcher extends Actor {
}
override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint) =>
reply(joinpoint.proceed)
case Invocation(joinpoint, oneWay) =>
if (oneWay) joinpoint.proceed
else reply(joinpoint.proceed)
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}

View file

@ -32,7 +32,8 @@ final object CassandraStorage extends Logging {
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val BLOCKING_CALL = kernel.Kernel.config.getInt("akka.storage.cassandra.blocking", 0)
@volatile private[this] var isRunning = false
private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "serialization") match {
case "serialization" => new JavaSerializationSerializer
@ -48,22 +49,25 @@ final object CassandraStorage extends Logging {
private[this] var thriftServer: CassandraThriftServer = _
def start = {
try {
server.start
log.info("Persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
thriftServer = new CassandraThriftServer(server)
thriftServer.start
def start = synchronized {
if (!isRunning) {
try {
server.start
log.info("Persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
thriftServer = new CassandraThriftServer(server)
thriftServer.start
}
isRunning
}
}
def stop = {
def stop = if (isRunning) {
//server.storageService.shutdown
if (RUN_THRIFT_SERVICE) thriftServer.stop
}

View file

@ -87,7 +87,7 @@ trait Transactional {
/**
* Base trait for all state implementations (persistent or in-memory).
*
* TODO: Make this class inherit scala.collection.mutable.Map and/or java.util.Map
* FIXME: Create Java versions using pcollections
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -168,9 +168,9 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
def getRange(start: Int, count: Int)
// ---- For Transactional ----
override def begin = changeSet.clear
override def rollback = {}
override def begin = {}
override def rollback = changeSet.clear
// ---- For scala.collection.mutable.Map ----
override def put(key: K, value: V): Option[V] = {
verifyTransaction
@ -200,11 +200,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
// ---- For Transactional ----
override def commit = {
CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
// FIXME: should use batch function once the bug is resolved
// for (entry <- changeSet) {
// val (key, value) = entry
// CassandraStorage.insertMapStorageEntryFor(uuid, key, value)
// }
changeSet.clear
}
// ---- Overriding scala.collection.mutable.Map behavior ----
@ -316,8 +312,8 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
protected[kernel] var changeSet: List[T] = Nil
// ---- For Transactional ----
override def begin = changeSet = Nil
override def rollback = {}
override def begin = {}
override def rollback = changeSet = Nil
// ---- For TransactionalVector ----
override def add(value: T) = {
@ -358,9 +354,8 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- changeSet) {
CassandraStorage.insertVectorStorageEntryFor(uuid, element)
}
for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element)
changeSet = Nil
}
}
@ -398,8 +393,11 @@ class TransactionalRef[T] extends Transactional {
}
class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
override def commit = if (ref.isDefined) CassandraStorage.insertRefStorageFor(uuid, ref.get)
override def commit = if (ref.isDefined) {
CassandraStorage.insertRefStorageFor(uuid, ref.get)
ref = None
}
override def rollback = ref = None
override def get: Option[AnyRef] = {
verifyTransaction
CassandraStorage.getRefStorageFor(uuid)

View file

@ -1,14 +1,10 @@
package se.scalablesolutions.akka.kernel.actor
import concurrent.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import reactor._
import org.junit.{Test, Before}
import org.junit.Assert._
class ActorTest {
class ActorSpec extends junit.framework.TestCase {
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
@ -20,8 +16,7 @@ class ActorTest {
}
}
@Test
def sendOneWay = {
def testSendOneWay = {
implicit val timeout = 5000L
var oneWay = "nada"
val actor = new Actor {
@ -36,8 +31,7 @@ class ActorTest {
actor.stop
}
@Test
def sendReplySync = {
def testSendReplySync = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
@ -46,8 +40,7 @@ class ActorTest {
actor.stop
}
@Test
def sendReplyAsync = {
def testSendReplyAsync = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
@ -56,8 +49,7 @@ class ActorTest {
actor.stop
}
@Test
def sendReceiveException = {
def testSendReceiveException = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start

View file

@ -0,0 +1,22 @@
package se.scalablesolutions.akka.kernel
import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
object AllTests extends TestCase {
def suite(): Test = {
val suite = new TestSuite("All tests")
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.SupervisorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.RemoteSupervisorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.ActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.RemoteActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.PersistentActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.InMemoryActorSpec])
suite
}
def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
}

View file

@ -96,7 +96,7 @@ class PersistentActorSpec extends TestCase {
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals(3, (stateful !! GetVectorSize).get)
assertEquals(2, (stateful !! GetVectorSize).get)
}
@Test

View file

@ -10,7 +10,7 @@ import kernel.config.ScalaConfig._
import com.jteigen.scalatest.JUnit4Runner
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.Suite
object Log {
var messageLog: String = ""
@ -20,7 +20,7 @@ object Log {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@RunWith(classOf[JUnit4Runner])
class RemoteSupervisorSpec extends Suite {
class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
Kernel.config
new Thread(new Runnable() {

View file

@ -9,13 +9,13 @@ import kernel.config.ScalaConfig._
import com.jteigen.scalatest.JUnit4Runner
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.Suite
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@RunWith(classOf[JUnit4Runner])
class SupervisorSpec extends Suite {
class SupervisorSpec extends junit.framework.TestCase with Suite {
var messageLog: String = ""
var oneWayLog: String = ""