diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 1858952f40..d88f0e861b 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} -import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} +import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ @@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get throw cause diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 2919cc5c71..135e45f317 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -17,11 +17,11 @@ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} import org.multiverse.api.ThreadLocalTransaction._ +import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue import java.net.InetSocketAddress -import org.multiverse.commitbarriers.CountDownCommitBarrier /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -91,7 +91,7 @@ object Actor extends Logging { */ def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { start - def receive = body + def receive: PartialFunction[Any, Unit] = body } /** @@ -134,11 +134,14 @@ object Actor extends Logging { * } * */ - def spawn(body: => Unit): Actor = new Actor() { - start - body - def receive = { - case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.") + def spawn(body: => Unit): Actor = { + case object Spawn + new Actor() { + start + send(Spawn) + def receive = { + case Spawn => body; stop + } } } @@ -199,8 +202,6 @@ trait Actor extends TransactionManagement { // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = UUID.newUuid.toString - def uuid = _uuid - // ==================================== // private fields // ==================================== @@ -239,7 +240,7 @@ trait Actor extends TransactionManagement { * But it can be used for advanced use-cases when one might want to store away the future and * resolve it later and/or somewhere else. */ - protected var senderFuture: Option[CompletableFutureResult] = None + protected var senderFuture: Option[CompletableFuture] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -256,7 +257,7 @@ trait Actor extends TransactionManagement { * use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. */ - protected[akka] var id: String = this.getClass.getName + protected var id: String = this.getClass.getName /** * User overridable callback/setting. @@ -266,8 +267,6 @@ trait Actor extends TransactionManagement { */ @volatile var timeout: Long = Actor.TIMEOUT - ActorRegistry.register(this) - /** * User overridable callback/setting. *
@@ -415,6 +414,7 @@ trait Actor extends TransactionManagement { init } Actor.log.debug("[%s] has started", toString) + ActorRegistry.register(this) this } @@ -533,26 +533,17 @@ trait Actor extends TransactionManagement { */ def !: Option[T] = ! - - /* - //FIXME 2.8 def !!!(message: Any)(implicit sender: AnyRef = None): FutureResult = { - def !!!(message: Any)(implicit sender: AnyRef): FutureResult = { + /** + * FIXME document !!! + */ + def !!!(message: Any): Future = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) - else None - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from) + postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } - */ - /** - * This method is evil and has been removed. Use '!!' with a timeout instead. - */ - def !?[T](message: Any): T = throw new UnsupportedOperationException( - "'!?' is evil and has been removed. Use '!!' with a timeout instead") - /** * Forwards the message and passes the original sender actor as the sender. * @@ -763,6 +754,16 @@ trait Actor extends TransactionManagement { actor } + /** + * Returns the id for the actor. + */ + def getId = id + + /** + * Returns the uuid for the actor. + */ + def uuid = _uuid + // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= @@ -804,12 +805,12 @@ trait Actor extends TransactionManagement { // set the source fields used to reply back to the original sender // (i.e. not the remote proxy actor) - if (sender.isDefined) { + if(sender.isDefined) { val s = sender.get requestBuilder.setSourceTarget(s.getClass.getName) requestBuilder.setSourceUuid(s.uuid) - val (host, port) = s._replyToAddress.map(actor => (actor.getHostName, actor.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT)) log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) @@ -823,7 +824,9 @@ trait Actor extends TransactionManagement { if (_isEventBased) { _mailbox.add(invocation) if (_isSuspended) invocation.send - } else invocation.send + } + else + invocation.send } clearTransactionSet } @@ -831,7 +834,7 @@ trait Actor extends TransactionManagement { protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + senderFuture: Option[CompletableFuture]): CompletableFuture = { if (isTransactionSetInScope) { log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message) getTransactionSetInScope.incParties @@ -855,7 +858,7 @@ trait Actor extends TransactionManagement { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFutureResult(timeout) + else new DefaultCompletableFuture(timeout) val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get) if (_isEventBased) { _mailbox.add(invocation) @@ -956,7 +959,7 @@ trait Actor extends TransactionManagement { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] @@ -1064,6 +1067,5 @@ trait Actor extends TransactionManagement { that.asInstanceOf[Actor]._uuid == _uuid } - override def toString(): String = "Actor[" + id + ":" + uuid + "]" - + override def toString = "Actor[" + id + ":" + uuid + "]" } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 63314ae051..9e0b1cba08 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -6,81 +6,123 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.util.Logging -import scala.collection.mutable.{ListBuffer, HashMap} +import scala.collection.mutable.ListBuffer import scala.reflect.Manifest +import java.util.concurrent.ConcurrentHashMap + /** - * Registry holding all actor instances, mapped by class and the actor's id field (which can be set by user-code). + * Registry holding all Actor instances in the whole system. + * Mapped by: + *
+ * val future = Futures.future(1000) {
+ * ... // do stuff
+ * }
+ *
+ */
+ def future(timeout: Long)(body: => Any): Future = {
+ val promise = new DefaultCompletableFuture(timeout)
+ try {
+ promise completeWithResult body
+ } catch {
+ case e => promise completeWithException (None, e)
+ }
+ promise
+ }
+
+ def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
+
+ def awaitOne(futures: List[Future]): Future = {
+ var future: Option[Future] = None
+ do {
+ future = futures.find(_.isCompleted)
+ } while (future.isEmpty)
+ future.get
+ }
+
+ /*
+ def awaitEither(f1: Future, f2: Future): Option[Any] = {
+ import Actor.Sender.Self
+ import Actor.{spawn, actor}
+
+ case class Result(res: Option[Any])
+ val handOff = new SynchronousQueue[Option[Any]]
+ spawn {
+ try {
+ println("f1 await")
+ f1.await
+ println("f1 offer")
+ handOff.offer(f1.result)
+ } catch {case _ => {}}
+ }
+ spawn {
+ try {
+ println("f2 await")
+ f2.await
+ println("f2 offer")
+ println("f2 offer: " + f2.result)
+ handOff.offer(f2.result)
+ } catch {case _ => {}}
+ }
+ Thread.sleep(100)
+ handOff.take
+ }
+*/
+}
+
+sealed trait Future {
def await
def awaitBlocking
def isCompleted: Boolean
@@ -22,12 +80,13 @@ sealed trait FutureResult {
def exception: Option[Tuple2[AnyRef, Throwable]]
}
-trait CompletableFutureResult extends FutureResult {
+trait CompletableFuture extends Future {
def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
-class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult {
+// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
+class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
@@ -46,7 +105,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
var start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
- if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds")
+ if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
} catch {
case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start)
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index b5d4d634f6..627d27aeac 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor,
val message: Any,
- val future: Option[CompletableFutureResult],
+ val future: Option[CompletableFuture],
val sender: Option[Actor],
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
index cb465907cb..1fedc1a5d7 100644
--- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
@@ -4,11 +4,11 @@
package se.scalablesolutions.akka.dispatch
+import java.util.Collection
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
-import java.util.Collection
import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index f97f014f06..0887ebcd82 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
-import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
+import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
@@ -86,7 +86,7 @@ object RemoteClient extends Logging {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ senderFuture: Option[CompletableFuture]): CompletableFuture = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
- private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
+ private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val supervisors = new ConcurrentHashMap[String, Actor]
private val channelFactory = new NioClientSocketChannelFactory(
@@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
}
- def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
+ def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFutureResult(request.getTimeout)
+ else new DefaultCompletableFuture(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
* @author Jonas Bonér
*/
class RemoteClientPipelineFactory(name: String,
- futures: ConcurrentMap[Long, CompletableFutureResult],
+ futures: ConcurrentMap[Long, CompletableFuture],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String,
- val futures: ConcurrentMap[Long, CompletableFutureResult],
+ val futures: ConcurrentMap[Long, CompletableFuture],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 5288bd7bd4..6da2ceea99 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -196,8 +196,8 @@ class RemoteServer extends Logging {
* Register Remote Actor by the Actor's 'id' field.
*/
def register(actor: Actor) = if (isRunning) {
- log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
- RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
+ log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
+ RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
}
/**
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index aa5a8255e4..daed4ec55f 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
-import se.scalablesolutions.akka.dispatch.CompletableFutureResult
+import se.scalablesolutions.akka.dispatch.CompletableFuture
/**
* Implements Oz-style dataflow (single assignment) variables.
@@ -74,7 +74,7 @@ object DataFlow {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
- private var readerFuture: Option[CompletableFutureResult] = None
+ private var readerFuture: Option[CompletableFuture] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
diff --git a/akka-core/src/test/scala/ActorRegistryTest.scala b/akka-core/src/test/scala/ActorRegistryTest.scala
new file mode 100644
index 0000000000..ada0c027d5
--- /dev/null
+++ b/akka-core/src/test/scala/ActorRegistryTest.scala
@@ -0,0 +1,160 @@
+package se.scalablesolutions.akka.actor
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+class ActorRegistryTest extends JUnitSuite {
+ var record = ""
+ class TestActor extends Actor {
+ id = "MyID"
+ def receive = {
+ case "ping" =>
+ record = "pong" + record
+ reply("got ping")
+ }
+ }
+
+ @Test def shouldGetActorByIdFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ actor.start
+ val actors = ActorRegistry.actorsFor("MyID")
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId == "MyID")
+ actor.stop
+ }
+
+ @Test def shouldGetActorByUUIDFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ val uuid = actor.uuid
+ actor.start
+ val actorOrNone = ActorRegistry.actorFor(uuid)
+ assert(actorOrNone.isDefined)
+ assert(actorOrNone.get.uuid === uuid)
+ actor.stop
+ }
+
+ @Test def shouldGetActorByClassFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ actor.start
+ val actors = ActorRegistry.actorsFor(classOf[TestActor])
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ actor.stop
+ }
+
+ @Test def shouldGetActorByManifestFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ actor.start
+ val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ actor.stop
+ }
+
+ @Test def shouldGetActorsByIdFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actorsFor("MyID")
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetActorsByClassFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actorsFor(classOf[TestActor])
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetActorsByManifestFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetAllActorsFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actors
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ record = ""
+ ActorRegistry.foreach(actor => actor !! "ping")
+ assert(record === "pongpong")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldShutdownAllActorsInActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ ActorRegistry.shutdownAll
+ assert(ActorRegistry.actors.size === 0)
+ }
+
+ @Test def shouldRemoveUnregisterActorInActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ assert(ActorRegistry.actors.size === 2)
+ ActorRegistry.unregister(actor1)
+ assert(ActorRegistry.actors.size === 1)
+ ActorRegistry.unregister(actor2)
+ assert(ActorRegistry.actors.size === 0)
+ }
+}
diff --git a/akka-core/src/test/scala/FutureTest.scala b/akka-core/src/test/scala/FutureTest.scala
new file mode 100644
index 0000000000..d073a92557
--- /dev/null
+++ b/akka-core/src/test/scala/FutureTest.scala
@@ -0,0 +1,111 @@
+package se.scalablesolutions.akka.actor
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import se.scalablesolutions.akka.dispatch.Futures
+
+class FutureTest extends JUnitSuite {
+ class TestActor extends Actor {
+ def receive = {
+ case "Hello" =>
+ reply("World")
+ case "NoReply" => {}
+ case "Failure" =>
+ throw new RuntimeException("expected")
+ }
+ }
+
+ @Test def shouldActorReplyResultThroughExplicitFuture = {
+ val actor = new TestActor
+ actor.start
+ val future = actor !!! "Hello"
+ future.await
+ assert(future.result.isDefined)
+ assert("World" === future.result.get)
+ actor.stop
+ }
+
+ @Test def shouldActorReplyExceptionThroughExplicitFuture = {
+ val actor = new TestActor
+ actor.start
+ val future = actor !!! "Failure"
+ future.await
+ assert(future.exception.isDefined)
+ assert("expected" === future.exception.get._2.getMessage)
+ actor.stop
+ }
+
+ /*
+ @Test def shouldFutureAwaitEitherLeft = {
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val future1 = actor1 !!! "Hello"
+ val future2 = actor2 !!! "NoReply"
+ val result = Futures.awaitEither(future1, future2)
+ assert(result.isDefined)
+ assert("World" === result.get)
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldFutureAwaitEitherRight = {
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val future1 = actor1 !!! "NoReply"
+ val future2 = actor2 !!! "Hello"
+ val result = Futures.awaitEither(future1, future2)
+ assert(result.isDefined)
+ assert("World" === result.get)
+ actor1.stop
+ actor2.stop
+ }
+ */
+ @Test def shouldFutureAwaitOneLeft = {
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val future1 = actor1 !!! "NoReply"
+ val future2 = actor2 !!! "Hello"
+ val result = Futures.awaitOne(List(future1, future2))
+ assert(result.result.isDefined)
+ assert("World" === result.result.get)
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldFutureAwaitOneRight = {
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val future1 = actor1 !!! "Hello"
+ val future2 = actor2 !!! "NoReply"
+ val result = Futures.awaitOne(List(future1, future2))
+ assert(result.result.isDefined)
+ assert("World" === result.result.get)
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldFutureAwaitAll = {
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val future1 = actor1 !!! "Hello"
+ val future2 = actor2 !!! "Hello"
+ Futures.awaitAll(List(future1, future2))
+ assert(future1.result.isDefined)
+ assert("World" === future1.result.get)
+ assert(future2.result.isDefined)
+ assert("World" === future2.result.get)
+ actor1.stop
+ actor2.stop
+ }
+
+}
diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala
index cc2825145a..b967c07df7 100644
--- a/akka-patterns/src/main/scala/Patterns.scala
+++ b/akka-patterns/src/main/scala/Patterns.scala
@@ -28,11 +28,6 @@ object Patterns {
val seq = actors
}
- //FIXME 2.8, use default params with CyclicIterator
- /*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor(
- new CyclicIterator(actors())
- ) */
-
def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
override def transform(msg : Any) = msgTransformer(msg)
def routes = routing
@@ -81,21 +76,4 @@ class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
current = nc.tail
nc.head
}
-}
-
-//Agent
-/*
-val a = agent(startValue)
-a.set(_ + 5)
-a.get
-a.foreach println(_)
-*/
-object Agent {
- sealed trait AgentMessage
- case class FunMessage[T](f : (T) => T) extends AgentMessage
- case class ProcMessage[T](f : (T) => Unit) extends AgentMessage
- case class ValMessage[T](t : T) extends AgentMessage
-}
-sealed private[akka] class Agent[T] {
-
}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-cassandra/pom.xml b/akka-persistence/akka-persistence-cassandra/pom.xml
index 4bca9ffbac..d8490382d5 100644
--- a/akka-persistence/akka-persistence-cassandra/pom.xml
+++ b/akka-persistence/akka-persistence-cassandra/pom.xml
@@ -18,6 +18,12 @@