diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 172a89ea74..be3556e812 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -6,11 +6,11 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.util.Logging - import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.serialization.Serializer /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. @@ -31,20 +31,23 @@ object AMQP { connectionCallback: Option[ActorRef] = None) case class ChannelParameters( + shutdownListener: Option[ShutdownListener] = None, + channelCallback: Option[ActorRef] = None) + + case class ExchangeParameters( exchangeName: String, exchangeType: ExchangeType, exchangeDurable: Boolean = false, exchangeAutoDelete: Boolean = true, exchangePassive: Boolean = false, - shutdownListener: Option[ShutdownListener] = None, - configurationArguments: Map[String, AnyRef] = Map(), - channelCallback: Option[ActorRef] = None) + configurationArguments: Map[String, AnyRef] = Map()) - case class ProducerParameters(channelParameters: ChannelParameters, - producerId: Option[String] = None, - returnListener: Option[ReturnListener] = None) + case class ProducerParameters(exchangeParameters: ExchangeParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None, + channelParameters: Option[ChannelParameters] = None) - case class ConsumerParameters(channelParameters: ChannelParameters, + case class ConsumerParameters(exchangeParameters: ExchangeParameters, routingKey: String, deliveryHandler: ActorRef, queueName: Option[String] = None, @@ -52,7 +55,9 @@ object AMQP { queueAutoDelete: Boolean = true, queuePassive: Boolean = false, queueExclusive: Boolean = false, - selfAcknowledging: Boolean = true) { + selfAcknowledging: Boolean = true, + channelParameters: Option[ChannelParameters] = None) { + if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -79,6 +84,33 @@ object AMQP { consumer } + def newRpcClient(connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + inSerializer: Serializer, + outSerializer: Serializer, + channelParameters: Option[ChannelParameters] = None): ActorRef = { + val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters)) + connection.startLink(rpcActor) + rpcActor ! Start + rpcActor + } + + def newRpcServer(connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + inSerializer: Serializer, + outSerializer: Serializer, + requestHandler: PartialFunction[AnyRef, AnyRef], + channelParameters: Option[ChannelParameters] = None) = { + val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler)) + val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + , channelParameters = channelParameters + , selfAcknowledging = false)) + + } + private val supervisor = new AMQPSupervisor class AMQPSupervisor extends Logging { diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/ConsumerActor.scala index dd2e811702..26d1ac00db 100644 --- a/akka-amqp/src/main/scala/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/ConsumerActor.scala @@ -12,9 +12,11 @@ import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer} import com.rabbitmq.client.AMQP.BasicProperties import java.lang.Throwable -private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) extends FaultTolerantChannelActor(consumerParameters.channelParameters) { +private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) + extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) { + import consumerParameters._ - import channelParameters._ + import exchangeParameters._ var listenerTag: Option[String] = None diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 79cd97c5bf..8049eb74ab 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -6,8 +6,10 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ -import se.scalablesolutions.akka.amqp.AMQP.{ConnectionParameters, ConsumerParameters, ChannelParameters, ProducerParameters} import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP._ +import se.scalablesolutions.akka.serialization.Serializer +import java.lang.Class object ExampleSession { def main(args: Array[String]) = { @@ -31,6 +33,11 @@ object ExampleSession { TimeUnit.SECONDS.sleep(2) + println("==== RPC ===") + rpc + + TimeUnit.SECONDS.sleep(2) + ActorRegistry.shutdownAll System.exit(0) } @@ -40,13 +47,13 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct) + val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "some.routing", actor { + val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "some.routing", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) })) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing") } @@ -55,17 +62,17 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val channelParameters = ChannelParameters("my_fanout_exchange", ExchangeType.Fanout) + val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor { + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) })) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor { + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor { case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) })) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") } @@ -74,17 +81,17 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val channelParameters = ChannelParameters("my_topic_exchange", ExchangeType.Topic) + val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor { + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) })) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor { + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor { case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) })) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush") producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama") } @@ -107,16 +114,38 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct, channelCallback = Some(channelCallback)) + val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "callback.routing", actor { + val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor { case _ => () // not used - })) + }, channelParameters = Some(channelParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) // Wait until both channels (producer & consumer) are started before stopping the connection channelCountdown.await(2, TimeUnit.SECONDS) connection.stop } + + def rpc = { + val connection = AMQP.newConnection() + + val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) + + val stringSerializer = new Serializer { + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) + def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes + } + + val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, { + case "rpc_request" => "rpc_response" + case _ => error("unknown request") + }) + + val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer) + + val response = (rpcClient !! "rpc_request") + log.info("Response: " + response) + } } diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala index b29b26b621..40bcd5de57 100644 --- a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala @@ -8,12 +8,14 @@ import collection.JavaConversions import java.lang.Throwable import se.scalablesolutions.akka.actor.Actor import Actor._ -import se.scalablesolutions.akka.amqp.AMQP.ChannelParameters import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener} import scala.PartialFunction +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} -abstract private[amqp] class FaultTolerantChannelActor(channelParameters: ChannelParameters) extends Actor { - import channelParameters._ +abstract private[amqp] class FaultTolerantChannelActor( + exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor { + + import exchangeParameters._ protected[amqp] var channel: Option[Channel] = None log.info("%s is started", toString) @@ -62,20 +64,20 @@ abstract private[amqp] class FaultTolerantChannelActor(channelParameters: Channe protected def setupChannel(ch: Channel) private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) { - log.info("Exchange declare") - if (exchangePassive) { - ch.exchangeDeclarePassive(exchangeName) - } else { - ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) + if (exchangeName != "") { + if (exchangePassive) { + ch.exchangeDeclarePassive(exchangeName) + } else { + ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) + } } ch.addShutdownListener(new ShutdownListener { def shutdownCompleted(cause: ShutdownSignalException) = { self ! ChannelShutdown(cause) } }) - shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl)) + channelParameters.foreach(_.shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl))) - log.info("shutdown listener added") setupChannel(ch) channel = Some(ch) notifyCallback(Started) @@ -93,7 +95,7 @@ abstract private[amqp] class FaultTolerantChannelActor(channelParameters: Channe } private def notifyCallback(message: AMQPMessage) = { - channelCallback.foreach(cb => if (cb.isRunning) cb ! message) + channelParameters.foreach(_.channelCallback.foreach(cb => if (cb.isRunning) cb ! message)) } override def preRestart(reason: Throwable) = { diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 6e3a936256..5f0a49910e 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -6,18 +6,21 @@ package se.scalablesolutions.akka.amqp import java.util.{TimerTask, Timer} import java.io.IOException -import se.scalablesolutions.akka.util.Logging import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import se.scalablesolutions.akka.config.OneForOneStrategy -private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor with Logging { +private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ self.id = "amqp-connection-%s".format(host) self.lifeCycle = Some(LifeCycle(Permanent)) + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + val reconnectionTimer = new Timer("%s-timer".format(self.id)) val connectionFactory: ConnectionFactory = new ConnectionFactory() @@ -39,7 +42,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio } case None => { log.warning("Unable to create new channel - no connection") - reply(None) + self.reply(None) } } } diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/ProducerActor.scala index 7e4a11a089..db290a5ac1 100644 --- a/akka-amqp/src/main/scala/ProducerActor.scala +++ b/akka-amqp/src/main/scala/ProducerActor.scala @@ -7,9 +7,11 @@ package se.scalablesolutions.akka.amqp import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters -private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor(producerParameters.channelParameters) { +private[amqp] class ProducerActor(producerParameters: ProducerParameters) + extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) { + import producerParameters._ - import channelParameters._ + import exchangeParameters._ producerId.foreach(id => self.id = id) diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala new file mode 100644 index 0000000000..8ff7d8a0ac --- /dev/null +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp + +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} +import com.rabbitmq.client.{Channel, RpcClient} + +class RpcClientActor(exchangeParameters: ExchangeParameters, + routingKey: String, + inSerializer: Serializer, + outSerializer: Serializer, + channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { + + import exchangeParameters._ + + var rpcClient: Option[RpcClient] = None + + log.info("%s started", this) + + def specificMessageHandler = { + case payload: AnyRef => { + + rpcClient match { + case Some(client) => + val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload)) + self.reply(outSerializer.fromBinary(response, None)) + case None => error("%s has no client to send messages with".format(this)) + } + } + } + + protected def setupChannel(ch: Channel) = { + rpcClient = Some(new RpcClient(ch, exchangeName, routingKey)) + } + + override def preRestart(reason: Throwable) = { + rpcClient = None + super.preRestart(reason) + } + + + override def toString(): String = + "AMQP.RpcClient[exchange=" +exchangeName + + ", routingKey=" + routingKey+ "]" + +} \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala new file mode 100644 index 0000000000..fa760edda8 --- /dev/null +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp + +import se.scalablesolutions.akka.actor.{ActorRef, Actor} +import com.rabbitmq.client.AMQP.BasicProperties +import se.scalablesolutions.akka.serialization.Serializer + +class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor { + + log.info("%s started", this) + + protected def receive = { + case Delivery(payload, _, tag, props, sender) => { + + log.debug("%s handling delivery with tag %d", this, tag) + val request = inSerializer.fromBinary(payload, None) + val response: Array[Byte] = outSerializer.toBinary(requestHandler(request)) + + log.debug("%s sending reply to %s", this, props.getReplyTo) + val replyProps = new BasicProperties + replyProps.setCorrelationId(props.getCorrelationId) + producer ! new Message(response, props.getReplyTo, properties = Some(replyProps)) + + sender.foreach(_ ! Acknowledge(tag)) + } + case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag) + } + + override def toString(): String = + "AMQP.RpcServer[]" +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala index 72ccab3cc1..3bc2cb20dd 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def connectionAndRecovery = { + @Test + def connectionAndRecovery = if (AMQPTest.enabled) { + val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch val reconnectedLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala index 11de861a6a..0f6fadfcc4 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -11,20 +11,20 @@ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers -import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters} import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test +import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerChannelRecovery = { + @Test + def consumerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { val producer = AMQP.newProducer(connection, ProducerParameters( - ChannelParameters("text_exchange", ExchangeType.Direct))) + ExchangeParameters("text_exchange", ExchangeType.Direct))) val consumerStartedLatch = new StandardLatch val consumerRestartedLatch = new StandardLatch @@ -41,10 +41,11 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with } val payloadLatch = new StandardLatch - val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor { + val consumerExchangeParameters = ExchangeParameters("text_exchange", ExchangeType.Direct) + val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) + val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerExchangeParameters, "non.interesting.routing.key", actor { case Delivery(payload, _, _, _, _) => payloadLatch.open - })) + }, channelParameters = Some(consumerChannelParameters))) consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) val listenerLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala index fe339cb6c5..9dccd43be8 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -11,15 +11,15 @@ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers -import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters} import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test +import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerConnectionRecovery = { + @Test + def consumerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { @@ -37,8 +37,9 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi case Stopped => () } + val channelParameters = ChannelParameters(channelCallback = Some(producerChannelCallback)) val producer = AMQP.newProducer(connection, ProducerParameters( - ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerChannelCallback)))) + ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters))) producerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) @@ -58,10 +59,11 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi val payloadLatch = new StandardLatch - val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor { + val consumerExchangeParameters = ExchangeParameters("text_exchange", ExchangeType.Direct) + val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) + val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerExchangeParameters, "non.interesting.routing.key", actor { case Delivery(payload, _, _, _, _) => payloadLatch.open - })) + }, channelParameters = Some(consumerChannelParameters))) consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) val listenerLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala index 448b93689d..d48f38afc5 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -6,19 +6,19 @@ package se.scalablesolutions.akka.amqp.test import se.scalablesolutions.akka.util.Logging import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters} import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ -import org.junit.{After, Test} -import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef} +import org.junit.Test +import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessageManualAcknowledge = { + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) @@ -27,19 +27,28 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit case Restarting => () case Stopped => () } - val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback)) + val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + val failLatch = new StandardLatch val acknowledgeLatch = new StandardLatch var deliveryTagCheck: Long = -1 - val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "manual.ack.this", actor { + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor { case Delivery(payload, _, deliveryTag, _, sender) => { - deliveryTagCheck = deliveryTag - sender.foreach(_ ! Acknowledge(deliveryTag)) + if (!failLatch.isOpen) { + failLatch.open + error("Make it fail!") + } else { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } } case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open - }, selfAcknowledging = false)) + }, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) + + val producer = AMQP.newProducer(connection, + ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be (true) producer ! Message("some_payload".getBytes, "manual.ack.this") diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala index 31cff4414c..af94b0a515 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -8,16 +8,16 @@ import se.scalablesolutions.akka.util.Logging import org.scalatest.junit.JUnitSuite import org.junit.Test import se.scalablesolutions.akka.amqp._ -import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters} import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { @@ -28,14 +28,17 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging case Stopped => () } - val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback)) + val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val payloadLatch = new StandardLatch - val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "non.interesting.routing.key", actor { + val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "non.interesting.routing.key", actor { case Delivery(payload, _, _, _, _) => payloadLatch.open - })) + }, channelParameters = Some(channelParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + val producer = AMQP.newProducer(connection, + ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) + countDown.await(2, TimeUnit.SECONDS) must be (true) producer ! Message("some_payload".getBytes, "non.interesting.routing.key") payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala index 9a02424e7f..095a21fc86 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -7,19 +7,18 @@ package se.scalablesolutions.akka.amqp.test import se.scalablesolutions.akka.util.Logging import org.scalatest.junit.JUnitSuite import org.junit.Test -import junit.framework.Assert import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters} import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerChannelRecovery = { + @Test + def producerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) @@ -40,8 +39,9 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with case Stopped => () }) + val channelParameters = ChannelParameters(channelCallback = Some(producerCallback)) val producerParameters = ProducerParameters( - ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback))) + ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters)) val producer = AMQP.newProducer(connection, producerParameters) startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala index c69d37a24b..71bc08bdaa 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -12,13 +12,13 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters} import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerConnectionRecovery = { + @Test + def producerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { @@ -38,8 +38,9 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi case Stopped => () }) + val channelParameters = ChannelParameters(channelCallback = Some(producerCallback)) val producerParameters = ProducerParameters( - ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback))) + ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters)) val producer = AMQP.newProducer(connection, producerParameters) startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala index 1bcd5168f7..ab9bb00e7c 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -14,13 +14,13 @@ import se.scalablesolutions.akka.amqp._ import com.rabbitmq.client.ReturnListener import com.rabbitmq.client.AMQP.BasicProperties import java.lang.String -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters} import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters} class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerMessage = { + @Test + def producerMessage = if (AMQPTest.enabled) { val connection: ActorRef = AMQP.newConnection() try { @@ -31,8 +31,7 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging } } val producerParameters = ProducerParameters( - ChannelParameters("text_exchange", ExchangeType.Direct), - returnListener = Some(returnListener)) + ExchangeParameters("text_exchange", ExchangeType.Direct), returnListener = Some(returnListener)) val producer = AMQP.newProducer(connection, producerParameters) diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala new file mode 100644 index 0000000000..eebcfccce3 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} +import se.scalablesolutions.akka.serialization.Serializer + +class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { + + @Test + def consumerMessage = if (AMQPTest.enabled) { + val connection = AMQP.newConnection() + try { + + val countDown = new CountDownLatch(3) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + val stringSerializer = new Serializer { + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) + def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes + } + + val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, { + case "some_payload" => "some_result" + case _ => error("Unhandled message") + }, channelParameters = Some(channelParameters)) + + val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer + , channelParameters = Some(channelParameters)) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + val response = rpcClient !! "some_payload" + response must be (Some("some_result")) + } finally { + connection.stop + } + } + + @Test + def dummy { + // amqp tests need local rabbitmq server running, so a disabled by default. + // this dummy test makes sure that the whole test class doesn't fail because of missing tests + assert(true) + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala new file mode 100644 index 0000000000..e50ab673f6 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +object AMQPTest { + def enabled = false +} \ No newline at end of file diff --git a/akka-core/.ensime b/akka-core/.ensime new file mode 100644 index 0000000000..15e1ae85be --- /dev/null +++ b/akka-core/.ensime @@ -0,0 +1,79 @@ +( + + ;; Where you unpacked the ENSIME distribution. + :server-root "/home/jboner/emacs-config/lib/ensime" + + ;; The command with which to invoke the ENSIME server. Change this to + ;; "bin/server.bat" if your're on Windows. + :server-cmd "bin/server.bat" + + + ;; The host to connect to. Connecting to remote ENSIME servers is not + ;; currently supported. + ;; ------------------------------ + ;; :server-host "localhost" + + + ;; Assume a standard sbt directory structure. Look in default sbt + ;; locations for dependencies, sources, target, etc. + ;; + ;; Note for sbt subprojects: Each subproject needs it's own .ensime + ;; file. + ;; ----------------------------- + :use-sbt t + :sbt-compile-conf "compile" + + + ;; Use an existing pom.xml to determine the dependencies + ;; for the project. A Maven-style directory structure is assumed. + ;; ----------------------------- + ;; :use-maven t + ;; :maven-compile-scopes "compile" + ;; :maven-runtime-scopes "runtime" + + + ;; Use an existing ivy.xml to determine the dependencies + ;; for the project. A Maven-style directory structure is assumed. + ;; ----------------------------- + ;; :use-ivy t + ;; :ivy-compile-conf "compile" + ;; :ivy-runtime-conf "compile" + + + ;; The home package for your project. + ;; Used by ENSIME to populate the project outline view. + ;; ------------------------------ + :project-package "se.scalablesolutions.akka" + + + ;; :sources ([dir | file]*) + ;; Include source files by directory(recursively) or by filename. + ;; ------------------------------ + :sources ("src/main/") + + + ;; :dependency-jars ([dir | file]*) + ;; Include jars by directory(recursively) or by filename. + ;; ------------------------------ + ;; :dependency-jars ("lib") + + + ;; :dependency-dirs ([dir | file]*) + ;; Include directories of .class files. + ;; ------------------------------ + ;; :dependency-dirs ("target/classes") + + + ;; :target dir + ;; Specify the target of the project build process. Should be + ;; the directory where .class files are written + ;; + ;; The target is used to populate the classpath when launching + ;; the inferior scala repl. + ;; ------------------------------ + ;; :target "target/classes" + + ) + + + diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index b38401a4a6..e1227168b2 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -408,22 +408,6 @@ trait Actor extends Logging { */ def initTransactionalState {} - /** - * Use reply(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Throws an IllegalStateException if unable to determine what to reply to. - */ - def reply(message: Any) = self.reply(message) - - /** - * Use reply_?(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def reply_?(message: Any): Boolean = self.reply_?(message) - /** * Is the actor able to handle the message passed in as arguments? */ diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d14c3af29a..2472ea924d 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -197,10 +197,13 @@ trait ActorRef extends TransactionManagement { */ protected[akka] val dispatcherLock = new ReentrantLock - protected[akka] var _sender: Option[ActorRef] = None - protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s } - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf } + /** + * This is a reference to the message currently being processed by the actor + */ + protected[akka] var _currentMessage: Option[MessageInvocation] = None + + protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } + protected[akka] def currentMessage = guard.withGuard { _currentMessage } /** * Returns the uuid for the actor. @@ -211,13 +214,27 @@ trait ActorRef extends TransactionManagement { * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ - def sender: Option[ActorRef] = guard.withGuard { _sender } + def sender: Option[ActorRef] = { + //Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } + val msg = currentMessage + if(msg.isEmpty) + None + else + msg.get.sender + } /** * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture } + def senderFuture: Option[CompletableFuture[Any]] = { + //Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } + val msg = currentMessage + if(msg.isEmpty) + None + else + msg.get.senderFuture + } /** * Is the actor being restarted? @@ -404,13 +421,13 @@ trait ActorRef extends TransactionManagement { * Returns the home address and port for this actor. */ def homeAddress: InetSocketAddress = _homeAddress - + /** * Set the home address and port for this actor. */ def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit = homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2)) - + /** * Set the home address and port for this actor. */ @@ -531,7 +548,7 @@ trait ActorRef extends TransactionManagement { protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit protected[akka] def mailbox: Deque[MessageInvocation] - + protected[akka] def restart(reason: Throwable): Unit protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit @@ -609,8 +626,7 @@ sealed class LocalActorRef private[akka]( __format.asInstanceOf[SerializerBasedActorFormat[_]] .serializer .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] - else - actorClass.newInstance.asInstanceOf[Actor] + else actorClass.newInstance.asInstanceOf[Actor] }) loader = Some(__loader) isDeserialized = true @@ -993,14 +1009,15 @@ sealed class LocalActorRef private[akka]( Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) return } - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture + currentMessage = Option(messageHandle) try { dispatch(messageHandle) } catch { case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e + } finally { + currentMessage = None //TODO: Don't reset this, we might want to resend the message } } @@ -1058,20 +1075,22 @@ sealed class LocalActorRef private[akka]( protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { - if (faultHandler.isDefined) { - faultHandler.get match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => - restartLinkedActors(reason) + faultHandler match { + // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy + case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) => + restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => - dead.restart(reason) - } - } else throw new IllegalActorStateException( - "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => + dead.restart(reason) + + case None => + throw new IllegalActorStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) + } } else { - _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle + _supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on } } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index fea1b3b01c..c568c8de03 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import scala.collection.mutable.ListBuffer import scala.reflect.Manifest -import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap} +import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set=>JSet} import se.scalablesolutions.akka.util.ListenerManagement @@ -29,6 +29,11 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends ListenerManagement { + + private val refComparator = new java.util.Comparator[ActorRef]{ + def compare(a: ActorRef,b: ActorRef) = a.uuid.compareTo(b.uuid) + } + private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] @@ -36,9 +41,7 @@ object ActorRegistry extends ListenerManagement { /** * Returns all actors in the system. */ - def actors: List[ActorRef] = { - filter(_=> true) - } + def actors: List[ActorRef] = filter(_ => true) /** * Invokes a function for all actors. @@ -52,7 +55,7 @@ object ActorRegistry extends ListenerManagement { * Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message. */ def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): List[ActorRef] = - filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) + filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) /** * Finds all actors that satisfy a predicate. @@ -73,7 +76,7 @@ object ActorRegistry extends ListenerManagement { * Finds all actors that are subtypes of the class passed in as the Manifest argument. */ def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = - filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass)) + filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass)) /** * Finds any actor that matches T. @@ -119,7 +122,7 @@ object ActorRegistry extends ListenerManagement { if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) if (actorsById.containsKey(id)) actorsById.get(id).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsById.put(id, set) } @@ -128,7 +131,7 @@ object ActorRegistry extends ListenerManagement { val className = actor.actor.getClass.getName if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsByClassName.put(className, set) } diff --git a/akka-core/src/test/scala/SupervisorHierarchySpec.scala b/akka-core/src/test/scala/SupervisorHierarchySpec.scala new file mode 100644 index 0000000000..75751e3d58 --- /dev/null +++ b/akka-core/src/test/scala/SupervisorHierarchySpec.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import java.lang.Throwable +import Actor._ +import se.scalablesolutions.akka.config.OneForOneStrategy +import java.util.concurrent.{TimeUnit, CountDownLatch} + +class SupervisorHierarchySpec extends JUnitSuite { + + @Test + def killWorkerShouldRestartMangerAndOtherWorkers = { + val countDown = new CountDownLatch(4) + + val workerOne = actorOf(new CountDownActor(countDown)) + val workerTwo = actorOf(new CountDownActor(countDown)) + val workerThree = actorOf(new CountDownActor( countDown)) + + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 1000)) + + protected def receive = { case _ => () } + }).start + + val manager = actorOf(new CountDownActor(countDown)) + boss.startLink(manager) + + manager.startLink(workerOne) + manager.startLink(workerTwo) + manager.startLink(workerThree) + + workerOne ! Exit(workerOne, new RuntimeException("Fire the worker!")) + + // manager + all workers should be restarted by only killing a worker + // manager doesn't trap exits, so boss will restart manager + + assert(countDown.await(4, TimeUnit.SECONDS)) + } + + class CountDownActor(countDown: CountDownLatch) extends Actor { + + protected def receive = { case _ => () } + + override def postRestart(reason: Throwable) = countDown.countDown + } +} + diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala index 387b1bcd13..f9e456aeae 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala @@ -83,9 +83,7 @@ private[akka] object CassandraStorageBackend extends if (column.isDefined) Some(column.get.getColumn.value) else None } catch { - case e => - log.info("Could not retreive Ref from storage") - None + case e => None } } @@ -195,9 +193,7 @@ private[akka] object CassandraStorageBackend extends if (column.isDefined) Some(column.get.getColumn.value) else None } catch { - case e => - log.info("Could not retreive Map from storage") - None + case e => None } } diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar new file mode 100644 index 0000000000..351ff49c9d Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC7-1.4/redisclient-2.8.0.RC7-1.4.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC7-1.4/redisclient-2.8.0.RC7-1.4.jar new file mode 100644 index 0000000000..d25fcfdccf Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0.RC7-1.4/redisclient-2.8.0.RC7-1.4.jar differ diff --git a/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.jar b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.jar new file mode 100644 index 0000000000..6c77d359b4 Binary files /dev/null and b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.jar differ diff --git a/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.pom b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.pom new file mode 100644 index 0000000000..7c02578f5f --- /dev/null +++ b/embedded-repo/sjson/json/sjson/0.7-SNAPSHOT-2.8.RC7/sjson-0.7-SNAPSHOT-2.8.RC7.pom @@ -0,0 +1,9 @@ + + + 4.0.0 + sjson.json + sjson + 0.7-SNAPSHOT-2.8.RC7 + POM was created from install:install-file + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index d41f453260..ee80e6dd98 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -248,7 +248,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile" + val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4" % "compile" val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil }