diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 191af89f75..9ec943cfc6 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory} -import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} +import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ @@ -67,7 +67,7 @@ final class ActiveObjectConfiguration { /** * Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender' - * reference etc. + * reference, the 'senderFuture' reference etc. *
* In order to make use of this context you have to create a member field in your * Active Object that has the type 'ActiveObjectContext', then an instance will @@ -94,21 +94,49 @@ final class ActiveObjectConfiguration { */ final class ActiveObjectContext { private[akka] var _sender: AnyRef = _ + private[akka] var _senderFuture: CompletableFuture[Any] = _ + /** * Returns the current sender Active Object reference. * Scala style getter. */ - def sender = _sender + def sender: AnyRef = { + if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.") + else _sender + } /** * Returns the current sender Active Object reference. * Java style getter. */ - def getSender = _sender + def getSender: AnyRef = { + if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.") + else _sender + } + + /** + * Returns the current sender future Active Object reference. + * Scala style getter. + */ + def senderFuture: Option[CompletableFuture[Any]] = if (_senderFuture eq null) None else Some(_senderFuture) + + /** + * Returns the current sender future Active Object reference. + * Java style getter. + * This method returns 'null' if the sender future is not available. + */ + def getSenderFuture = _senderFuture } +/** + * Internal helper class to help pass the contextual information between threads. + * + * @author Jonas Bonér + */ private[akka] object ActiveObjectContext { - private[actor] val sender = new scala.util.DynamicVariable[AnyRef](null) + import scala.util.DynamicVariable + private[actor] val sender = new DynamicVariable[AnyRef](null) + private[actor] val senderFuture = new DynamicVariable[CompletableFuture[Any]](null) } /** @@ -508,11 +536,12 @@ private[akka] sealed class ActiveObjectAspect { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val isOneWay = isVoid(rtti) val sender = ActiveObjectContext.sender.value + val senderFuture = ActiveObjectContext.senderFuture.value if (isOneWay) { - actorRef ! Invocation(joinPoint, true, true, sender) + actorRef ! Invocation(joinPoint, true, true, sender, senderFuture) null.asInstanceOf[AnyRef] } else { - val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender), timeout) + val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -574,13 +603,14 @@ private[akka] sealed class ActiveObjectAspect { * @author Jonas Bonér */ @serializable private[akka] case class Invocation( - joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef) { + joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) { override def toString: String = synchronized { "Invocation [joinPoint: " + joinPoint.toString + ", isOneWay: " + isOneWay + ", isVoid: " + isVoid + ", sender: " + sender + + ", senderFuture: " + senderFuture + "]" } @@ -590,6 +620,7 @@ private[akka] sealed class ActiveObjectAspect { result = HashCode.hash(result, isOneWay) result = HashCode.hash(result, isVoid) result = HashCode.hash(result, sender) + result = HashCode.hash(result, senderFuture) result } @@ -599,7 +630,8 @@ private[akka] sealed class ActiveObjectAspect { that.asInstanceOf[Invocation].joinPoint == joinPoint && that.asInstanceOf[Invocation].isOneWay == isOneWay && that.asInstanceOf[Invocation].isVoid == isVoid && - that.asInstanceOf[Invocation].sender == sender + that.asInstanceOf[Invocation].sender == sender && + that.asInstanceOf[Invocation].senderFuture == senderFuture } } @@ -672,12 +704,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op } def receive = { - case Invocation(joinPoint, isOneWay, _, sender) => - context.foreach(ctx => if (sender ne null) ctx._sender = sender) + case Invocation(joinPoint, isOneWay, _, sender, senderFuture) => + context.foreach { ctx => + if (sender ne null) ctx._sender = sender + if (senderFuture ne null) ctx._senderFuture = senderFuture + } ActiveObjectContext.sender.value = joinPoint.getThis // set next sender + self.senderFuture.foreach(ActiveObjectContext.senderFuture.value = _) + if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed else self.reply(joinPoint.proceed) + // Jan Kronquist: started work on issue 121 case Link(target) => self.link(target) case Unlink(target) => self.unlink(target) diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala index 7b989836a5..4bfc3f30b6 100644 --- a/akka-core/src/main/scala/routing/Listeners.scala +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -6,26 +6,34 @@ package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import java.util.concurrent.CopyOnWriteArraySet + sealed trait ListenerMessage case class Listen(listener: ActorRef) extends ListenerMessage case class Deafen(listener: ActorRef) extends ListenerMessage -case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage +case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage -/** Listeners is a generic trait to implement listening capability on an Actor - * Use thegossip(msg) method to have it sent to the listenees
- * Send Listen(self) to start listening
- * Send Deafen(self) to stop listening
- * Send WithListeners(fun) to traverse the current listeners
+/**
+ * Listeners is a generic trait to implement listening capability on an Actor.
+ *
+ * Use the gossip(msg) method to have it sent to the listeners.
+ *
+ * Send Listen(self) to start listening.
+ *
+ * Send Deafen(self) to stop listening.
+ *
+ * Send WithListeners(fun) to traverse the current listeners.
*/
-trait Listeners { self : Actor =>
- import se.scalablesolutions.akka.actor.Agent
- private lazy val listeners = Agent(Set[ActorRef]())
+trait Listeners { self: Actor =>
+ private val listeners = new CopyOnWriteArraySet[ActorRef]
- protected def listenerManagement : Receive = {
- case Listen(l) => listeners( _ + l)
- case Deafen(l) => listeners( _ - l )
- case WithListeners(f) => listeners foreach f
+ protected def listenerManagement: Receive = {
+ case Listen(l) => listeners add l
+ case Deafen(l) => listeners remove l
+ case WithListeners(f) => f(listenersAsList)
}
- protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
+ protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
+
+ private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index bdc743a53b..787896682d 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -281,10 +281,16 @@ object Transaction {
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
- def run = tx.commit
+ def run = {
+ log.trace("=========> Committing transaction [%s]", mtx)
+ tx.commit
+ }
})
txSet.registerOnAbortTask(new Runnable() {
- def run = tx.abort
+ def run = {
+ log.trace("=========> Aborting transaction [%s]", mtx)
+ tx.abort
+ }
})
}
}.execute()
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index b0abc4d11e..a09be9bd51 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -100,7 +100,7 @@ class AccountActor extends Transactor {
import org.scalatest.junit.JUnitSuite
class RedisPersistentActorSpec extends JUnitSuite {
@Test
- def testSuccessfulDebit = {
+ def testSuccessfulDebit {
val bactor = actorOf[AccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
@@ -108,39 +108,45 @@ class RedisPersistentActorSpec extends JUnitSuite {
val acc = "a-123"
- val a: Option[BigInt] = bactor !! Credit(acc, 5000)
- println("a = " + a)
+ println("----------- SIZE 0 " + (bactor !! LogSize).get)
+
+ bactor !! Credit(acc, 5000)
+ println("----------- SIZE 1 " + (bactor !! LogSize).get)
+
println(bactor !! Balance(acc))
- /**
- bactor !! Debit("a-123", 3000, failer)
- val c: Int = (bactor !! LogSize).get
- println(c)
- bactor !! Debit("a-123", 3000, failer)
- assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
- val d: Int = (bactor !! LogSize).get
- println(d)
+ println("----------- SIZE 2 " + (bactor !! LogSize).get)
- bactor !! Credit("a-123", 7000)
- assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
+ bactor !! Debit(acc, 3000, failer)
+ println("----------- SIZE 3 " + (bactor !! LogSize).get)
+
+ assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
+ println("----------- SIZE 4 " + (bactor !! LogSize).get)
- bactor !! Debit("a-123", 8000, failer)
- assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
+ bactor !! Credit(acc, 7000)
+ println("----------- SIZE 5 " + (bactor !! LogSize).get)
+
+ assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
+ println("----------- SIZE 6 " + (bactor !! LogSize).get)
+
+ bactor !! Debit(acc, 8000, failer)
+ println("----------- SIZE 7 " + (bactor !! LogSize).get)
+
+ assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
+ println("----------- SIZE 8 " + (bactor !! LogSize).get)
+
+ assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
- val c: Int = (bactor !! LogSize).get
- println(c)
- assertTrue(7 == c)
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
- **/
}
/**
@Test
- def testUnsuccessfulDebit = {
+ def testUnsuccessfulDebit {
val bactor = actorOf(new AccountActor)
bactor.start
bactor !! Credit("a-123", 5000)
@@ -161,7 +167,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
}
@Test
- def testUnsuccessfulMultiDebit = {
+ def testUnsuccessfulMultiDebit {
val bactor = actorOf(new AccountActor)
bactor.start
bactor !! Credit("a-123", 5000)
diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar
new file mode 100644
index 0000000000..d939a49d7c
Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar differ
diff --git a/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.jar b/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.jar
new file mode 100644
index 0000000000..0d53cb1731
Binary files /dev/null and b/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.jar differ
diff --git a/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.pom b/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.pom
new file mode 100644
index 0000000000..694f509fea
--- /dev/null
+++ b/embedded-repo/sbinary/sbinary/2.8.0.RC3-0.3.1-SNAPSHOT/sbinary-2.8.0.RC3-0.3.1-SNAPSHOT.pom
@@ -0,0 +1,8 @@
+
+