improved perf with 25 % + renamed FutureResult -> Future + Added lightweight future factory method

This commit is contained in:
Jonas Bonér 2010-03-01 22:03:17 +01:00
parent f571c07df2
commit f3a457d4de
9 changed files with 60 additions and 92 deletions

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect {
}
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause

View file

@ -240,7 +240,7 @@ trait Actor extends TransactionManagement {
* But it can be used for advanced use-cases when one might want to store away the future and
* resolve it later and/or somewhere else.
*/
protected var senderFuture: Option[CompletableFutureResult] = None
protected var senderFuture: Option[CompletableFuture] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
@ -533,7 +533,10 @@ trait Actor extends TransactionManagement {
*/
def !![T](message: Any): Option[T] = !![T](message, timeout)
def !!!(message: Any): FutureResult = {
/**
* FIXME document !!!
*/
def !!!(message: Any): Future = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
@ -541,12 +544,6 @@ trait Actor extends TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: Any): T = throw new UnsupportedOperationException(
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
@ -832,7 +829,7 @@ trait Actor extends TransactionManagement {
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
senderFuture: Option[CompletableFuture]): CompletableFuture = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -850,7 +847,7 @@ trait Actor extends TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(timeout)
else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.add(invocation)
@ -937,7 +934,7 @@ trait Actor extends TransactionManagement {
}
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]

View file

@ -62,14 +62,21 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
invocation.receiver.synchronized {
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
}
}
/* invocation.receiver.synchronized {
val messages = invocation.receiver._mailbox.iterator
while (messages.hasNext) {
messages.next.asInstanceOf[MessageInvocation].invoke
messages.next.invoke
messages.remove
}
}
}
*/
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")

View file

@ -2,22 +2,37 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
/**
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{SynchronousQueue, TimeUnit}
import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message)
object Futures {
def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[FutureResult]): FutureResult = {
var future: Option[FutureResult] = None
/**
* <pre>
* val future = Futures.future(1000) {
* ... // do stuff
* }
* </pre>
*/
def future(timeout: Long)(body: => Any): Future = {
val promise = new DefaultCompletableFuture(timeout)
try {
promise completeWithResult body
} catch {
case e => promise completeWithException (None, e)
}
promise
}
def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future]): Future = {
var future: Option[Future] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
@ -25,7 +40,7 @@ object Futures {
}
/*
def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = {
def awaitEither(f1: Future, f2: Future): Option[Any] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
@ -54,7 +69,7 @@ object Futures {
*/
}
sealed trait FutureResult {
sealed trait Future {
def await
def awaitBlocking
def isCompleted: Boolean
@ -64,12 +79,13 @@ sealed trait FutureResult {
def exception: Option[Tuple2[AnyRef, Throwable]]
}
trait CompletableFutureResult extends FutureResult {
trait CompletableFuture extends Future {
def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult {
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)

View file

@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap
final class MessageInvocation(val receiver: Actor,
val message: Any,
val future: Option[CompletableFutureResult],
val future: Option[CompletableFuture],
val sender: Option[Actor],
val tx: Option[Transaction]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")

View file

@ -4,11 +4,11 @@
package se.scalablesolutions.akka.dispatch
import java.util.Collection
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.Collection
import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
@ -86,7 +86,7 @@ object RemoteClient extends Logging {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
senderFuture: Option[CompletableFuture]): CompletableFuture = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val supervisors = new ConcurrentHashMap[String, Actor]
private val channelFactory = new NioClientSocketChannelFactory(
@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
}
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(request.getTimeout)
else new DefaultCompletableFuture(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
futures: ConcurrentMap[Long, CompletableFuture],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val futures: ConcurrentMap[Long, CompletableFuture],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFutureResult
import se.scalablesolutions.akka.dispatch.CompletableFuture
/**
* Implements Oz-style dataflow (single assignment) variables.
@ -74,7 +74,7 @@ object DataFlow {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
private var readerFuture: Option[CompletableFutureResult] = None
private var readerFuture: Option[CompletableFuture] = None
def receive = {
case Get =>
val ref = dataFlow.value.get

View file

@ -3,58 +3,6 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
/*
class ActorRegistryTest extends JUnitSuite {
val registry = ActorRegistry
@Test
def testRegistrationWithDefaultId {
val actor = new TestActor1
assertEquals(actor.getClass.getName, actor.getId)
testRegistration(actor, classOf[TestActor1])
}
@Test
def testRegistrationWithCustomId {
val actor = new TestActor2
assertEquals("customid", actor.getId)
testRegistration(actor, classOf[TestActor2])
}
private def testRegistration[T <: Actor](actor: T, actorClass: Class[T]) {
assertEquals("non-started actor registered", Nil, registry.actorsFor(actorClass))
assertEquals("non-started actor registered", Nil, registry.actorsFor(actor.getId))
assertEquals("non-started actor registered", None, registry.actorFor(actor.uuid))
actor.start
assertEquals("actor not registered", List(actor), registry.actorsFor(actorClass))
assertEquals("actor not registered", List(actor), registry.actorsFor(actor.getId))
assertEquals("actor not registered", Some(actor), registry.actorFor(actor.uuid))
actor.stop
assertEquals("stopped actor registered", Nil, registry.actorsFor(actorClass))
assertEquals("stopped actor registered", Nil, registry.actorsFor(actor.getId))
assertEquals("stopped actor registered", None, registry.actorFor(actor.uuid))
}
}
class TestActor1 extends Actor {
// use default id
protected def receive = null
}
class TestActor2 extends Actor {
id = "customid"
protected def receive = null
}
*/
class ActorRegistryTest extends JUnitSuite {
var record = ""
class TestActor extends Actor {
@ -62,6 +10,7 @@ class ActorRegistryTest extends JUnitSuite {
def receive = {
case "ping" =>
record = "pong" + record
reply("got ping")
}
}
@ -180,8 +129,7 @@ class ActorRegistryTest extends JUnitSuite {
val actor2 = new TestActor
actor2.start
record = ""
ActorRegistry.foreach(actor => actor send "ping")
Thread.sleep(1000)
ActorRegistry.foreach(actor => actor !! "ping")
assert(record === "pongpong")
actor1.stop
actor2.stop