Merge branch 'master' of github.com:jboner/akka into wip-akka-rest-fix

This commit is contained in:
Viktor Klang 2010-05-28 21:49:38 +02:00
commit 2cf60ceae6
11 changed files with 130 additions and 55 deletions

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
@ -67,7 +67,7 @@ final class ActiveObjectConfiguration {
/**
* Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender'
* reference etc.
* reference, the 'senderFuture' reference etc.
* <p/>
* In order to make use of this context you have to create a member field in your
* Active Object that has the type 'ActiveObjectContext', then an instance will
@ -94,21 +94,49 @@ final class ActiveObjectConfiguration {
*/
final class ActiveObjectContext {
private[akka] var _sender: AnyRef = _
private[akka] var _senderFuture: CompletableFuture[Any] = _
/**
* Returns the current sender Active Object reference.
* Scala style getter.
*/
def sender = _sender
def sender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
/**
* Returns the current sender Active Object reference.
* Java style getter.
*/
def getSender = _sender
def getSender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
/**
* Returns the current sender future Active Object reference.
* Scala style getter.
*/
def senderFuture: Option[CompletableFuture[Any]] = if (_senderFuture eq null) None else Some(_senderFuture)
/**
* Returns the current sender future Active Object reference.
* Java style getter.
* This method returns 'null' if the sender future is not available.
*/
def getSenderFuture = _senderFuture
}
/**
* Internal helper class to help pass the contextual information between threads.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object ActiveObjectContext {
private[actor] val sender = new scala.util.DynamicVariable[AnyRef](null)
import scala.util.DynamicVariable
private[actor] val sender = new DynamicVariable[AnyRef](null)
private[actor] val senderFuture = new DynamicVariable[CompletableFuture[Any]](null)
}
/**
@ -508,11 +536,12 @@ private[akka] sealed class ActiveObjectAspect {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = isVoid(rtti)
val sender = ActiveObjectContext.sender.value
val senderFuture = ActiveObjectContext.senderFuture.value
if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender)
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
null.asInstanceOf[AnyRef]
} else {
val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender), timeout)
val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
@ -574,13 +603,14 @@ private[akka] sealed class ActiveObjectAspect {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable private[akka] case class Invocation(
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef) {
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
override def toString: String = synchronized {
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
", sender: " + sender +
", senderFuture: " + senderFuture +
"]"
}
@ -590,6 +620,7 @@ private[akka] sealed class ActiveObjectAspect {
result = HashCode.hash(result, isOneWay)
result = HashCode.hash(result, isVoid)
result = HashCode.hash(result, sender)
result = HashCode.hash(result, senderFuture)
result
}
@ -599,7 +630,8 @@ private[akka] sealed class ActiveObjectAspect {
that.asInstanceOf[Invocation].joinPoint == joinPoint &&
that.asInstanceOf[Invocation].isOneWay == isOneWay &&
that.asInstanceOf[Invocation].isVoid == isVoid &&
that.asInstanceOf[Invocation].sender == sender
that.asInstanceOf[Invocation].sender == sender &&
that.asInstanceOf[Invocation].senderFuture == senderFuture
}
}
@ -672,12 +704,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
}
def receive = {
case Invocation(joinPoint, isOneWay, _, sender) =>
context.foreach(ctx => if (sender ne null) ctx._sender = sender)
case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
}
ActiveObjectContext.sender.value = joinPoint.getThis // set next sender
self.senderFuture.foreach(ActiveObjectContext.senderFuture.value = _)
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else self.reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
case Link(target) => self.link(target)
case Unlink(target) => self.unlink(target)

View file

@ -6,26 +6,34 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import java.util.concurrent.CopyOnWriteArraySet
sealed trait ListenerMessage
case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage
case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
/** Listeners is a generic trait to implement listening capability on an Actor
* Use the <code>gossip(msg)</code> method to have it sent to the listenees
* Send <code>Listen(self)</code> to start listening
* Send <code>Deafen(self)</code> to stop listening
* Send <code>WithListeners(fun)</code> to traverse the current listeners
/**
* Listeners is a generic trait to implement listening capability on an Actor.
* <p/>
* Use the <code>gossip(msg)</code> method to have it sent to the listeners.
* <p/>
* Send <code>Listen(self)</code> to start listening.
* <p/>
* Send <code>Deafen(self)</code> to stop listening.
* <p/>
* Send <code>WithListeners(fun)</code> to traverse the current listeners.
*/
trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[ActorRef]())
trait Listeners { self: Actor =>
private val listeners = new CopyOnWriteArraySet[ActorRef]
protected def listenerManagement : Receive = {
case Listen(l) => listeners( _ + l)
case Deafen(l) => listeners( _ - l )
case WithListeners(f) => listeners foreach f
protected def listenerManagement: Receive = {
case Listen(l) => listeners add l
case Deafen(l) => listeners remove l
case WithListeners(f) => f(listenersAsList)
}
protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
}

View file

@ -281,10 +281,16 @@ object Transaction {
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
def run = {
log.trace("=========> Committing transaction [%s]", mtx)
tx.commit
}
})
txSet.registerOnAbortTask(new Runnable() {
def run = tx.abort
def run = {
log.trace("=========> Aborting transaction [%s]", mtx)
tx.abort
}
})
}
}.execute()

