From cccfd51ed110d2b093004034d65cbdbf792211c5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 23 Apr 2010 20:46:58 +0200 Subject: [PATCH] Initial parametrization --- akka-camel/src/main/scala/Producer.scala | 2 +- .../src/main/scala/actor/ActiveObject.scala | 4 +-- akka-core/src/main/scala/actor/Actor.scala | 24 +++++++------- .../src/main/scala/dispatch/Future.scala | 32 +++++++++---------- .../src/main/scala/dispatch/Reactor.scala | 2 +- .../src/main/scala/remote/RemoteClient.scala | 18 +++++------ .../src/main/scala/stm/DataFlowVariable.scala | 6 ++-- 7 files changed, 44 insertions(+), 44 deletions(-) diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 2b7d053457..e793794804 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -162,7 +162,7 @@ trait Producer { self: Actor => */ class ProducerResponseSender( headers: Map[String, Any], - replyTo : Option[Either[Actor,CompletableFuture]], + replyTo : Option[Either[Actor,CompletableFuture[Any]]], producer: Actor) extends Synchronization with Logging { implicit val producerActor = Some(producer) // the response sender diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index f3c536fdf8..839ebb1c3e 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -462,11 +462,11 @@ private[akka] sealed class ActiveObjectAspect { } } - private def getResultOrThrowException[T](future: Future): Option[T] = + private def getResultOrThrowException[T](future: Future[T]): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get throw cause - } else future.result.asInstanceOf[Option[T]] + } else future.result private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 37a297d5ca..8d46ff605d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -257,7 +257,7 @@ trait Actor extends TransactionManagement with Logging { * Is Some(Left(Actor)) if sender is an actor * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result */ - protected var replyTo: Option[Either[Actor,CompletableFuture]] = None + protected var replyTo: Option[Either[Actor,CompletableFuture[Any]]] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -502,9 +502,9 @@ trait Actor extends TransactionManagement with Logging { def !![T](message: Any, timeout: Long): Option[T] = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] - if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) + if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) try { future.await } catch { @@ -514,7 +514,7 @@ trait Actor extends TransactionManagement with Logging { } if (future.exception.isDefined) throw future.exception.get._2 - else future.result.asInstanceOf[Option[T]] + else future.result } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") @@ -539,10 +539,10 @@ trait Actor extends TransactionManagement with Logging { /** * FIXME document !!! */ - def !!!(message: Any): Future = { + def !!![T](message: Any): Future[T] = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) + postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -569,7 +569,7 @@ trait Actor extends TransactionManagement with Logging { */ protected[this] def reply(message: Any) = replyTo match { case Some(Left(actor)) => actor ! message - case Some(Right(future)) => future.completeWithResult(message) + case Some(Right(future : Future[Any])) => future.completeWithResult(message) case _ => throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + "\n\tYou have probably used the '!' method to either; " + @@ -813,7 +813,7 @@ trait Actor extends TransactionManagement with Logging { RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get) } RemoteProtocolBuilder.setMessage(message, requestBuilder) - RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) + RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { @@ -824,10 +824,10 @@ trait Actor extends TransactionManagement with Logging { } } - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, - senderFuture: Option[CompletableFuture]): CompletableFuture = { + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { joinTransaction(message) if (_remoteAddress.isDefined) { @@ -847,8 +847,8 @@ trait Actor extends TransactionManagement with Logging { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture(timeout) - val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get) + else new DefaultCompletableFuture[T](timeout) + val invocation = new MessageInvocation(this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index 0bf9723e31..7e8fd5b087 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -20,8 +20,8 @@ object Futures { * } * */ - def future(timeout: Long)(body: => Any): Future = { - val promise = new DefaultCompletableFuture(timeout) + def future[T](timeout: Long)(body: => T): Future[T] = { + val promise = new DefaultCompletableFuture[T](timeout) try { promise completeWithResult body } catch { @@ -30,10 +30,10 @@ object Futures { promise } - def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await) + def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - def awaitOne(futures: List[Future]): Future = { - var future: Option[Future] = None + def awaitOne(futures: List[Future[_]]): Future[_] = { + var future: Option[Future[_]] = None do { future = futures.find(_.isCompleted) } while (future.isEmpty) @@ -41,12 +41,12 @@ object Futures { } /* - def awaitEither(f1: Future, f2: Future): Option[Any] = { + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { import Actor.Sender.Self import Actor.{spawn, actor} - case class Result(res: Option[Any]) - val handOff = new SynchronousQueue[Option[Any]] + case class Result(res: Option[T]) + val handOff = new SynchronousQueue[Option[T]] spawn { try { println("f1 await") @@ -70,23 +70,23 @@ object Futures { */ } -sealed trait Future { +sealed trait Future[T] { def await def awaitBlocking def isCompleted: Boolean def isExpired: Boolean def timeoutInNanos: Long - def result: Option[Any] + def result: Option[T] def exception: Option[Tuple2[AnyRef, Throwable]] } -trait CompletableFuture extends Future { - def completeWithResult(result: Any) +trait CompletableFuture[T] extends Future[T] { + def completeWithResult(result: T) def completeWithException(toBlame: AnyRef, exception: Throwable) } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. -class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { +class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private val TIME_UNIT = TimeUnit.MILLISECONDS def this() = this(0) @@ -95,7 +95,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { private val _lock = new ReentrantLock private val _signal = _lock.newCondition private var _completed: Boolean = _ - private var _result: Option[Any] = None + private var _result: Option[T] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None def await = try { @@ -138,7 +138,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { _lock.unlock } - def result: Option[Any] = try { + def result: Option[T] = try { _lock.lock _result } finally { @@ -152,7 +152,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { _lock.unlock } - def completeWithResult(result: Any) = try { + def completeWithResult(result: T) = try { _lock.lock if (!_completed) { _completed = true diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index 3f300b1c52..3eecbef0f3 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier final class MessageInvocation(val receiver: Actor, val message: Any, - val replyTo : Option[Either[Actor,CompletableFuture]], + val replyTo : Option[Either[Actor,CompletableFuture[Any]]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 81d5591fbb..4676acc904 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -85,13 +85,13 @@ object RemoteClient extends Logging { requestBuilder.setSourcePort(port) } RemoteProtocolBuilder.setMessage(message, requestBuilder) - remoteClient.send(requestBuilder.build, None) + remoteClient.send[Any](requestBuilder.build, None) } - override def postMessageToMailboxAndCreateFutureResultWithTimeout( + override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, - senderFuture: Option[CompletableFuture]): CompletableFuture = { + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(className) @@ -173,7 +173,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false - private val futures = new ConcurrentHashMap[Long, CompletableFuture] + private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[String, Actor] private[remote] val listeners = new ConcurrentSkipListSet[Actor] @@ -217,14 +217,14 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } - def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) { + def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) None } else { futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture(request.getTimeout) + else new DefaultCompletableFuture[T](request.getTimeout) futures.put(request.getId, futureResult) connection.getChannel.write(request) Some(futureResult) @@ -253,7 +253,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { * @author Jonas Bonér */ class RemoteClientPipelineFactory(name: String, - futures: ConcurrentMap[Long, CompletableFuture], + futures: ConcurrentMap[Long, CompletableFuture[_]], supervisors: ConcurrentMap[String, Actor], bootstrap: ClientBootstrap, remoteAddress: SocketAddress, @@ -284,7 +284,7 @@ class RemoteClientPipelineFactory(name: String, */ @ChannelHandler.Sharable class RemoteClientHandler(val name: String, - val futures: ConcurrentMap[Long, CompletableFuture], + val futures: ConcurrentMap[Long, CompletableFuture[_]], val supervisors: ConcurrentMap[String, Actor], val bootstrap: ClientBootstrap, val remoteAddress: SocketAddress, @@ -306,7 +306,7 @@ class RemoteClientHandler(val name: String, if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] log.debug("Remote client received RemoteReply[\n%s]", reply.toString) - val future = futures.get(reply.getId) + val future : CompletableFuture[Any] = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) future.completeWithResult(message) diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index 7b2084aec6..332ae5c14e 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -80,7 +80,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT start - private var readerFuture: Option[CompletableFuture] = None + private var readerFuture: Option[CompletableFuture[T]] = None def receive = { case Get => val ref = dataFlow.value.get @@ -88,11 +88,11 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture reply(ref.get) else { readerFuture = replyTo match { - case Some(Right(future)) => Some(future) + case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]]) case _ => None } } - case Set(v) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) + case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) case Exit => exit } }