From 15ed113112e9a7c48257552445de97fe44f70d35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Mon, 1 Mar 2010 22:03:17 +0100 Subject: [PATCH] improved perf with 25 % + renamed FutureResult -> Future + Added lightweight future factory method --- .../src/main/scala/actor/ActiveObject.scala | 4 +- akka-core/src/main/scala/actor/Actor.scala | 19 +++---- .../ExecutorBasedEventDrivenDispatcher.scala | 13 ++++- .../src/main/scala/dispatch/Future.scala | 38 +++++++++---- .../src/main/scala/dispatch/Reactor.scala | 2 +- .../scala/dispatch/ThreadPoolBuilder.scala | 2 +- .../src/main/scala/remote/RemoteClient.scala | 14 ++--- .../src/main/scala/stm/DataFlowVariable.scala | 4 +- .../src/test/scala/ActorRegistryTest.scala | 56 +------------------ 9 files changed, 60 insertions(+), 92 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 1858952f40..d88f0e861b 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} -import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} +import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ @@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get throw cause diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a1eb6bd309..ea230b7d45 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -240,7 +240,7 @@ trait Actor extends TransactionManagement { * But it can be used for advanced use-cases when one might want to store away the future and * resolve it later and/or somewhere else. */ - protected var senderFuture: Option[CompletableFutureResult] = None + protected var senderFuture: Option[CompletableFuture] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -533,7 +533,10 @@ trait Actor extends TransactionManagement { */ def !![T](message: Any): Option[T] = !![T](message, timeout) - def !!!(message: Any): FutureResult = { + /** + * FIXME document !!! + */ + def !!!(message: Any): Future = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) @@ -541,12 +544,6 @@ trait Actor extends TransactionManagement { "Actor has not been started, you need to invoke 'actor.start' before using it") } - /** - * This method is evil and has been removed. Use '!!' with a timeout instead. - */ - def !?[T](message: Any): T = throw new UnsupportedOperationException( - "'!?' is evil and has been removed. Use '!!' with a timeout instead") - /** * Forwards the message and passes the original sender actor as the sender. *

@@ -832,7 +829,7 @@ trait Actor extends TransactionManagement { protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + senderFuture: Option[CompletableFuture]): CompletableFuture = { if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -850,7 +847,7 @@ trait Actor extends TransactionManagement { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFutureResult(timeout) + else new DefaultCompletableFuture(timeout) val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) if (_isEventBased) { _mailbox.add(invocation) @@ -937,7 +934,7 @@ trait Actor extends TransactionManagement { } } - private def getResultOrThrowException[T](future: FutureResult): Option[T] = + private def getResultOrThrowException[T](future: Future): Option[T] = if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 7da13a10b3..d35b8205d1 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,15 +62,22 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } + /* invocation.receiver.synchronized { val messages = invocation.receiver._mailbox.iterator while (messages.hasNext) { - messages.next.asInstanceOf[MessageInvocation].invoke + messages.next.invoke messages.remove } } } - }) + */ + }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") def start = if (!active) { diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index c1e61695b8..0dcc0f850c 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -2,22 +2,37 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -/** - * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. - */ package se.scalablesolutions.akka.dispatch import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.{SynchronousQueue, TimeUnit} +import java.util.concurrent.TimeUnit class FutureTimeoutException(message: String) extends RuntimeException(message) object Futures { - def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await) - def awaitOne(futures: List[FutureResult]): FutureResult = { - var future: Option[FutureResult] = None + /** + *

