closing ticket 518

This commit is contained in:
ticktock 2010-11-12 10:13:56 -05:00
commit b786d35d2a
28 changed files with 381 additions and 219 deletions

View file

@ -413,8 +413,14 @@ trait Actor extends Logging {
/**
* Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
def become(behavior: Receive): Unit = self.hotswap = self.hotswap.push(behavior)
def become(behavior: Receive, discardOld: Boolean = false) {
if (discardOld)
unbecome
self.hotswap = self.hotswap.push(behavior)
}
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.

View file

@ -72,7 +72,13 @@ abstract class UntypedActor extends Actor {
/**
* Java API for become
*/
def become(behavior: Procedure[Any]): Unit = super.become { case msg => behavior.apply(msg) }
def become(behavior: Procedure[Any]):Unit = become(behavior,false)
/*
* Java API for become with optional discardOld
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
super.become({ case msg => behavior.apply(msg) }, discardOld)
@throws(classOf[Exception])
def onReceive(message: Any): Unit

View file

@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
map(time => Duration(time, TIME_UNIT)).
getOrElse(Duration(1000,TimeUnit.MILLISECONDS))
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
@ -10,6 +10,7 @@ import akka.routing.Dispatcher
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import akka.japi.Procedure
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -34,19 +35,24 @@ object Futures {
f
}
/**
* (Blocking!)
*/
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed
* if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
* Returns the First Future that is completed (blocking!)
*/
def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures).await
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = {
val futureResult = new DefaultCompletableFuture[Any](timeout)
val fun = (f: Future[_]) => futureResult completeWith f.asInstanceOf[Future[Any]]
for(f <- futures) f onComplete fun
futureResult
}
/**
@ -55,34 +61,10 @@ object Futures {
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
in map { f => fun(f.await) }
/*
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
case class Result(res: Option[T])
val handOff = new SynchronousQueue[Option[T]]
spawn {
try {
println("f1 await")
f1.await
println("f1 offer")
handOff.offer(f1.result)
} catch {case _ => {}}
}
spawn {
try {
println("f2 await")
f2.await
println("f2 offer")
println("f2 offer: " + f2.result)
handOff.offer(f2.result)
} catch {case _ => {}}
}
Thread.sleep(100)
handOff.take
}
*/
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException
}
sealed trait Future[T] {
@ -100,6 +82,24 @@ sealed trait Future[T] {
def exception: Option[Throwable]
def onComplete(func: Future[T] => Unit): Future[T]
/**
* Returns the current result, throws the exception is one has been raised, else returns None
*/
def resultOrException: Option[T] = {
val r = result
if (r.isDefined) result
else {
val problem = exception
if (problem.isDefined) throw problem.get
else None
}
}
/* Java API */
def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f))
def map[O](f: (T) => O): Future[O] = {
val wrapped = this
new Future[O] {
@ -110,14 +110,21 @@ sealed trait Future[T] {
def timeoutInNanos = wrapped.timeoutInNanos
def result: Option[O] = { wrapped.result map f }
def exception: Option[Throwable] = wrapped.exception
def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this }
}
}
}
trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: T)
def completeWithException(exception: Throwable)
def completeWith(other: Future[T]) {
val result = other.result
val exception = other.exception
if (result.isDefined) completeWithResult(result.get)
else if (exception.isDefined) completeWithException(exception.get)
//else TODO how to handle this case?
}
}
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
@ -133,6 +140,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private var _completed: Boolean = _
private var _result: Option[T] = None
private var _exception: Option[Throwable] = None
private var _listeners: List[Future[T] => Unit] = Nil
def await = try {
_lock.lock
@ -190,33 +198,67 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
_lock.unlock
}
def completeWithResult(result: T) = try {
_lock.lock
if (!_completed) {
_completed = true
_result = Some(result)
onComplete(result)
def completeWithResult(result: T) {
val notify = try {
_lock.lock
if (!_completed) {
_completed = true
_result = Some(result)
true
} else false
} finally {
_signal.signalAll
_lock.unlock
}
} finally {
_signal.signalAll
_lock.unlock
if (notify)
notifyListeners
}
def completeWithException(exception: Throwable) = try {
_lock.lock
if (!_completed) {
_completed = true
_exception = Some(exception)
onCompleteException(exception)
def completeWithException(exception: Throwable) {
val notify = try {
_lock.lock
if (!_completed) {
_completed = true
_exception = Some(exception)
true
} else false
} finally {
_signal.signalAll
_lock.unlock
}
} finally {
_signal.signalAll
_lock.unlock
if (notify)
notifyListeners
}
protected def onComplete(result: T) {}
def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
val notifyNow = try {
_lock.lock
if (!_completed) {
_listeners ::= func
false
}
else
true
} finally {
_lock.unlock
}
protected def onCompleteException(exception: Throwable) {}
if (notifyNow)
notifyListener(func)
this
}
private def notifyListeners() {
for(l <- _listeners)
notifyListener(l)
}
private def notifyListener(func: Future[T] => Unit) {
func(this)
}
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
}

View file

@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging {
}
}
private[akka] def timeoutMs: Long = 1000
/**
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
*/
private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference

View file

@ -192,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start
assertDispatcher(dispatcher)(starts = 1, stops = 0)
a.stop
await(dispatcher.stops.get == 1)(withinMs = 10000)
await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 1, stops = 1)
assertRef(a,dispatcher)(
suspensions = 0,

View file

@ -5,6 +5,7 @@ import org.junit.Test
import akka.dispatch.Futures
import Actor._
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.CountDownLatch
object FutureSpec {
class TestActor extends Actor {
@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite {
actor.stop
}
/*
// FIXME: implement Futures.awaitEither, and uncomment these two tests
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite {
actor1.stop
actor2.stop
}
*/
@Test def shouldFutureAwaitOneLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start

View file

@ -13,6 +13,7 @@ import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.{String, IllegalArgumentException}
import reflect.Manifest
import akka.japi.Procedure
import akka.dispatch.Dispatchers
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
@ -23,6 +24,8 @@ import akka.japi.Procedure
*/
object AMQP {
lazy val consumerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("amqp-consumers").build
lazy val producerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("amqp-producers").build
/**
* Parameters used to make the connection to the amqp broker. Uses the rabbitmq defaults.
*/
@ -57,7 +60,8 @@ object AMQP {
*/
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None) {
channelCallback: Option[ActorRef] = None,
prefetchSize: Int = 0) {
// Needed for Java API usage
def this() = this (None, None)
@ -148,7 +152,7 @@ object AMQP {
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
exchangeParameters: Option[ExchangeParameters],
exchangeParameters: Option[ExchangeParameters] = None,
queueDeclaration: Declaration = ActiveDeclaration(),
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
@ -232,6 +236,7 @@ object AMQP {
def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = {
val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters))
producer.dispatcher = producerDispatcher
connection.startLink(producer)
producer ! Start
producer
@ -239,7 +244,9 @@ object AMQP {
def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = {
val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters))
consumer.dispatcher = consumerDispatcher
val handler = consumerParameters.deliveryHandler
if (handler.isUnstarted) handler.dispatcher = consumerDispatcher
if (handler.supervisor.isEmpty) consumer.startLink(handler)
connection.startLink(consumer)
consumer ! Start
@ -342,7 +349,7 @@ object AMQP {
}
val deliveryHandler = actorOf( new Actor {
def receive = { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) }
def receive = { case Delivery(payload, _, _, _, _, _) => handler.apply(new String(payload)) }
} ).start
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
@ -432,7 +439,7 @@ object AMQP {
}
val deliveryHandler = actorOf(new Actor {
def receive = { case Delivery(payload, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) }
def receive = { case Delivery(payload, _, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) }
}).start
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))

View file

@ -38,6 +38,7 @@ case class Delivery(
payload: Array[Byte],
routingKey: String,
deliveryTag: Long,
isRedeliver: Boolean,
properties: BasicProperties,
sender: Option[ActorRef]) extends AMQPMessage

View file

@ -10,7 +10,7 @@ import akka.util.Logging
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer}
import akka.amqp.AMQP.{NoActionDeclaration, ActiveDeclaration, PassiveDeclaration, ConsumerParameters}
import akka.amqp.AMQP._
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
extends FaultTolerantChannelActor(
@ -30,38 +30,38 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
protected def setupChannel(ch: Channel) = {
val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = {
queueName match {
case Some(name) =>
queueDeclaration match {
case PassiveDeclaration =>
log.debug("Passively declaring new queue [%s] for %s", name, toString)
ch.queueDeclarePassive(name)
case ActiveDeclaration(durable, autoDelete, exclusive) =>
log.debug("Actively declaring new queue [%s] for %s", name, toString)
val configurationArguments = exchangeParameters match {
case Some(params) => params.configurationArguments
case _ => Map.empty
}
ch.queueDeclare(name, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments.toMap))
case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(name, 0, 0) // do nothing here
channelParameters.foreach(params => ch.basicQos(params.prefetchSize))
val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName))
val consumingQueue = exchangeName match {
case Some(exchange) =>
val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = {
queueName match {
case Some(name) =>
declareQueue(ch, name, queueDeclaration)
case None =>
log.debug("Declaring new generated queue for %s", toString)
ch.queueDeclare
}
case None =>
log.debug("Declaring new generated queue for %s", toString)
ch.queueDeclare
}
}
log.debug("Binding new queue [%s] with [%s] for %s", queueDeclare.getQueue, routingKey, toString)
ch.queueBind(queueDeclare.getQueue, exchange, routingKey)
queueDeclare.getQueue
case None =>
// no exchange, use routing key as queuename
log.debug("No exchange specified, creating queue using routingkey as name (%s)", routingKey)
declareQueue(ch, routingKey, queueDeclaration)
routingKey
}
val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName))
log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString)
ch.queueBind(queueDeclare.getQueue, exchangeName.getOrElse(""), routingKey)
val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging {
val tag = ch.basicConsume(consumingQueue, false, new DefaultConsumer(ch) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) {
try {
val deliveryTag = envelope.getDeliveryTag
log.debug("Passing a message on to %s", toString)
deliveryHandler ! Delivery(payload, envelope.getRoutingKey, envelope.getDeliveryTag, properties, someSelf)
import envelope._
deliveryHandler ! Delivery(payload, getRoutingKey, getDeliveryTag, isRedeliver, properties, someSelf)
if (selfAcknowledging) {
log.debug("Self acking...")
@ -78,6 +78,22 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
log.info("Intitialized %s", toString)
}
private def declareQueue(ch: Channel, queueName: String, queueDeclaration: Declaration): com.rabbitmq.client.AMQP.Queue.DeclareOk = {
queueDeclaration match {
case PassiveDeclaration =>
log.debug("Passively declaring new queue [%s] for %s", queueName, toString)
ch.queueDeclarePassive(queueName)
case ActiveDeclaration(durable, autoDelete, exclusive) =>
log.debug("Actively declaring new queue [%s] for %s", queueName, toString)
val configurationArguments = exchangeParameters match {
case Some(params) => params.configurationArguments
case _ => Map.empty
}
ch.queueDeclare(queueName, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments.toMap))
case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(queueName, 0, 0) // do nothing here
}
}
private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = {
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.foreach {

View file

@ -70,7 +70,7 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}}), None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
@ -85,11 +85,11 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}}), None, Some(exchangeParameters)))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
case Delivery(payload, _, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}}), None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
@ -104,11 +104,11 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
case Delivery(payload, _, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}}), None, Some(exchangeParameters)))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
case Delivery(payload, _, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}}), None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
@ -200,8 +200,8 @@ object ExampleSession {
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer,
requestHandler _, queueName = Some("rpc.in.key.queue"))
val rpcServer = RPC.newRpcServer(connection, exchangeName, rpcServerSerializer, requestHandler _,
routingKey = Some("rpc.in.key"), queueName = Some("rpc.in.key.queue"))
/** Client */
@ -213,9 +213,9 @@ object ExampleSession {
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeName, "rpc.in.key", rpcClientSerializer)
val rpcClient = RPC.newRpcClient(connection, exchangeName, rpcClientSerializer, Some("rpc.in.key"))
val response = (rpcClient !! "rpc_request")
val response = rpcClient.call("rpc_request")
log.info("Response: " + response)
}

View file

@ -14,8 +14,8 @@ object RPC {
def newRpcClient[O, I](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I]): ActorRef = {
newRpcClient(connection, exchangeName, routingKey, serializer, None)
serializer: RpcClientSerializer[O, I]): RpcClient[O,I] = {
newRpcClient(connection, exchangeName, serializer, Some(routingKey), None)
}
// Needed for Java API usage
@ -23,81 +23,93 @@ object RPC {
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: ChannelParameters): ActorRef = {
newRpcClient(connection, exchangeName, routingKey, serializer, Some(channelParameters))
channelParameters: ChannelParameters): RpcClient[O,I] = {
newRpcClient(connection, exchangeName, serializer, Some(routingKey), Some(channelParameters))
}
def newRpcClient[O, I](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
routingKey: Option[String] = None,
channelParameters: Option[ChannelParameters] = None): RpcClient[O,I] = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](
ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), routingKey, serializer, channelParameters))
ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), rKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
new RpcClient(rpcActor)
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: japi.Function[I,O]): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _)
requestHandler: japi.Function[I,O],
routingKey: String): RpcServerHandle = {
newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey))
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: Function[I,O],
routingKey: String,
queueName: String): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName))
newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), Some(queueName))
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: japi.Function[I,O],
routingKey: String,
channelParameters: ChannelParameters): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters))
newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), None, Some(channelParameters))
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: japi.Function[I,O],
routingKey: String,
queueName: String,
channelParameters: ChannelParameters): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters))
newRpcServer(connection, exchangeName, serializer, requestHandler.apply _, Some(routingKey), Some(queueName), Some(channelParameters))
}
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None): RpcServerHandle = {
channelParameters: Option[ChannelParameters] = None,
poolSize: Int = 1): RpcServerHandle = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val qName = queueName.getOrElse("%s.in".format(rKey))
val producer = newProducer(connection, ProducerParameters(channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, ConsumerParameters(routingKey, rpcServer,
exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters,
selfAcknowledging = false, queueName = queueName))
RpcServerHandle(producer, consumer)
val consumers = (1 to poolSize).map {
num =>
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
newConsumer(connection, ConsumerParameters(rKey, rpcServer,
exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters,
selfAcknowledging = false, queueName = Some(qName)))
}
RpcServerHandle(producer, consumers)
}
case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) {
case class RpcServerHandle(producer: ActorRef, consumers: Seq[ActorRef]) {
def stop = {
consumer.stop
consumers.foreach(_.stop)
producer.stop
}
}
@ -195,7 +207,7 @@ object RPC {
def toBinary(t: O) = t.toByteArray
})
startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer)
newRpcServer(connection, exchangeName, serializer, requestHandler, routingKey, queueName)
}
// Needed for Java API usage
@ -234,7 +246,7 @@ object RPC {
}
})
startClient(connection, exchangeName, routingKey, serializer)
newRpcClient(connection, exchangeName, serializer, routingKey)
}
// Needed for Java API usage
@ -276,7 +288,7 @@ object RPC {
def toBinary(t: String) = t.getBytes
})
startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer)
newRpcServer(connection, exchangeName, serializer, requestHandler, routingKey, queueName)
}
// Needed for Java API usage
@ -306,31 +318,7 @@ object RPC {
}
})
startClient(connection, exchange, routingKey, serializer)
}
private def startClient[O, I](connection: ActorRef,
exchangeName: String,
routingKey: Option[String] = None,
serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val client = newRpcClient(connection, exchangeName, rKey, serializer)
new RpcClient(client)
}
private def startServer[I, O](connection: ActorRef,
exchangeName: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
serializer: RpcServerSerializer[I, O]): RpcServerHandle = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val qName = queueName.getOrElse("%s.in".format(rKey))
newRpcServer(connection, exchangeName, rKey, serializer, requestHandler, Some(qName))
newRpcClient(connection, exchange, serializer, routingKey)
}
}

View file

@ -16,7 +16,7 @@ class RpcServerActor[I,O](
log.info("%s started", this)
protected def receive = {
case Delivery(payload, _, tag, props, sender) => {
case Delivery(payload, _, tag, _, props, sender) => {
log.debug("%s handling delivery with tag %d", this, tag)
val request = serializer.fromBinary.fromBinary(payload)

View file

@ -45,8 +45,8 @@ class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMat
val consumerExchangeParameters = ExchangeParameters("text_exchange")
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
}).start,
def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open }
}),
exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)

View file

@ -65,9 +65,9 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must
val consumerExchangeParameters = ExchangeParameters("text_exchange")
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
}).start,
exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open }
}), exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch

View file

@ -36,7 +36,7 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM
var deliveryTagCheck: Long = -1
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actorOf( new Actor {
def receive = {
case Delivery(payload, _, deliveryTag, _, sender) => {
case Delivery(payload, _, deliveryTag, _, _, sender) => {
if (!failLatch.isOpen) {
failLatch.open
error("Make it fail!")
@ -47,7 +47,7 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM
}
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
}
}).start, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
}), queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
selfAcknowledging = false, channelParameters = Some(channelParameters),
queueDeclaration = ActiveDeclaration(autoDelete = false)))

View file

