diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index 3395b6ac59..f6b45bcf53 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -161,7 +161,6 @@ private[akka] sealed case class AspectInit( */ @Aspect("perInstance") private[akka] sealed class ActiveObjectAspect { - import Actor._ @volatile var isInitialized = false var target: Class[_] = _ @@ -188,6 +187,7 @@ private[akka] sealed class ActiveObjectAspect { } private def localDispatch(joinPoint: JoinPoint): AnyRef = { + import Actor.Sender.Self val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] if (isOneWay(rtti)) actor ! Invocation(joinPoint, true, true) else { diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 3616868cbe..ea2c64d880 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -70,7 +70,18 @@ object Actor { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) - implicit val any: AnyRef = this + object Sender extends Actor { + implicit val Self: AnyRef = this + def receive = { + case unknown => + log.error( + "Actor.Sender can't process messages. Received message [%s]." + + "This error could occur if you either:" + + "\n\t- Explicitly send a message to the Actor.Sender object." + + "\n\t- Invoking the 'reply(..)' method or sending a message to the 'sender' reference " + + "\n\t when you have sent the original request from a instance *not* being an actor.", unknown) + } + } /** * Use to create an anonymous event-driven actor. @@ -165,7 +176,7 @@ object Actor { trait Actor extends Logging with TransactionManagement { ActorRegistry.register(this) - implicit val self: AnyRef = this + implicit protected val self: Actor = this // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait private[akka] var _uuid = Uuid.newUuid.toString @@ -398,12 +409,30 @@ trait Actor extends Logging with TransactionManagement { /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *
+ * * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + * + * * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable. - * - * If invoked from within another object then add this import to resolve the implicit argument: *- * import se.scalablesolutions.akka.actor.Actor._ + * actor ! message + *+ * + * + * If invoked from within a *non* Actor instance then either add this import to resolve the implicit argument: + *
+ * import Actor.Sender._ + * actor ! message + *+ * + * Or pass in the implicit argument explicitly: + *
+ * actor.!(message)(this) + *+ * + * Or use the 'send(..)' method; + *
+ * actor.send(message)
*
*/
def !(message: AnyRef)(implicit sender: AnyRef) = {
diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala
index 7851286902..658e400a65 100644
--- a/akka-actors/src/main/scala/nio/RemoteClient.scala
+++ b/akka-actors/src/main/scala/nio/RemoteClient.scala
@@ -156,6 +156,7 @@ class RemoteClientHandler(val name: String,
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap)
extends SimpleChannelUpstreamHandler with Logging {
+ import Actor.Sender.Self
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
@@ -166,7 +167,6 @@ class RemoteClientHandler(val name: String,
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
- import Actor._
try {
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index ba3338f39a..fd770bc6f5 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -139,12 +139,13 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
}
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
- import Actor._
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
val message = RemoteProtocolBuilder.getMessage(request)
- if (request.getIsOneWay) actor ! message
+ if (request.getIsOneWay) {
+ actor.send(message)
+ }
else {
try {
val resultOrNone = actor !! message
diff --git a/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
index 826f157593..2804e588d8 100644
--- a/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
+++ b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
@@ -27,25 +27,27 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
@Test
def shouldReplyToBangMessageUsingReply = {
- import Actor._
+ import Actor.Sender.Self
+
val replyActor = new ReplyActor
replyActor.start
val senderActor = new SenderActor(replyActor)
senderActor.start
senderActor ! "Init"
- Thread.sleep(10000)
+ Thread.sleep(1000)
assert("Reply" === state.s)
}
@Test
def shouldReplyToBangMessageUsingImplicitSender = {
- import Actor._
+ import Actor.Sender.Self
+
val replyActor = new ReplyActor
replyActor.start
val senderActor = new SenderActor(replyActor)
senderActor.start
senderActor ! "InitImplicit"
- Thread.sleep(10000)
+ Thread.sleep(1000)
assert("ReplyImplicit" === state.s)
}
}
diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
index 46661e3b97..e556a1a724 100644
--- a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
+++ b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
@@ -8,7 +8,8 @@ import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
class EventBasedSingleThreadActorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
diff --git a/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala b/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala
index 2030942b7a..2d90145810 100644
--- a/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala
+++ b/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala
@@ -6,7 +6,8 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
class EventBasedThreadPoolActorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
diff --git a/akka-actors/src/test/scala/RemoteActorTest.scala b/akka-actors/src/test/scala/RemoteActorTest.scala
index 191898269b..884a0d007f 100644
--- a/akka-actors/src/test/scala/RemoteActorTest.scala
+++ b/akka-actors/src/test/scala/RemoteActorTest.scala
@@ -27,7 +27,8 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
class RemoteActorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
akka.Config.config
new Thread(new Runnable() {
def run = {
diff --git a/akka-actors/src/test/scala/RemoteSupervisorTest.scala b/akka-actors/src/test/scala/RemoteSupervisorTest.scala
index 9405af6dc9..ac6f1777ab 100644
--- a/akka-actors/src/test/scala/RemoteSupervisorTest.scala
+++ b/akka-actors/src/test/scala/RemoteSupervisorTest.scala
@@ -20,7 +20,8 @@ object Log {
* @author Jonas Bonér
*/
class RemoteSupervisorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
akka.Config.config
new Thread(new Runnable() {
def run = {
diff --git a/akka-actors/src/test/scala/SupervisorTest.scala b/akka-actors/src/test/scala/SupervisorTest.scala
index df59ee832c..b143ef21e7 100644
--- a/akka-actors/src/test/scala/SupervisorTest.scala
+++ b/akka-actors/src/test/scala/SupervisorTest.scala
@@ -13,7 +13,8 @@ import org.junit.Test
* @author Jonas Bonér
*/
class SupervisorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
var messageLog: String = ""
var oneWayLog: String = ""
diff --git a/akka-actors/src/test/scala/ThreadBasedActorTest.scala b/akka-actors/src/test/scala/ThreadBasedActorTest.scala
index 335a6f461b..ead74068d1 100644
--- a/akka-actors/src/test/scala/ThreadBasedActorTest.scala
+++ b/akka-actors/src/test/scala/ThreadBasedActorTest.scala
@@ -8,7 +8,8 @@ import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
class ThreadBasedActorTest extends JUnitSuite {
- import Actor._
+ import Actor.Sender.Self
+
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala
index 9f591f9891..af56bfc8a1 100644
--- a/akka-amqp/src/main/scala/AMQP.scala
+++ b/akka-amqp/src/main/scala/AMQP.scala
@@ -31,10 +31,8 @@ import java.io.IOException
* val consumer = AMQP.newConsumer(params, hostname, port, exchange, ExchangeType.Direct, Serializer.ScalaJSON, None, 100)
*
- * consumer ! MessageConsumerListener(queue, routingKey, new Actor() {
- * def receive = {
- * case Message(payload, _, _, _, _) => log.debug("Received message: %s", payload)
- * }
+ * consumer ! MessageConsumerListener(queue, routingKey, actor {
+ * case Message(payload, _, _, _, _) => log.debug("Received message: %s", payload)
* })
*
* val producer = AMQP.newProducer(params, hostname, port, exchange, Serializer.ScalaJSON, None, None, 100)
@@ -43,15 +41,113 @@ import java.io.IOException
*
* @author Jonas Bonér
*/
-object AMQP extends Actor {
- private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]
- faultHandler = Some(OneForOneStrategy(5, 5000))
- trapExit = List(classOf[Throwable])
- start
+object AMQP {
+ private val supervisor = new AMQPSupervisor
+
+ def newProducer(
+ config: ConnectionParameters,
+ hostname: String,
+ port: Int,
+ exchangeName: String,
+ returnListener: Option[ReturnListener],
+ shutdownListener: Option[ShutdownListener],
+ initReconnectDelay: Long) =
+ supervisor.newProducer(
+ config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)
+
+ def newConsumer(
+ config: ConnectionParameters,
+ hostname: String,
+ port: Int,
+ exchangeName: String,
+ exchangeType: ExchangeType,
+ shutdownListener: Option[ShutdownListener],
+ initReconnectDelay: Long,
+ passive: Boolean,
+ durable: Boolean,
+ configurationArguments: Map[String, AnyRef]) =
+ supervisor.newConsumer(
+ config, hostname, port, exchangeName, exchangeType,
+ shutdownListener, initReconnectDelay, passive, durable, configurationArguments)
+
+ def stopConnection(connection: FaultTolerantConnectionActor) = supervisor.stopConnection(connection)
+
+ /**
+ * @author Jonas Bonér
+ */
+ class AMQPSupervisor extends Actor {
+ private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]
+
+ faultHandler = Some(OneForOneStrategy(5, 5000))
+ trapExit = List(classOf[Throwable])
+ start
+
+ def newProducer(
+ config: ConnectionParameters,
+ hostname: String,
+ port: Int,
+ exchangeName: String,
+ returnListener: Option[ReturnListener],
+ shutdownListener: Option[ShutdownListener],
+ initReconnectDelay: Long): Producer = {
+ val producer = new Producer(
+ new ConnectionFactory(config),
+ hostname, port,
+ exchangeName,
+ returnListener,
+ shutdownListener,
+ initReconnectDelay)
+ startLink(producer)
+ producer
+ }
+
+ def newConsumer(
+ config: ConnectionParameters,
+ hostname: String,
+ port: Int,
+ exchangeName: String,
+ exchangeType: ExchangeType,
+ shutdownListener: Option[ShutdownListener],
+ initReconnectDelay: Long,
+ passive: Boolean,
+ durable: Boolean,
+ configurationArguments: Map[String, AnyRef]): Consumer = {
+ val consumer = new Consumer(
+ new ConnectionFactory(config),
+ hostname, port,
+ exchangeName,
+ exchangeType,
+ shutdownListener,
+ initReconnectDelay,
+ passive,
+ durable,
+ configurationArguments)
+ startLink(consumer)
+ consumer
+ }
+
+ def stopConnection(connection: FaultTolerantConnectionActor) = {
+ connection ! Stop
+ unlink(connection)
+ connections.remove(connection)
+ }
+
+ override def shutdown = {
+ connections.values.asScala.foreach(_ ! Stop)
+ exit
+ }
+
+ def receive = {
+ case _ => {} // ignore all messages
+ }
+ }
sealed trait AMQPMessage
private[akka] trait InternalAMQPMessage extends AMQPMessage
+ /**
+ * @author Jonas Bonér
+ */
class Message(val payload: Array[Byte],
val routingKey: String,
val mandatory: Boolean,
@@ -59,12 +155,15 @@ object AMQP extends Actor {
val properties: RabbitMQ.BasicProperties) extends AMQPMessage {
override def toString(): String =
"Message[payload=" + payload +
- ", routingKey=" + routingKey +
- ", mandatory=" + mandatory +
- ", immediate=" + immediate +
- ", properties=" + properties + "]"
+ ", routingKey=" + routingKey +
+ ", mandatory=" + mandatory +
+ ", immediate=" + immediate +
+ ", properties=" + properties + "]"
}
+ /**
+ * @author Jonas Bonér
+ */
object Message {
def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] =
Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties))
@@ -76,28 +175,31 @@ object AMQP extends Actor {
new Message(payload, routingKey, false, false, null)
}
- case class MessageConsumerListener(queueName: String,
- routingKey: String,
- isUsingExistingQueue: Boolean,
+ /**
+ * @author Jonas Bonér
+ */
+ case class MessageConsumerListener(queueName: String,
+ routingKey: String,
+ isUsingExistingQueue: Boolean,
actor: Actor) extends AMQPMessage {
- def this(queueName: String, routingKey: String, actor: Actor) = this(queueName, routingKey, false, actor)
-
+ def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, actor)
+
private[akka] var tag: Option[String] = None
- override def toString() =
+ override def toString() =
"MessageConsumerListener[actor=" + actor +
- ", queue=" + queueName +
- ", routingKey=" + routingKey +
- ", tag=" + tag +
- ", isUsingExistingQueue=" + isUsingExistingQueue + "]"
+ ", queue=" + queueName +
+ ", routingKey=" + routingKey +
+ ", tag=" + tag +
+ ", isUsingExistingQueue=" + isUsingExistingQueue + "]"
- def toString(exchangeName: String) =
+ def toString(exchangeName: String) =
"MessageConsumerListener[actor=" + actor +
- ", exchange=" + exchangeName +
- ", queue=" + queueName +
- ", routingKey=" + routingKey +
- ", tag=" + tag +
- ", isUsingExistingQueue=" + isUsingExistingQueue + "]"
+ ", exchange=" + exchangeName +
+ ", queue=" + queueName +
+ ", routingKey=" + routingKey +
+ ", tag=" + tag +
+ ", isUsingExistingQueue=" + isUsingExistingQueue + "]"
/**
* Hash code should only be based on on queue name and routing key.
@@ -114,31 +216,32 @@ object AMQP extends Actor {
*/
override def equals(that: Any): Boolean = synchronized {
that != null &&
- that.isInstanceOf[MessageConsumerListener] &&
- that.asInstanceOf[MessageConsumerListener].queueName== queueName &&
- that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
+ that.isInstanceOf[MessageConsumerListener] &&
+ that.asInstanceOf[MessageConsumerListener].queueName == queueName &&
+ that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
}
}
object MessageConsumerListener {
- def apply(queueName: String, routingKey: String, actor: Actor) = new MessageConsumerListener(queueName, routingKey, false, actor)
+ def apply(queueName: String, routingKey: String, actor: Actor) =
+ new MessageConsumerListener(queueName, routingKey, false, actor)
}
-
+
case object Stop extends AMQPMessage
-
+
private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
-
+
private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage
-
+
private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage
-
+
private[akka] class MessageNotDeliveredException(
- val message: String,
- val replyCode: Int,
- val replyText: String,
- val exchange: String,
- val routingKey: String,
- val properties: RabbitMQ.BasicProperties,
- val body: Array[Byte]) extends RuntimeException(message)
+ val message: String,
+ val replyCode: Int,
+ val replyText: String,
+ val exchange: String,
+ val routingKey: String,
+ val properties: RabbitMQ.BasicProperties,
+ val body: Array[Byte]) extends RuntimeException(message)
sealed trait ExchangeType
object ExchangeType {
@@ -156,84 +259,29 @@ object AMQP extends Actor {
}
}
- def newProducer(
- config: ConnectionParameters,
- hostname: String,
- port: Int,
- exchangeName: String,
- returnListener: Option[ReturnListener],
- shutdownListener: Option[ShutdownListener],
- initReconnectDelay: Long): Producer = {
- val producer = new Producer(
- new ConnectionFactory(config),
- hostname, port,
- exchangeName,
- returnListener,
- shutdownListener,
- initReconnectDelay)
- startLink(producer)
- producer
- }
-
- def newConsumer(
- config: ConnectionParameters,
- hostname: String,
- port: Int,
- exchangeName: String,
- exchangeType: ExchangeType,
- shutdownListener: Option[ShutdownListener],
- initReconnectDelay: Long,
- passive: Boolean,
- durable: Boolean,
- configurationArguments: Map[String, AnyRef]): Consumer = {
- val consumer = new Consumer(
- new ConnectionFactory(config),
- hostname, port,
- exchangeName,
- exchangeType,
- shutdownListener,
- initReconnectDelay,
- passive,
- durable,
- configurationArguments)
- startLink(consumer)
- consumer
- }
-
- def stopConnection(connection: FaultTolerantConnectionActor) = {
- connection ! Stop
- unlink(connection)
- connections.remove(connection)
- }
-
- override def shutdown = {
- connections.values.asScala.foreach(_ ! Stop)
- exit
- }
-
/**
* @author Jonas Bonér
*/
- class Producer private[amqp] (
- val connectionFactory: ConnectionFactory,
- val hostname: String,
- val port: Int,
- val exchangeName: String,
- val returnListener: Option[ReturnListener],
- val shutdownListener: Option[ShutdownListener],
- val initReconnectDelay: Long)
- extends FaultTolerantConnectionActor {
-
+ class Producer private[amqp](
+ val connectionFactory: ConnectionFactory,
+ val hostname: String,
+ val port: Int,
+ val exchangeName: String,
+ val returnListener: Option[ReturnListener],
+ val shutdownListener: Option[ShutdownListener],
+ val initReconnectDelay: Long)
+ extends FaultTolerantConnectionActor {
setupChannel
log.info("AMQP.Producer [%s] is started", toString)
def newRpcClient(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey)
-
+
def receive = {
- case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
+ case message@Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
- channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
+ channel.basicPublish(
+ exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
case Stop =>
disconnect
exit
@@ -246,18 +294,18 @@ object AMQP extends Actor {
case Some(listener) => channel.setReturnListener(listener)
case None => channel.setReturnListener(new ReturnListener() {
def handleBasicReturn(
- replyCode: Int,
- replyText: String,
- exchange: String,
- routingKey: String,
- properties: RabbitMQ.BasicProperties,
- body: Array[Byte]) = {
+ replyCode: Int,
+ replyText: String,
+ exchange: String,
+ routingKey: String,
+ properties: RabbitMQ.BasicProperties,
+ body: Array[Byte]) = {
throw new MessageNotDeliveredException(
"Could not deliver message [" + body +
- "] with reply code [" + replyCode +
- "] with reply text [" + replyText +
- "] and routing key [" + routingKey +
- "] to exchange [" + exchange + "]",
+ "] with reply code [" + replyCode +
+ "] with reply text [" + replyText +
+ "] and routing key [" + routingKey +
+ "] to exchange [" + exchange + "]",
replyCode, replyText, exchange, routingKey, properties, body)
}
})
@@ -267,25 +315,26 @@ object AMQP extends Actor {
override def toString(): String =
"AMQP.Producer[hostname=" + hostname +
- ", port=" + port +
- ", exchange=" + exchangeName + "]"
+ ", port=" + port +
+ ", exchange=" + exchangeName + "]"
}
/**
* @author Jonas Bonér
*/
- class Consumer private[amqp] (
- val connectionFactory: ConnectionFactory,
- val hostname: String,
- val port: Int,
- val exchangeName: String,
- val exchangeType: ExchangeType,
- val shutdownListener: Option[ShutdownListener],
- val initReconnectDelay: Long,
- val passive: Boolean,
- val durable: Boolean,
- val configurationArguments: Map[java.lang.String, Object])
- extends FaultTolerantConnectionActor { consumer: Consumer =>
+ class Consumer private[amqp](
+ val connectionFactory: ConnectionFactory,
+ val hostname: String,
+ val port: Int,
+ val exchangeName: String,
+ val exchangeType: ExchangeType,
+ val shutdownListener: Option[ShutdownListener],
+ val initReconnectDelay: Long,
+ val passive: Boolean,
+ val durable: Boolean,
+ val configurationArguments: Map[java.lang.String, Object])
+ extends FaultTolerantConnectionActor {
+ consumer: Consumer =>
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
@@ -302,7 +351,7 @@ object AMQP extends Actor {
body(requestBody, replyProperties)
}
}
- }
+ }
def receive = {
case listener: MessageConsumerListener =>
@@ -312,32 +361,34 @@ object AMQP extends Actor {
case UnregisterMessageConsumerListener(listener) =>
unregisterListener(listener)
-
- case Reconnect(delay) =>
+
+ case Reconnect(delay) =>
reconnect(delay)
- case Failure(cause) =>
+ case Failure(cause) =>
log.error(cause, "")
throw cause
- case Stop =>
+ case Stop =>
listeners.elements.toList.map(_._2).foreach(unregisterListener(_))
disconnect
exit
- case message: Message =>
- handleIllegalMessage("AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]")
+ case message: Message =>
+ handleIllegalMessage(
+ "AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]")
- case unknown =>
- handleIllegalMessage("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
+ case unknown =>
+ handleIllegalMessage(
+ "Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
}
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
- passive, durable,
- configurationArguments.asJava)
+ passive, durable,
+ configurationArguments.asJava)
listeners.elements.toList.map(_._2).foreach(registerListener)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
@@ -352,12 +403,12 @@ object AMQP extends Actor {
}
log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName)
- channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
+ channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
val listenerTag = channel.basicConsume(listener.queueName, true, new DefaultConsumer(channel) with Logging {
- override def handleDelivery(tag: String,
- envelope: Envelope,
- properties: RabbitMQ.BasicProperties,
+ override def handleDelivery(tag: String,
+ envelope: Envelope,
+ properties: RabbitMQ.BasicProperties,
payload: Array[Byte]) {
try {
val mandatory = false // FIXME: where to find out if it's mandatory?
@@ -368,22 +419,27 @@ object AMQP extends Actor {
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.basicAck(deliveryTag, false)
} catch {
- case cause =>
- log.error("Delivery of message to MessageConsumerListener [%s] failed due to [%s]", listener.toString(exchangeName), cause.toString)
+ case cause =>
+ log.error(
+ "Delivery of message to MessageConsumerListener [%s] failed due to [%s]",
+ listener.toString(exchangeName), cause.toString)
consumer ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
}
}
override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = {
def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = {
- if (listener.tag.isEmpty) throw new IllegalStateException("MessageConsumerListener [" + listener + "] does not have a tag")
+ if (listener.tag.isEmpty) throw new IllegalStateException(
+ "MessageConsumerListener [" + listener + "] does not have a tag")
listener.tag.get == listenerTag
}
listeners.elements.toList.map(_._2).find(hasTag(_, listenerTag)) match {
- case None => log.error("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
+ case None => log.error(
+ "Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
case Some(listener) =>
- log.warning("MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
- listener.toString(exchangeName), signal.getReference, signal.getReason)
+ log.warning(
+ "MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
+ listener.toString(exchangeName), signal.getReference, signal.getReason)
consumer ! UnregisterMessageConsumerListener(listener)
}
}
@@ -393,11 +449,15 @@ object AMQP extends Actor {
private def unregisterListener(listener: MessageConsumerListener) = {
listeners.get(listener) match {
- case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName))
+ case None => log.warning(
+ "Can't unregister message consumer listener [%s]; no such listener",
+ listener.toString(exchangeName))
case Some(listener) =>
listeners - listener
listener.tag match {
- case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName))
+ case None => log.warning(
+ "Can't unregister message consumer listener [%s]; no listener tag",
+ listener.toString(exchangeName))
case Some(tag) =>
channel.basicCancel(tag)
unlink(listener.actor)
@@ -406,21 +466,24 @@ object AMQP extends Actor {
}
}
}
-
+
private def handleIllegalMessage(errorMessage: String) = {
log.error(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
-
+
override def toString(): String =
"AMQP.Consumer[hostname=" + hostname +
- ", port=" + port +
- ", exchange=" + exchangeName +
- ", type=" + exchangeType +
- ", passive=" + passive +
- ", durable=" + durable + "]"
+ ", port=" + port +
+ ", exchange=" + exchangeName +
+ ", type=" + exchangeType +
+ ", passive=" + passive +
+ ", durable=" + durable + "]"
}
+ /**
+ * @author Jonas Bonér
+ */
trait FaultTolerantConnectionActor extends Actor {
val reconnectionTimer = new Timer
@@ -435,29 +498,32 @@ object AMQP extends Actor {
protected def setupChannel
- def createQueue: String = channel.queueDeclare("", false, false, true, true, null).getQueue
+ def createQueue: String =
+ channel.queueDeclare("", false, false, true, true, null).getQueue
- def createQueue(name: String) = channel.queueDeclare(name, false, false, true, true, null).getQueue
+ def createQueue(name: String) =
+ channel.queueDeclare(name, false, false, true, true, null).getQueue
- def createQueue(name: String, durable: Boolean) = channel.queueDeclare(name, false, durable, true, true, null).getQueue
+ def createQueue(name: String, durable: Boolean) =
+ channel.queueDeclare(name, false, durable, true, true, null).getQueue
- def createBindQueue: String = {
+ def createBindQueue: String = {
val name = createQueue
channel.queueBind(name, exchangeName, name)
name
}
- def createBindQueue(name: String) {
+ def createBindQueue(name: String) {
createQueue(name)
channel.queueBind(name, exchangeName, name)
}
- def createBindQueue(name: String, durable: Boolean) {
+ def createBindQueue(name: String, durable: Boolean) {
channel.queueDeclare(name, durable)
channel.queueBind(name, exchangeName, name)
}
- def deleteQueue(name: String) { channel.queueDelete(name) }
+ def deleteQueue(name: String) {channel.queueDelete(name)}
protected def disconnect = {
try {
@@ -492,10 +558,7 @@ object AMQP extends Actor {
}
override def preRestart(reason: AnyRef, config: Option[AnyRef]) = disconnect
+
override def postRestart(reason: AnyRef, config: Option[AnyRef]) = reconnect(initReconnectDelay)
}
-
- def receive = {
- case _ => {} // ignore all messages
- }
}
diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala
index 574b825470..158dbe46d0 100644
--- a/akka-amqp/src/main/scala/ExampleSession.scala
+++ b/akka-amqp/src/main/scala/ExampleSession.scala
@@ -5,6 +5,7 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.Actor.Sender.Self
import com.rabbitmq.client.ConnectionParameters
diff --git a/akka-persistence/src/test/scala/AllTest.scala b/akka-persistence/src/test/scala/AllTest.scala
index 60374da92d..2e9bc78178 100644
--- a/akka-persistence/src/test/scala/AllTest.scala
+++ b/akka-persistence/src/test/scala/AllTest.scala
@@ -1,6 +1,6 @@
package se.scalablesolutions.akka
-import akka.state.{MongoStorageSpec, MongoPersistentActorSpec, CassandraPersistentActorSpec}
+import se.scalablesolutions.akka.state.{MongoStorageSpec, MongoPersistentActorSpec, CassandraPersistentActorSpec}
import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
diff --git a/changes.xml b/changes.xml
index 203bd588ac..658b8ab550 100644
--- a/changes.xml
+++ b/changes.xml
@@ -18,10 +18,12 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui