diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 46297d32d0..dc74028964 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -413,8 +413,14 @@ trait Actor extends Logging { /** * Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ - def become(behavior: Receive): Unit = self.hotswap = self.hotswap.push(behavior) + def become(behavior: Receive, discardOld: Boolean = false) { + if (discardOld) + unbecome + + self.hotswap = self.hotswap.push(behavior) + } /** * Reverts the Actor behavior to the previous one in the hotswap stack. diff --git a/akka-actor/src/main/scala/actor/UntypedActor.scala b/akka-actor/src/main/scala/actor/UntypedActor.scala index e36a7837b6..b31077c8f3 100644 --- a/akka-actor/src/main/scala/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/actor/UntypedActor.scala @@ -72,7 +72,13 @@ abstract class UntypedActor extends Actor { /** * Java API for become */ - def become(behavior: Procedure[Any]): Unit = super.become { case msg => behavior.apply(msg) } + def become(behavior: Procedure[Any]):Unit = become(behavior,false) + + /* + * Java API for become with optional discardOld + */ + def become(behavior: Procedure[Any], discardOld: Boolean): Unit = + super.become({ case msg => behavior.apply(msg) }, discardOld) @throws(classOf[Exception]) def onReceive(message: Any): Unit diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index cb7f8fab94..4d33bf03ce 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit */ object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). + map(time => Duration(time, TIME_UNIT)). + getOrElse(Duration(1000,TimeUnit.MILLISECONDS)) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 68a3ce4399..2ac412d36d 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package akka.dispatch @@ -10,6 +10,7 @@ import akka.routing.Dispatcher import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit +import akka.japi.Procedure class FutureTimeoutException(message: String) extends AkkaException(message) @@ -34,19 +35,24 @@ object Futures { f } + /** + * (Blocking!) + */ def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) /** - * Returns the First Future that is completed - * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans + * Returns the First Future that is completed (blocking!) */ - def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = { - var future: Option[Future[_]] = None - do { - future = futures.find(_.isCompleted) - if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs) - } while (future.isEmpty) - future.get + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures).await + + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { + val futureResult = new DefaultCompletableFuture[Any](timeout) + val fun = (f: Future[_]) => futureResult completeWith f.asInstanceOf[Future[Any]] + for(f <- futures) f onComplete fun + futureResult } /** @@ -55,34 +61,10 @@ object Futures { def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = in map { f => fun(f.await) } - /* - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { - import Actor.Sender.Self - import Actor.{spawn, actor} - - case class Result(res: Option[T]) - val handOff = new SynchronousQueue[Option[T]] - spawn { - try { - println("f1 await") - f1.await - println("f1 offer") - handOff.offer(f1.result) - } catch {case _ => {}} - } - spawn { - try { - println("f2 await") - f2.await - println("f2 offer") - println("f2 offer: " + f2.result) - handOff.offer(f2.result) - } catch {case _ => {}} - } - Thread.sleep(100) - handOff.take - } -*/ + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException } sealed trait Future[T] { @@ -100,6 +82,24 @@ sealed trait Future[T] { def exception: Option[Throwable] + def onComplete(func: Future[T] => Unit): Future[T] + + /** + * Returns the current result, throws the exception is one has been raised, else returns None + */ + def resultOrException: Option[T] = { + val r = result + if (r.isDefined) result + else { + val problem = exception + if (problem.isDefined) throw problem.get + else None + } + } + + /* Java API */ + def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f)) + def map[O](f: (T) => O): Future[O] = { val wrapped = this new Future[O] { @@ -110,14 +110,21 @@ sealed trait Future[T] { def timeoutInNanos = wrapped.timeoutInNanos def result: Option[O] = { wrapped.result map f } def exception: Option[Throwable] = wrapped.exception + def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this } } } } trait CompletableFuture[T] extends Future[T] { def completeWithResult(result: T) - def completeWithException(exception: Throwable) + def completeWith(other: Future[T]) { + val result = other.result + val exception = other.exception + if (result.isDefined) completeWithResult(result.get) + else if (exception.isDefined) completeWithException(exception.get) + //else TODO how to handle this case? + } } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. @@ -133,6 +140,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private var _completed: Boolean = _ private var _result: Option[T] = None private var _exception: Option[Throwable] = None + private var _listeners: List[Future[T] => Unit] = Nil def await = try { _lock.lock @@ -190,33 +198,67 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def completeWithResult(result: T) = try { - _lock.lock - if (!_completed) { - _completed = true - _result = Some(result) - onComplete(result) + def completeWithResult(result: T) { + val notify = try { + _lock.lock + if (!_completed) { + _completed = true + _result = Some(result) + true + } else false + } finally { + _signal.signalAll + _lock.unlock } - } finally { - _signal.signalAll - _lock.unlock + + if (notify) + notifyListeners } - def completeWithException(exception: Throwable) = try { - _lock.lock - if (!_completed) { - _completed = true - _exception = Some(exception) - onCompleteException(exception) + def completeWithException(exception: Throwable) { + val notify = try { + _lock.lock + if (!_completed) { + _completed = true + _exception = Some(exception) + true + } else false + } finally { + _signal.signalAll + _lock.unlock } - } finally { - _signal.signalAll - _lock.unlock + + if (notify) + notifyListeners } - protected def onComplete(result: T) {} + def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { + val notifyNow = try { + _lock.lock + if (!_completed) { + _listeners ::= func + false + } + else + true + } finally { + _lock.unlock + } - protected def onCompleteException(exception: Throwable) {} + if (notifyNow) + notifyListener(func) + + this + } + + private def notifyListeners() { + for(l <- _listeners) + notifyListener(l) + } + + private def notifyListener(func: Future[T] => Unit) { + func(this) + } private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index f7eb0ca170..33a7a62af3 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } - private[akka] def timeoutMs: Long = 1000 + /** + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms + * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second + */ + private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala index 6cd57c15fe..df088ce89c 100644 --- a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala @@ -192,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite { a.start assertDispatcher(dispatcher)(starts = 1, stops = 0) a.stop - await(dispatcher.stops.get == 1)(withinMs = 10000) + await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 1, stops = 1) assertRef(a,dispatcher)( suspensions = 0, diff --git a/akka-actor/src/test/scala/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/dispatch/FutureSpec.scala index d8b03bb0da..2b456a7cd3 100644 --- a/akka-actor/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/dispatch/FutureSpec.scala @@ -5,6 +5,7 @@ import org.junit.Test import akka.dispatch.Futures import Actor._ import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.CountDownLatch object FutureSpec { class TestActor extends Actor { @@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - /* // FIXME: implement Futures.awaitEither, and uncomment these two tests @Test def shouldFutureAwaitEitherLeft = { val actor1 = actorOf[TestActor].start @@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite { actor1.stop actor2.stop } - */ + @Test def shouldFutureAwaitOneLeft = { val actor1 = actorOf[TestActor].start val actor2 = actorOf[TestActor].start diff --git a/akka-amqp/src/main/scala/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/akka/amqp/AMQP.scala index 392ccc8d33..ec029bc1cd 100644 --- a/akka-amqp/src/main/scala/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/akka/amqp/AMQP.scala @@ -13,6 +13,7 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.{String, IllegalArgumentException} import reflect.Manifest import akka.japi.Procedure +import akka.dispatch.Dispatchers /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. @@ -23,6 +24,8 @@ import akka.japi.Procedure */ object AMQP { + lazy val consumerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("amqp-consumers").build + lazy val producerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("amqp-producers").build /** * Parameters used to make the connection to the amqp broker. Uses the rabbitmq defaults. */ @@ -57,7 +60,8 @@ object AMQP { */ case class ChannelParameters( shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) { + channelCallback: Option[ActorRef] = None, + prefetchSize: Int = 0) { // Needed for Java API usage def this() = this (None, None) @@ -148,7 +152,7 @@ object AMQP { routingKey: String, deliveryHandler: ActorRef, queueName: Option[String] = None, - exchangeParameters: Option[ExchangeParameters], + exchangeParameters: Option[ExchangeParameters] = None, queueDeclaration: Declaration = ActiveDeclaration(), selfAcknowledging: Boolean = true, channelParameters: Option[ChannelParameters] = None) { @@ -232,6 +236,7 @@ object AMQP { def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = { val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters)) + producer.dispatcher = producerDispatcher connection.startLink(producer) producer ! Start producer @@ -239,7 +244,9 @@ object AMQP { def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) + consumer.dispatcher = consumerDispatcher val handler = consumerParameters.deliveryHandler + if (handler.isUnstarted) handler.dispatcher = consumerDispatcher if (handler.supervisor.isEmpty) consumer.startLink(handler) connection.startLink(consumer) consumer ! Start @@ -342,7 +349,7 @@ object AMQP { } val deliveryHandler = actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) } + def receive = { case Delivery(payload, _, _, _, _, _) => handler.apply(new String(payload)) } } ).start val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) @@ -432,7 +439,7 @@ object AMQP { } val deliveryHandler = actorOf(new Actor { - def receive = { case Delivery(payload, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) } + def receive = { case Delivery(payload, _, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) } }).start val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) diff --git a/akka-amqp/src/main/scala/akka/amqp/AMQPMessage.scala b/akka-amqp/src/main/scala/akka/amqp/AMQPMessage.scala index 88bcea866d..af9fb2cbb6 100644 --- a/akka-amqp/src/main/scala/akka/amqp/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/akka/amqp/AMQPMessage.scala @@ -38,6 +38,7 @@ case class Delivery( payload: Array[Byte], routingKey: String, deliveryTag: Long, + isRedeliver: Boolean, properties: BasicProperties, sender: Option[ActorRef]) extends AMQPMessage diff --git a/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala b/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala index c14b1a6981..b339cf4727 100644 --- a/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala @@ -10,7 +10,7 @@ import akka.util.Logging import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer} -import akka.amqp.AMQP.{NoActionDeclaration, ActiveDeclaration, PassiveDeclaration, ConsumerParameters} +import akka.amqp.AMQP._ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) extends FaultTolerantChannelActor( @@ -30,38 +30,38 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) protected def setupChannel(ch: Channel) = { - val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = { - queueName match { - case Some(name) => - queueDeclaration match { - case PassiveDeclaration => - log.debug("Passively declaring new queue [%s] for %s", name, toString) - ch.queueDeclarePassive(name) - case ActiveDeclaration(durable, autoDelete, exclusive) => - log.debug("Actively declaring new queue [%s] for %s", name, toString) - val configurationArguments = exchangeParameters match { - case Some(params) => params.configurationArguments - case _ => Map.empty - } - ch.queueDeclare(name, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments.toMap)) - case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(name, 0, 0) // do nothing here + channelParameters.foreach(params => ch.basicQos(params.prefetchSize)) + + val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName)) + val consumingQueue = exchangeName match { + case Some(exchange) => + val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = { + queueName match { + case Some(name) => + declareQueue(ch, name, queueDeclaration) + case None => + log.debug("Declaring new generated queue for %s", toString) + ch.queueDeclare } - case None => - log.debug("Declaring new generated queue for %s", toString) - ch.queueDeclare - } + } + log.debug("Binding new queue [%s] with [%s] for %s", queueDeclare.getQueue, routingKey, toString) + ch.queueBind(queueDeclare.getQueue, exchange, routingKey) + queueDeclare.getQueue + case None => + // no exchange, use routing key as queuename + log.debug("No exchange specified, creating queue using routingkey as name (%s)", routingKey) + declareQueue(ch, routingKey, queueDeclaration) + routingKey } - val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName)) - log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString) - ch.queueBind(queueDeclare.getQueue, exchangeName.getOrElse(""), routingKey) - val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging { + val tag = ch.basicConsume(consumingQueue, false, new DefaultConsumer(ch) with Logging { override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) { try { val deliveryTag = envelope.getDeliveryTag log.debug("Passing a message on to %s", toString) - deliveryHandler ! Delivery(payload, envelope.getRoutingKey, envelope.getDeliveryTag, properties, someSelf) + import envelope._ + deliveryHandler ! Delivery(payload, getRoutingKey, getDeliveryTag, isRedeliver, properties, someSelf) if (selfAcknowledging) { log.debug("Self acking...") @@ -78,6 +78,22 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) log.info("Intitialized %s", toString) } + private def declareQueue(ch: Channel, queueName: String, queueDeclaration: Declaration): com.rabbitmq.client.AMQP.Queue.DeclareOk = { + queueDeclaration match { + case PassiveDeclaration => + log.debug("Passively declaring new queue [%s] for %s", queueName, toString) + ch.queueDeclarePassive(queueName) + case ActiveDeclaration(durable, autoDelete, exclusive) => + log.debug("Actively declaring new queue [%s] for %s", queueName, toString) + val configurationArguments = exchangeParameters match { + case Some(params) => params.configurationArguments + case _ => Map.empty + } + ch.queueDeclare(queueName, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments.toMap)) + case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(queueName, 0, 0) // do nothing here + } + } + private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = { log.debug("Acking message with delivery tag [%s]", deliveryTag) channel.foreach { diff --git a/akka-amqp/src/main/scala/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/akka/amqp/ExampleSession.scala index 8ca3961177..0e14f1b3bb 100644 --- a/akka-amqp/src/main/scala/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/akka/amqp/ExampleSession.scala @@ -70,7 +70,7 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf(new Actor { def receive = { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) @@ -85,11 +85,11 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) }}), None, Some(exchangeParameters))) val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = { - case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + case Delivery(payload, _, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) @@ -104,11 +104,11 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) }}), None, Some(exchangeParameters))) val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = { - case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + case Delivery(payload, _, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) @@ -200,8 +200,8 @@ object ExampleSession { def requestHandler(request: String) = 3 - val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer, - requestHandler _, queueName = Some("rpc.in.key.queue")) + val rpcServer = RPC.newRpcServer(connection, exchangeName, rpcServerSerializer, requestHandler _, + routingKey = Some("rpc.in.key"), queueName = Some("rpc.in.key.queue")) /** Client */ @@ -213,9 +213,9 @@ object ExampleSession { } val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeName, "rpc.in.key", rpcClientSerializer) + val rpcClient = RPC.newRpcClient(connection, exchangeName, rpcClientSerializer, Some("rpc.in.key")) - val response = (rpcClient !! "rpc_request") + val response = rpcClient.call("rpc_request") log.info("Response: " + response) } diff --git a/akka-amqp/src/main/scala/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/akka/amqp/rpc/RPC.scala index 4213c77f01..8ce746735a 100644 --- a/akka-amqp/src/main/scala/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/akka/amqp/rpc/RPC.scala @@ -14,8 +14,8 @@ object RPC { def newRpcClient[O, I](connection: ActorRef, exchangeName: String, routingKey: String, - serializer: RpcClientSerializer[O, I]): ActorRef = { - newRpcClient(connection, exchangeName, routingKey, serializer, None) + serializer: RpcClientSerializer[O, I]): RpcClient[O,I] = { + newRpcClient(connection, exchangeName, serializer, Some(routingKey), None) } // Needed for Java API usage @@ -23,81 +23,93 @@ object RPC { exchangeName: String, routingKey: String, serializer: RpcClientSerializer[O, I], - channelParameters: ChannelParameters): ActorRef = { - newRpcClient(connection, exchangeName, routingKey, serializer, Some(channelParameters)) + channelParameters: ChannelParameters): RpcClient[O,I] = { + newRpcClient(connection, exchangeName, serializer, Some(routingKey), Some(channelParameters)) } def newRpcClient[O, I](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcClientSerializer[O, I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { + routingKey: Option[String] = None, + channelParameters: Option[ChannelParameters] = None): RpcClient[O,I] = { + + val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( - ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), routingKey, serializer, channelParameters)) + ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), rKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor + new RpcClient(rpcActor) } // Needed for Java API usage def newRpcServer[I, O](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcServerSerializer[I, O], - requestHandler: japi.Function[I,O]): RpcServerHandle = { - newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _) + requestHandler: japi.Function[I,O], + routingKey: String): RpcServerHandle = { + newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey)) } // Needed for Java API usage def newRpcServer[I, O](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcServerSerializer[I, O], requestHandler: Function[I,O], + routingKey: String, queueName: String): RpcServerHandle = { - newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName)) + newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), Some(queueName)) } // Needed for Java API usage def newRpcServer[I, O](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcServerSerializer[I, O], requestHandler: japi.Function[I,O], + routingKey: String, channelParameters: ChannelParameters): RpcServerHandle = { - newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters)) + newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), None, Some(channelParameters)) } // Needed for Java API usage def newRpcServer[I, O](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcServerSerializer[I, O], requestHandler: japi.Function[I,O], + routingKey: String, queueName: String, channelParameters: ChannelParameters): RpcServerHandle = { - newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters)) + newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), Some(queueName), Some(channelParameters)) } def newRpcServer[I, O](connection: ActorRef, exchangeName: String, - routingKey: String, serializer: RpcServerSerializer[I, O], requestHandler: I => O, + routingKey: Option[String] = None, queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None): RpcServerHandle = { + channelParameters: Option[ChannelParameters] = None, + poolSize: Int = 1): RpcServerHandle = { + + val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) + val qName = queueName.getOrElse("%s.in".format(rKey)) val producer = newProducer(connection, ProducerParameters(channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, ConsumerParameters(routingKey, rpcServer, - exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters, - selfAcknowledging = false, queueName = queueName)) - RpcServerHandle(producer, consumer) + + val consumers = (1 to poolSize).map { + num => + val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) + newConsumer(connection, ConsumerParameters(rKey, rpcServer, + exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters, + selfAcknowledging = false, queueName = Some(qName))) + } + RpcServerHandle(producer, consumers) } - case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) { + case class RpcServerHandle(producer: ActorRef, consumers: Seq[ActorRef]) { def stop = { - consumer.stop + consumers.foreach(_.stop) producer.stop } } @@ -195,7 +207,7 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer) + newRpcServer(connection, exchangeName, serializer, requestHandler, routingKey, queueName) } // Needed for Java API usage @@ -234,7 +246,7 @@ object RPC { } }) - startClient(connection, exchangeName, routingKey, serializer) + newRpcClient(connection, exchangeName, serializer, routingKey) } // Needed for Java API usage @@ -276,7 +288,7 @@ object RPC { def toBinary(t: String) = t.getBytes }) - startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer) + newRpcServer(connection, exchangeName, serializer, requestHandler, routingKey, queueName) } // Needed for Java API usage @@ -306,31 +318,7 @@ object RPC { } }) - startClient(connection, exchange, routingKey, serializer) - } - - private def startClient[O, I](connection: ActorRef, - exchangeName: String, - routingKey: Option[String] = None, - serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { - - val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) - - val client = newRpcClient(connection, exchangeName, rKey, serializer) - new RpcClient(client) - } - - private def startServer[I, O](connection: ActorRef, - exchangeName: String, - requestHandler: I => O, - routingKey: Option[String] = None, - queueName: Option[String] = None, - serializer: RpcServerSerializer[I, O]): RpcServerHandle = { - - val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) - val qName = queueName.getOrElse("%s.in".format(rKey)) - - newRpcServer(connection, exchangeName, rKey, serializer, requestHandler, Some(qName)) + newRpcClient(connection, exchange, serializer, routingKey) } } diff --git a/akka-amqp/src/main/scala/akka/amqp/rpc/RpcServerActor.scala b/akka-amqp/src/main/scala/akka/amqp/rpc/RpcServerActor.scala index 40443c63ad..2459cde290 100644 --- a/akka-amqp/src/main/scala/akka/amqp/rpc/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/akka/amqp/rpc/RpcServerActor.scala @@ -16,7 +16,7 @@ class RpcServerActor[I,O]( log.info("%s started", this) protected def receive = { - case Delivery(payload, _, tag, props, sender) => { + case Delivery(payload, _, tag, _, props, sender) => { log.debug("%s handling delivery with tag %d", this, tag) val request = serializer.fromBinary.fromBinary(payload) diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala index 72c7590f86..eace56114f 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala @@ -45,8 +45,8 @@ class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMat val consumerExchangeParameters = ExchangeParameters("text_exchange") val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } - }).start, + def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open } + }), exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala index 1752afe400..52769db007 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala @@ -65,9 +65,9 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must val consumerExchangeParameters = ExchangeParameters("text_exchange") val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } - }).start, - exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) + def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open } + }), exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) + consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) val listenerLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala index 25435d24e5..3f3bf0539b 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala @@ -36,7 +36,7 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM var deliveryTagCheck: Long = -1 val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actorOf( new Actor { def receive = { - case Delivery(payload, _, deliveryTag, _, sender) => { + case Delivery(payload, _, deliveryTag, _, _, sender) => { if (!failLatch.isOpen) { failLatch.open error("Make it fail!") @@ -47,7 +47,7 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM } case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open } - }).start, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters), + }), queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters), selfAcknowledging = false, channelParameters = Some(channelParameters), queueDeclaration = ActiveDeclaration(autoDelete = false))) diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala index 69e5aa8aa9..4ba4c27971 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala @@ -35,10 +35,10 @@ class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatche val rejectedLatch = new StandardLatch val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actorOf( new Actor { def receive = { - case Delivery(payload, _, deliveryTag, _, sender) => sender.foreach(_ ! Reject(deliveryTag)) + case Delivery(payload, _, deliveryTag, _, _, sender) => sender.foreach(_ ! Reject(deliveryTag)) case Rejected(deliveryTag) => rejectedLatch.open } - }).start, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters), + }), queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters), selfAcknowledging = false, channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala index 820edf8624..0a9613d21f 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala @@ -33,8 +33,8 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers { val payloadLatch = new StandardLatch val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf(new Actor { - def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } - }).start, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) + def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open } + }), exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters))) diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerPrivateQueueTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerPrivateQueueTestIntegration.scala new file mode 100644 index 0000000000..6b03b6ded8 --- /dev/null +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPConsumerPrivateQueueTestIntegration.scala @@ -0,0 +1,45 @@ +package akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +import akka.amqp._ +import org.multiverse.api.latches.StandardLatch +import akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.{CountDownLatch, TimeUnit} +import akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import akka.actor.Actor + +class AMQPConsumerPrivateQueueTestIntegration extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = AMQPTest.withCleanEndState { + val connection = AMQP.newConnection() + val countDown = new CountDownLatch(2) + val channelCallback = actorOf(new Actor { + def receive = { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + }).start + + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + + val payloadLatch = new StandardLatch + val consumer = AMQP.newConsumer(connection, ConsumerParameters("my.private.routing.key", actorOf(new Actor { + def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open } + }), channelParameters = Some(channelParameters))) + + val producer = AMQP.newProducer(connection, + ProducerParameters(channelParameters = Some(channelParameters))) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "my.private.routing.key") + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala b/akka-amqp/src/test/scala/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala index c8cea4bf7c..8ada12c423 100644 --- a/akka-amqp/src/test/scala/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala +++ b/akka-amqp/src/test/scala/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala @@ -44,8 +44,8 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers { def requestHandler(request: String) = 3 - val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, "rpc.routing", rpcServerSerializer, - requestHandler _, channelParameters = Some(channelParameters)) + val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, rpcServerSerializer, + requestHandler _, Some("rpc.routing"), channelParameters = Some(channelParameters)) val rpcClientSerializer = new RpcClientSerializer[String, Int]( new ToBinary[String] { @@ -54,11 +54,11 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers { def fromBinary(bytes: Array[Byte]) = bytes.head.toInt }) - val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeName, "rpc.routing", rpcClientSerializer, + val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeName, rpcClientSerializer, Some("rpc.routing"), channelParameters = Some(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be(true) - val response = rpcClient !! "some_payload" + val response = rpcClient.call("some_payload") response must be(Some(3)) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala index 66e08f2c98..0a58592045 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala @@ -17,12 +17,12 @@ import Actor._ * @author Debasish Ghosh */ -case class Add(email: String, value: String) +case class AddEmail(email: String, value: String) case class GetAll(email: String) class MySortedSet extends Transactor { def receive = { - case Add(userEmail, value) => { + case AddEmail(userEmail, value) => { val registryId = "userValues:%s".format(userEmail) val storageSet = RedisStorage.getSortedSet(registryId) storageSet.add(value.getBytes, System.nanoTime.toFloat) @@ -58,8 +58,9 @@ class RedisTicket513Spec extends val a = actorOf[MySortedSet] a.start it("should work with transactors") { - (a !! Add("test.user@gmail.com", "foo")).get should equal(1) - (a !! Add("test.user@gmail.com", "bar")).get should equal(2) + (a !! AddEmail("test.user@gmail.com", "foo")).get should equal(1) + Thread.sleep(10) + (a !! AddEmail("test.user@gmail.com", "bar")).get should equal(2) (a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2) } } diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index a1b01a078f..e80c7936c2 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -310,13 +310,11 @@ class RemoteClient private[akka] ( connection.getChannel.write(request) None } else { - futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) connection.getChannel.write(request) Some(futureResult) - } } } else { val exception = new RemoteClientException( diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c85f2913e0..bc12477970 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -16,7 +16,6 @@ import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.Config._ import akka.config.ConfigurationException -import akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ @@ -31,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map import scala.reflect.BeanProperty +import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture} /** * Use this object if you need a single remote server on a specific node. @@ -66,10 +66,10 @@ object RemoteNode extends RemoteServer * @author Jonas Bonér */ object RemoteServer { - val UUID_PREFIX = "uuid:" - - val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") - val REQUIRE_COOKIE = { + val UUID_PREFIX = "uuid:" + val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) + val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") + val REQUIRE_COOKIE = { val requireCookie = config.getBool("akka.remote.server.require-cookie", true) if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") @@ -400,7 +400,7 @@ class RemoteServerPipelineFactory( } val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder @@ -497,7 +497,7 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - log.debug("Received RemoteMessageProtocol[\n%s]", request.toString) + log.debug("Received RemoteMessageProtocol[\n%s]".format(request.toString)) request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) @@ -538,41 +538,46 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ - override def onComplete(result: AnyRef) { - log.debug("Returning result from actor invocation [%s]", result) - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - Some(actorRef), - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Left(result), - true, - Some(actorRef), - None, - AkkaActorType.ScalaActor, - None) + Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). + onComplete(f => { + val result = f.result + val exception = f.exception - // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + if (exception.isDefined) { + log.debug("Returning exception from actor invocation [%s]".format(exception.get)) + try { + channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + } + } + else if (result.isDefined) { + log.debug("Returning result from actor invocation [%s]".format(result.get)) + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(actorRef), + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + Left(result.get), + true, + Some(actorRef), + None, + AkkaActorType.ScalaActor, + None) - try { - channel.write(messageBuilder.build) - } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + + try { + channel.write(messageBuilder.build) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + } } } - - override def onCompleteException(exception: Throwable) { - try { - channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor)) - } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) - } - } - } - )) + ) + )) } } @@ -589,7 +594,10 @@ class RemoteServerHandler( val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) else { - val result = messageReceiver.invoke(typedActor, args: _*) + val result = messageReceiver.invoke(typedActor, args: _*) match { + case f: Future[_] => f.await.result.get + case other => other + } log.debug("Returning result from remote typed actor invocation [%s]", result) val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( diff --git a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala new file mode 100644 index 0000000000..8457f10f45 --- /dev/null +++ b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package akka.actor.ticket + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import akka.remote.{RemoteClient, RemoteServer} +import akka.actor._ + + +class Ticket519Spec extends Spec with ShouldMatchers { + + val HOSTNAME = "localhost" + val PORT = 6666 + + describe("A remote TypedActor") { + it("should handle remote future replies") { + import akka.remote._ + + val server = { val s = new RemoteServer; s.start(HOSTNAME,PORT); s} + val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,HOSTNAME,PORT) + val r = actor.someFutureString + + r.await.result.get should equal ("foo") + TypedActor.stop(actor) + server.shutdown + } + } +} diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 98d9bcdea3..3ad5468369 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -181,7 +181,6 @@ abstract class TypedActor extends Actor with Proxyable { if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed else self.reply(joinPoint.proceed) - case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) case unexpected => throw new IllegalActorStateException( @@ -851,6 +850,7 @@ private[akka] abstract class ActorAspect { ActorType.TypedActor) if (isOneWay) null // for void methods + else if (TypedActor.returnsFuture_?(methodRtti)) future.get else { if (future.isDefined) { future.get.await diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java index 743a189bf6..5b9de51d44 100644 --- a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java +++ b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java @@ -1,8 +1,9 @@ package akka.actor; -import java.util.concurrent.CountDownLatch; +import akka.dispatch.Future; public interface SamplePojo { public String greet(String s); public String fail(); + public Future someFutureString(); } diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java index 093904e5e1..b61fc2f366 100644 --- a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java +++ b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java @@ -25,6 +25,10 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo { throw new RuntimeException("expected"); } + public akka.dispatch.Future someFutureString() { + return future("foo"); + } + @Override public void preRestart(Throwable e) { _pre = true; diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 221c4213cc..17a29b0ff9 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -26,6 +26,7 @@ akka { serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline + dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable @@ -135,6 +136,7 @@ akka { service = on hostname = "localhost" # The hostname or IP that clients should connect to port = 2552 # The port clients should connect to + message-frame-size = 1048576 connection-timeout = 1 require-cookie = on untrusted-mode = off