+   * val future = Futures.future(1000) {
+   *  ... // do stuff
+   * }
+   * 
+ */ + def future(timeout: Long)(body: => Any): Future = { + val promise = new DefaultCompletableFuture(timeout) + try { + promise completeWithResult body + } catch { + case e => promise completeWithException (None, e) + } + promise + } + + def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await) + + def awaitOne(futures: List[Future]): Future = { + var future: Option[Future] = None do { future = futures.find(_.isCompleted) } while (future.isEmpty) @@ -25,7 +40,7 @@ object Futures { } /* - def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = { + def awaitEither(f1: Future, f2: Future): Option[Any] = { import Actor.Sender.Self import Actor.{spawn, actor} @@ -54,7 +69,7 @@ object Futures { */ } -sealed trait FutureResult { +sealed trait Future { def await def awaitBlocking def isCompleted: Boolean @@ -64,12 +79,13 @@ sealed trait FutureResult { def exception: Option[Tuple2[AnyRef, Throwable]] } -trait CompletableFutureResult extends FutureResult { +trait CompletableFuture extends Future { def completeWithResult(result: Any) def completeWithException(toBlame: AnyRef, exception: Throwable) } -class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult { +// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. +class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { private val TIME_UNIT = TimeUnit.MILLISECONDS def this() = this(0) diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index f7bfa52215..bf8254c64a 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap final class MessageInvocation(val receiver: Actor, val message: Any, - val future: Option[CompletableFutureResult], + val future: Option[CompletableFuture], val sender: Option[Actor], val tx: Option[Transaction]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index cb465907cb..1fedc1a5d7 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -4,11 +4,11 @@ package se.scalablesolutions.akka.dispatch +import java.util.Collection import java.util.concurrent._ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy -import java.util.Collection import se.scalablesolutions.akka.util.Logging trait ThreadPoolBuilder { diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index f97f014f06..0887ebcd82 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import se.scalablesolutions.akka.actor.{Exit, Actor} -import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} +import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.Config.config @@ -86,7 +86,7 @@ object RemoteClient extends Logging { override def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + senderFuture: Option[CompletableFuture]): CompletableFuture = { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(className) @@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false - private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] + private val futures = new ConcurrentHashMap[Long, CompletableFuture] private val supervisors = new ConcurrentHashMap[String, Actor] private val channelFactory = new NioClientSocketChannelFactory( @@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging { } } - def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) { + def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) None } else { futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFutureResult(request.getTimeout) + else new DefaultCompletableFuture(request.getTimeout) futures.put(request.getId, futureResult) connection.getChannel.write(request) Some(futureResult) @@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { * @author Jonas Bonér */ class RemoteClientPipelineFactory(name: String, - futures: ConcurrentMap[Long, CompletableFutureResult], + futures: ConcurrentMap[Long, CompletableFuture], supervisors: ConcurrentMap[String, Actor], bootstrap: ClientBootstrap, remoteAddress: SocketAddress, @@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String, */ @ChannelPipelineCoverage {val value = "all"} class RemoteClientHandler(val name: String, - val futures: ConcurrentMap[Long, CompletableFutureResult], + val futures: ConcurrentMap[Long, CompletableFuture], val supervisors: ConcurrentMap[String, Actor], val bootstrap: ClientBootstrap, val remoteAddress: SocketAddress, diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index aa5a8255e4..daed4ec55f 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.dispatch.CompletableFutureResult +import se.scalablesolutions.akka.dispatch.CompletableFuture /** * Implements Oz-style dataflow (single assignment) variables. @@ -74,7 +74,7 @@ object DataFlow { private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT start - private var readerFuture: Option[CompletableFutureResult] = None + private var readerFuture: Option[CompletableFuture] = None def receive = { case Get => val ref = dataFlow.value.get diff --git a/akka-core/src/test/scala/ActorRegistryTest.scala b/akka-core/src/test/scala/ActorRegistryTest.scala index a751acee3e..ada0c027d5 100644 --- a/akka-core/src/test/scala/ActorRegistryTest.scala +++ b/akka-core/src/test/scala/ActorRegistryTest.scala @@ -3,58 +3,6 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test -/* -class ActorRegistryTest extends JUnitSuite { - - val registry = ActorRegistry - - @Test - def testRegistrationWithDefaultId { - val actor = new TestActor1 - assertEquals(actor.getClass.getName, actor.getId) - testRegistration(actor, classOf[TestActor1]) - } - - @Test - def testRegistrationWithCustomId { - val actor = new TestActor2 - assertEquals("customid", actor.getId) - testRegistration(actor, classOf[TestActor2]) - } - - private def testRegistration[T <: Actor](actor: T, actorClass: Class[T]) { - assertEquals("non-started actor registered", Nil, registry.actorsFor(actorClass)) - assertEquals("non-started actor registered", Nil, registry.actorsFor(actor.getId)) - assertEquals("non-started actor registered", None, registry.actorFor(actor.uuid)) - actor.start - assertEquals("actor not registered", List(actor), registry.actorsFor(actorClass)) - assertEquals("actor not registered", List(actor), registry.actorsFor(actor.getId)) - assertEquals("actor not registered", Some(actor), registry.actorFor(actor.uuid)) - actor.stop - assertEquals("stopped actor registered", Nil, registry.actorsFor(actorClass)) - assertEquals("stopped actor registered", Nil, registry.actorsFor(actor.getId)) - assertEquals("stopped actor registered", None, registry.actorFor(actor.uuid)) - } - -} - -class TestActor1 extends Actor { - - // use default id - - protected def receive = null - -} - -class TestActor2 extends Actor { - - id = "customid" - - protected def receive = null - -} - - */ class ActorRegistryTest extends JUnitSuite { var record = "" class TestActor extends Actor { @@ -62,6 +10,7 @@ class ActorRegistryTest extends JUnitSuite { def receive = { case "ping" => record = "pong" + record + reply("got ping") } } @@ -180,8 +129,7 @@ class ActorRegistryTest extends JUnitSuite { val actor2 = new TestActor actor2.start record = "" - ActorRegistry.foreach(actor => actor send "ping") - Thread.sleep(1000) + ActorRegistry.foreach(actor => actor !! "ping") assert(record === "pongpong") actor1.stop actor2.stop