View file

@ -100,7 +100,7 @@ class AccountActor extends Transactor {
import org.scalatest.junit.JUnitSuite
class RedisPersistentActorSpec extends JUnitSuite {
@Test
def testSuccessfulDebit = {
def testSuccessfulDebit {
val bactor = actorOf[AccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
@ -108,39 +108,45 @@ class RedisPersistentActorSpec extends JUnitSuite {
val acc = "a-123"
val a: Option[BigInt] = bactor !! Credit(acc, 5000)
println("a = " + a)
println("----------- SIZE 0 " + (bactor !! LogSize).get)
bactor !! Credit(acc, 5000)
println("----------- SIZE 1 " + (bactor !! LogSize).get)
println(bactor !! Balance(acc))
/**
bactor !! Debit("a-123", 3000, failer)
val c: Int = (bactor !! LogSize).get
println(c)
bactor !! Debit("a-123", 3000, failer)
assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
val d: Int = (bactor !! LogSize).get
println(d)
println("----------- SIZE 2 " + (bactor !! LogSize).get)
bactor !! Credit("a-123", 7000)
assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
bactor !! Debit(acc, 3000, failer)
println("----------- SIZE 3 " + (bactor !! LogSize).get)
assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
println("----------- SIZE 4 " + (bactor !! LogSize).get)
bactor !! Debit("a-123", 8000, failer)
assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
bactor !! Credit(acc, 7000)
println("----------- SIZE 5 " + (bactor !! LogSize).get)
assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
println("----------- SIZE 6 " + (bactor !! LogSize).get)
bactor !! Debit(acc, 8000, failer)
println("----------- SIZE 7 " + (bactor !! LogSize).get)
assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
println("----------- SIZE 8 " + (bactor !! LogSize).get)
assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
val c: Int = (bactor !! LogSize).get
println(c)
assertTrue(7 == c)
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
**/
}
/**
@Test
def testUnsuccessfulDebit = {
def testUnsuccessfulDebit {
val bactor = actorOf(new AccountActor)
bactor.start
bactor !! Credit("a-123", 5000)
@ -161,7 +167,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
}
@Test
def testUnsuccessfulMultiDebit = {
def testUnsuccessfulMultiDebit {
val bactor = actorOf(new AccountActor)
bactor.start
bactor !! Credit("a-123", 5000)

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sbinary</groupId>
<artifactId>sbinary</artifactId>
<version>2.8.0.RC3-0.3.1-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.6-SNAPSHOT-2.8.RC3</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,7 +1,7 @@
project.organization=se.scalablesolutions.akka
project.name=akka
project.version=0.9
scala.version=2.8.0.RC2
scala.version=2.8.0.RC3
sbt.version=0.7.4
def.scala.version=2.7.7
build.scala.versions=2.8.0.RC2
build.scala.versions=2.8.0.RC3

View file

@ -160,10 +160,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaCoreProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val netty = "org.jboss.netty" % "netty" % "3.2.0.CR1" % "compile"
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.RC2" % "0.7.3" % "compile"
val dispatch_htdisttp = "net.databinder" % "dispatch-http_2.8.0.RC2" % "0.7.3" % "compile"
val sjson = "sjson.json" % "sjson" % "0.5-SNAPSHOT-2.8.RC2" % "compile"
val sbinary = "sbinary" % "sbinary" % "2.8.0.RC2-0.3.1-SNAPSHOT" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.RC3" % "0.7.4" % "compile"
val dispatch_htdisttp = "net.databinder" % "dispatch-http_2.8.0.RC3" % "0.7.4" % "compile"
val sjson = "sjson.json" % "sjson" % "0.6-SNAPSHOT-2.8.RC3" % "compile"
val sbinary = "sbinary" % "sbinary" % "2.8.0.RC3-0.3.1-SNAPSHOT" % "compile"
val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.2.1" % "compile"
val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
@ -222,7 +222,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val redis = "com.redis" % "redisclient" % "2.8.0.RC2-1.4-SNAPSHOT" % "compile"
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}