@ -35,10 +35,10 @@ class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatche
val rejectedLatch = new StandardLatch
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actorOf( new Actor {
def receive = {
case Delivery(payload, _, deliveryTag, _, sender) => sender.foreach(_ ! Reject(deliveryTag))
case Delivery(payload, _, deliveryTag, _, _, sender) => sender.foreach(_ ! Reject(deliveryTag))
case Rejected(deliveryTag) => rejectedLatch.open
}
}).start, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
}), queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
selfAcknowledging = false, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,

View file

@ -33,8 +33,8 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf(new Actor {
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
}).start, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open }
}), exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters)))

View file

@ -0,0 +1,45 @@
package akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import akka.amqp._
import org.multiverse.api.latches.StandardLatch
import akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import akka.actor.Actor
class AMQPConsumerPrivateQueueTestIntegration extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val countDown = new CountDownLatch(2)
val channelCallback = actorOf(new Actor {
def receive = {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
}).start
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters("my.private.routing.key", actorOf(new Actor {
def receive = { case Delivery(payload, _, _, _, _, _) => payloadLatch.open }
}), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(channelParameters = Some(channelParameters)))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "my.private.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
}

View file

@ -44,8 +44,8 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, "rpc.routing", rpcServerSerializer,
requestHandler _, channelParameters = Some(channelParameters))
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, rpcServerSerializer,
requestHandler _, Some("rpc.routing"), channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
@ -54,11 +54,11 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeName, "rpc.routing", rpcClientSerializer,
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeName, rpcClientSerializer, Some("rpc.routing"),
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
val response = rpcClient.call("some_payload")
response must be(Some(3))
}
}

