Initial parametrization

This commit is contained in:
Viktor Klang 2010-04-23 20:46:58 +02:00
parent f58503a1c3
commit cccfd51ed1
7 changed files with 44 additions and 44 deletions

View file

@ -162,7 +162,7 @@ trait Producer { self: Actor =>
*/ */
class ProducerResponseSender( class ProducerResponseSender(
headers: Map[String, Any], headers: Map[String, Any],
replyTo : Option[Either[Actor,CompletableFuture]], replyTo : Option[Either[Actor,CompletableFuture[Any]]],
producer: Actor) extends Synchronization with Logging { producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender implicit val producerActor = Some(producer) // the response sender

View file

@ -462,11 +462,11 @@ private[akka] sealed class ActiveObjectAspect {
} }
} }
private def getResultOrThrowException[T](future: Future): Option[T] = private def getResultOrThrowException[T](future: Future[T]): Option[T] =
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (_, cause) = future.exception.get val (_, cause) = future.exception.get
throw cause throw cause
} else future.result.asInstanceOf[Option[T]] } else future.result
private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway) private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)

View file

@ -257,7 +257,7 @@ trait Actor extends TransactionManagement with Logging {
* Is Some(Left(Actor)) if sender is an actor * Is Some(Left(Actor)) if sender is an actor
* Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/ */
protected var replyTo: Option[Either[Actor,CompletableFuture]] = None protected var replyTo: Option[Either[Actor,CompletableFuture[Any]]] = None
// ==================================== // ====================================
// ==== USER CALLBACKS TO OVERRIDE ==== // ==== USER CALLBACKS TO OVERRIDE ====
@ -502,9 +502,9 @@ trait Actor extends TransactionManagement with Logging {
def !![T](message: Any, timeout: Long): Option[T] = { def !![T](message: Any, timeout: Long): Option[T] = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) { if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation] val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
try { try {
future.await future.await
} catch { } catch {
@ -514,7 +514,7 @@ trait Actor extends TransactionManagement with Logging {
} }
if (future.exception.isDefined) throw future.exception.get._2 if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]] else future.result
} }
else throw new IllegalStateException( else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it") "Actor has not been started, you need to invoke 'actor.start' before using it")
@ -539,10 +539,10 @@ trait Actor extends TransactionManagement with Logging {
/** /**
* FIXME document !!! * FIXME document !!!
*/ */
def !!!(message: Any): Future = { def !!![T](message: Any): Future[T] = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) { if (_isRunning) {
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
} else throw new IllegalStateException( } else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it") "Actor has not been started, you need to invoke 'actor.start' before using it")
} }
@ -569,7 +569,7 @@ trait Actor extends TransactionManagement with Logging {
*/ */
protected[this] def reply(message: Any) = replyTo match { protected[this] def reply(message: Any) = replyTo match {
case Some(Left(actor)) => actor ! message case Some(Left(actor)) => actor ! message
case Some(Right(future)) => future.completeWithResult(message) case Some(Right(future : Future[Any])) => future.completeWithResult(message)
case _ => throw new IllegalStateException( case _ => throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " + "\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " + "\n\tYou have probably used the '!' method to either; " +
@ -813,7 +813,7 @@ trait Actor extends TransactionManagement with Logging {
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get) RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get)
} }
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
} else { } else {
val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get) val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get)
if (messageDispatcher.usesActorMailbox) { if (messageDispatcher.usesActorMailbox) {
@ -824,10 +824,10 @@ trait Actor extends TransactionManagement with Logging {
} }
} }
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message) joinTransaction(message)
if (_remoteAddress.isDefined) { if (_remoteAddress.isDefined) {
@ -847,8 +847,8 @@ trait Actor extends TransactionManagement with Logging {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else { } else {
val future = if (senderFuture.isDefined) senderFuture.get val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(timeout) else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get) val invocation = new MessageInvocation(this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
if (messageDispatcher.usesActorMailbox) if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation) _mailbox.add(invocation)

View file

@ -20,8 +20,8 @@ object Futures {
* } * }
* </pre> * </pre>
*/ */
def future(timeout: Long)(body: => Any): Future = { def future[T](timeout: Long)(body: => T): Future[T] = {
val promise = new DefaultCompletableFuture(timeout) val promise = new DefaultCompletableFuture[T](timeout)
try { try {
promise completeWithResult body promise completeWithResult body
} catch { } catch {
@ -30,10 +30,10 @@ object Futures {
promise promise
} }
def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await) def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future]): Future = { def awaitOne(futures: List[Future[_]]): Future[_] = {
var future: Option[Future] = None var future: Option[Future[_]] = None
do { do {
future = futures.find(_.isCompleted) future = futures.find(_.isCompleted)
} while (future.isEmpty) } while (future.isEmpty)
@ -41,12 +41,12 @@ object Futures {
} }
/* /*
def awaitEither(f1: Future, f2: Future): Option[Any] = { def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
import Actor.Sender.Self import Actor.Sender.Self
import Actor.{spawn, actor} import Actor.{spawn, actor}
case class Result(res: Option[Any]) case class Result(res: Option[T])
val handOff = new SynchronousQueue[Option[Any]] val handOff = new SynchronousQueue[Option[T]]
spawn { spawn {
try { try {
println("f1 await") println("f1 await")
@ -70,23 +70,23 @@ object Futures {
*/ */
} }
sealed trait Future { sealed trait Future[T] {
def await def await
def awaitBlocking def awaitBlocking
def isCompleted: Boolean def isCompleted: Boolean
def isExpired: Boolean def isExpired: Boolean
def timeoutInNanos: Long def timeoutInNanos: Long
def result: Option[Any] def result: Option[T]
def exception: Option[Tuple2[AnyRef, Throwable]] def exception: Option[Tuple2[AnyRef, Throwable]]
} }
trait CompletableFuture extends Future { trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: Any) def completeWithResult(result: T)
def completeWithException(toBlame: AnyRef, exception: Throwable) def completeWithException(toBlame: AnyRef, exception: Throwable)
} }
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture(timeout: Long) extends CompletableFuture { class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private val TIME_UNIT = TimeUnit.MILLISECONDS private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0) def this() = this(0)
@ -95,7 +95,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val _lock = new ReentrantLock private val _lock = new ReentrantLock
private val _signal = _lock.newCondition private val _signal = _lock.newCondition
private var _completed: Boolean = _ private var _completed: Boolean = _
private var _result: Option[Any] = None private var _result: Option[T] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
def await = try { def await = try {
@ -138,7 +138,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock _lock.unlock
} }
def result: Option[Any] = try { def result: Option[T] = try {
_lock.lock _lock.lock
_result _result
} finally { } finally {
@ -152,7 +152,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock _lock.unlock
} }
def completeWithResult(result: Any) = try { def completeWithResult(result: T) = try {
_lock.lock _lock.lock
if (!_completed) { if (!_completed) {
_completed = true _completed = true

View file

@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor, final class MessageInvocation(val receiver: Actor,
val message: Any, val message: Any,
val replyTo : Option[Either[Actor,CompletableFuture]], val replyTo : Option[Either[Actor,CompletableFuture[Any]]],
val transactionSet: Option[CountDownCommitBarrier]) { val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null") if (receiver eq null) throw new IllegalArgumentException("receiver is null")

View file

@ -85,13 +85,13 @@ object RemoteClient extends Logging {
requestBuilder.setSourcePort(port) requestBuilder.setSourcePort(port)
} }
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send(requestBuilder.build, None) remoteClient.send[Any](requestBuilder.build, None)
} }
override def postMessageToMailboxAndCreateFutureResultWithTimeout( override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
.setTarget(className) .setTarget(className)
@ -173,7 +173,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false @volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFuture] private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, Actor] private val supervisors = new ConcurrentHashMap[String, Actor]
private[remote] val listeners = new ConcurrentSkipListSet[Actor] private[remote] val listeners = new ConcurrentSkipListSet[Actor]
@ -217,14 +217,14 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
} }
} }
def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) { def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) { if (request.getIsOneWay) {
connection.getChannel.write(request) connection.getChannel.write(request)
None None
} else { } else {
futures.synchronized { futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(request.getTimeout) else new DefaultCompletableFuture[T](request.getTimeout)
futures.put(request.getId, futureResult) futures.put(request.getId, futureResult)
connection.getChannel.write(request) connection.getChannel.write(request)
Some(futureResult) Some(futureResult)
@ -253,7 +253,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteClientPipelineFactory(name: String, class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFuture], futures: ConcurrentMap[Long, CompletableFuture[_]],
supervisors: ConcurrentMap[String, Actor], supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap, bootstrap: ClientBootstrap,
remoteAddress: SocketAddress, remoteAddress: SocketAddress,
@ -284,7 +284,7 @@ class RemoteClientPipelineFactory(name: String,
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
class RemoteClientHandler(val name: String, class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFuture], val futures: ConcurrentMap[Long, CompletableFuture[_]],
val supervisors: ConcurrentMap[String, Actor], val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap, val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress, val remoteAddress: SocketAddress,
@ -306,7 +306,7 @@ class RemoteClientHandler(val name: String,
if (result.isInstanceOf[RemoteReply]) { if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply] val reply = result.asInstanceOf[RemoteReply]
log.debug("Remote client received RemoteReply[\n%s]", reply.toString) log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId) val future : CompletableFuture[Any] = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) { if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply) val message = RemoteProtocolBuilder.getMessage(reply)
future.completeWithResult(message) future.completeWithResult(message)

View file

@ -80,7 +80,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT timeout = TIME_OUT
start start
private var readerFuture: Option[CompletableFuture] = None private var readerFuture: Option[CompletableFuture[T]] = None
def receive = { def receive = {
case Get => case Get =>
val ref = dataFlow.value.get val ref = dataFlow.value.get
@ -88,11 +88,11 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
reply(ref.get) reply(ref.get)
else { else {
readerFuture = replyTo match { readerFuture = replyTo match {
case Some(Right(future)) => Some(future) case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
case _ => None case _ => None
} }
} }
case Set(v) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => exit case Exit => exit
} }
} }