diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 1858952f40..d88f0e861b 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} -import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} +import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ @@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get throw cause diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a1eb6bd309..ea230b7d45 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -240,7 +240,7 @@ trait Actor extends TransactionManagement { * But it can be used for advanced use-cases when one might want to store away the future and * resolve it later and/or somewhere else. */ - protected var senderFuture: Option[CompletableFutureResult] = None + protected var senderFuture: Option[CompletableFuture] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -533,7 +533,10 @@ trait Actor extends TransactionManagement { */ def !: Option[T] = ! - def !!!(message: Any): FutureResult = { + /** + * FIXME document !!! + */ + def !!!(message: Any): Future = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) @@ -541,12 +544,6 @@ trait Actor extends TransactionManagement { "Actor has not been started, you need to invoke 'actor.start' before using it") } - /** - * This method is evil and has been removed. Use '!!' with a timeout instead. - */ - def !?[T](message: Any): T = throw new UnsupportedOperationException( - "'!?' is evil and has been removed. Use '!!' with a timeout instead") - /** * Forwards the message and passes the original sender actor as the sender. *
@@ -832,7 +829,7 @@ trait Actor extends TransactionManagement { protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + senderFuture: Option[CompletableFuture]): CompletableFuture = { if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -850,7 +847,7 @@ trait Actor extends TransactionManagement { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFutureResult(timeout) + else new DefaultCompletableFuture(timeout) val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) if (_isEventBased) { _mailbox.add(invocation) @@ -937,7 +934,7 @@ trait Actor extends TransactionManagement { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 7da13a10b3..d35b8205d1 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,15 +62,22 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } + /* invocation.receiver.synchronized { val messages = invocation.receiver._mailbox.iterator while (messages.hasNext) { - messages.next.asInstanceOf[MessageInvocation].invoke + messages.next.invoke messages.remove } } } - }) + */ + }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") def start = if (!active) { diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index c1e61695b8..0dcc0f850c 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -2,22 +2,37 @@ * Copyright (C) 2009-2010 Scalable Solutions AB
+ * val future = Futures.future(1000) {
+ * ... // do stuff
+ * }
+ *
+ */
+ def future(timeout: Long)(body: => Any): Future = {
+ val promise = new DefaultCompletableFuture(timeout)
+ try {
+ promise completeWithResult body
+ } catch {
+ case e => promise completeWithException (None, e)
+ }
+ promise
+ }
+
+ def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
+
+ def awaitOne(futures: List[Future]): Future = {
+ var future: Option[Future] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
@@ -25,7 +40,7 @@ object Futures {
}
/*
- def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = {
+ def awaitEither(f1: Future, f2: Future): Option[Any] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
@@ -54,7 +69,7 @@ object Futures {
*/
}
-sealed trait FutureResult {
+sealed trait Future {
def await
def awaitBlocking
def isCompleted: Boolean
@@ -64,12 +79,13 @@ sealed trait FutureResult {
def exception: Option[Tuple2[AnyRef, Throwable]]
}
-trait CompletableFutureResult extends FutureResult {
+trait CompletableFuture extends Future {
def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
-class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult {
+// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
+class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f7bfa52215..bf8254c64a 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap
final class MessageInvocation(val receiver: Actor,
val message: Any,
- val future: Option[CompletableFutureResult],
+ val future: Option[CompletableFuture],
val sender: Option[Actor],
val tx: Option[Transaction]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
index cb465907cb..1fedc1a5d7 100644
--- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
@@ -4,11 +4,11 @@
package se.scalablesolutions.akka.dispatch
+import java.util.Collection
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
-import java.util.Collection
import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index f97f014f06..0887ebcd82 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
-import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
+import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
@@ -86,7 +86,7 @@ object RemoteClient extends Logging {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ senderFuture: Option[CompletableFuture]): CompletableFuture = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
- private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
+ private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val supervisors = new ConcurrentHashMap[String, Actor]
private val channelFactory = new NioClientSocketChannelFactory(
@@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
}
- def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
+ def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFutureResult(request.getTimeout)
+ else new DefaultCompletableFuture(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
* @author Jonas Bonér
*/
class RemoteClientPipelineFactory(name: String,
- futures: ConcurrentMap[Long, CompletableFutureResult],
+ futures: ConcurrentMap[Long, CompletableFuture],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String,
- val futures: ConcurrentMap[Long, CompletableFutureResult],
+ val futures: ConcurrentMap[Long, CompletableFuture],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index aa5a8255e4..daed4ec55f 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
-import se.scalablesolutions.akka.dispatch.CompletableFutureResult
+import se.scalablesolutions.akka.dispatch.CompletableFuture
/**
* Implements Oz-style dataflow (single assignment) variables.
@@ -74,7 +74,7 @@ object DataFlow {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
- private var readerFuture: Option[CompletableFutureResult] = None
+ private var readerFuture: Option[CompletableFuture] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
diff --git a/akka-core/src/test/scala/ActorRegistryTest.scala b/akka-core/src/test/scala/ActorRegistryTest.scala
index a751acee3e..ada0c027d5 100644
--- a/akka-core/src/test/scala/ActorRegistryTest.scala
+++ b/akka-core/src/test/scala/ActorRegistryTest.scala
@@ -3,58 +3,6 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
-/*
-class ActorRegistryTest extends JUnitSuite {
-
- val registry = ActorRegistry
-
- @Test
- def testRegistrationWithDefaultId {
- val actor = new TestActor1
- assertEquals(actor.getClass.getName, actor.getId)
- testRegistration(actor, classOf[TestActor1])
- }
-
- @Test
- def testRegistrationWithCustomId {
- val actor = new TestActor2
- assertEquals("customid", actor.getId)
- testRegistration(actor, classOf[TestActor2])
- }
-
- private def testRegistration[T <: Actor](actor: T, actorClass: Class[T]) {
- assertEquals("non-started actor registered", Nil, registry.actorsFor(actorClass))
- assertEquals("non-started actor registered", Nil, registry.actorsFor(actor.getId))
- assertEquals("non-started actor registered", None, registry.actorFor(actor.uuid))
- actor.start
- assertEquals("actor not registered", List(actor), registry.actorsFor(actorClass))
- assertEquals("actor not registered", List(actor), registry.actorsFor(actor.getId))
- assertEquals("actor not registered", Some(actor), registry.actorFor(actor.uuid))
- actor.stop
- assertEquals("stopped actor registered", Nil, registry.actorsFor(actorClass))
- assertEquals("stopped actor registered", Nil, registry.actorsFor(actor.getId))
- assertEquals("stopped actor registered", None, registry.actorFor(actor.uuid))
- }
-
-}
-
-class TestActor1 extends Actor {
-
- // use default id
-
- protected def receive = null
-
-}
-
-class TestActor2 extends Actor {
-
- id = "customid"
-
- protected def receive = null
-
-}
-
- */
class ActorRegistryTest extends JUnitSuite {
var record = ""
class TestActor extends Actor {
@@ -62,6 +10,7 @@ class ActorRegistryTest extends JUnitSuite {
def receive = {
case "ping" =>
record = "pong" + record
+ reply("got ping")
}
}
@@ -180,8 +129,7 @@ class ActorRegistryTest extends JUnitSuite {
val actor2 = new TestActor
actor2.start
record = ""
- ActorRegistry.foreach(actor => actor send "ping")
- Thread.sleep(1000)
+ ActorRegistry.foreach(actor => actor !! "ping")
assert(record === "pongpong")
actor1.stop
actor2.stop