View file

@ -17,12 +17,12 @@ import Actor._
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
case class Add(email: String, value: String)
case class AddEmail(email: String, value: String)
case class GetAll(email: String)
class MySortedSet extends Transactor {
def receive = {
case Add(userEmail, value) => {
case AddEmail(userEmail, value) => {
val registryId = "userValues:%s".format(userEmail)
val storageSet = RedisStorage.getSortedSet(registryId)
storageSet.add(value.getBytes, System.nanoTime.toFloat)
@ -58,8 +58,9 @@ class RedisTicket513Spec extends
val a = actorOf[MySortedSet]
a.start
it("should work with transactors") {
(a !! Add("test.user@gmail.com", "foo")).get should equal(1)
(a !! Add("test.user@gmail.com", "bar")).get should equal(2)
(a !! AddEmail("test.user@gmail.com", "foo")).get should equal(1)
Thread.sleep(10)
(a !! AddEmail("test.user@gmail.com", "bar")).get should equal(2)
(a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2)
}
}

View file

@ -310,13 +310,11 @@ class RemoteClient private[akka] (
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult)
connection.getChannel.write(request)
Some(futureResult)
}
}
} else {
val exception = new RemoteClientException(

View file

@ -16,7 +16,6 @@ import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
@ -31,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
/**
* Use this object if you need a single remote server on a specific node.
@ -66,10 +66,10 @@ object RemoteNode extends RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
val UUID_PREFIX = "uuid:"
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val UUID_PREFIX = "uuid:"
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
@ -400,7 +400,7 @@ class RemoteServerPipelineFactory(
}
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
@ -497,7 +497,7 @@ class RemoteServerHandler(
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
log.debug("Received RemoteMessageProtocol[\n%s]", request.toString)
log.debug("Received RemoteMessageProtocol[\n%s]".format(request.toString))
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
@ -538,41 +538,46 @@ class RemoteServerHandler(
message,
request.getActorInfo.getTimeout,
None,
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
onComplete(f => {
val result = f.result
val exception = f.exception
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
if (exception.isDefined) {
log.debug("Returning exception from actor invocation [%s]".format(exception.get))
try {
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
else if (result.isDefined) {
log.debug("Returning result from actor invocation [%s]".format(result.get))
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result.get),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
try {
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
try {
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
}
override def onCompleteException(exception: Throwable) {
try {
channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
}
))
)
))
}
}
@ -589,7 +594,10 @@ class RemoteServerHandler(
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
else {
val result = messageReceiver.invoke(typedActor, args: _*)
val result = messageReceiver.invoke(typedActor, args: _*) match {
case f: Future[_] => f.await.result.get
case other => other
}
log.debug("Returning result from remote typed actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.ticket
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import akka.remote.{RemoteClient, RemoteServer}
import akka.actor._
class Ticket519Spec extends Spec with ShouldMatchers {
val HOSTNAME = "localhost"
val PORT = 6666
describe("A remote TypedActor") {
it("should handle remote future replies") {
import akka.remote._
val server = { val s = new RemoteServer; s.start(HOSTNAME,PORT); s}
val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,HOSTNAME,PORT)
val r = actor.someFutureString
r.await.result.get should equal ("foo")
TypedActor.stop(actor)
server.shutdown
}
}
}

View file

@ -181,7 +181,6 @@ abstract class TypedActor extends Actor with Proxyable {
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed
else self.reply(joinPoint.proceed)
case Link(proxy) => self.link(proxy)
case Unlink(proxy) => self.unlink(proxy)
case unexpected => throw new IllegalActorStateException(
@ -851,6 +850,7 @@ private[akka] abstract class ActorAspect {
ActorType.TypedActor)
if (isOneWay) null // for void methods
else if (TypedActor.returnsFuture_?(methodRtti)) future.get
else {
if (future.isDefined) {
future.get.await

View file

@ -1,8 +1,9 @@
package akka.actor;
import java.util.concurrent.CountDownLatch;
import akka.dispatch.Future;
public interface SamplePojo {
public String greet(String s);
public String fail();
public Future<String> someFutureString();
}

View file

@ -25,6 +25,10 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
throw new RuntimeException("expected");
}
public akka.dispatch.Future<String> someFutureString() {
return future("foo");
}
@Override
public void preRestart(Throwable e) {
_pre = true;

View file

@ -26,6 +26,7 @@ akka {
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
@ -135,6 +136,7 @@ akka {
service = on
hostname = "localhost" # The hostname or IP that clients should connect to
port = 2552 # The port clients should connect to
message-frame-size = 1048576
connection-timeout = 1
require-cookie = on
untrusted-mode = off