Merge branch 'master' into wip_141_SSL_enable_remote_actors

This commit is contained in:
Viktor Klang 2010-07-14 22:38:20 +02:00
commit 2f2d16a34f
29 changed files with 529 additions and 148 deletions

View file

@ -6,11 +6,11 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
@ -31,20 +31,23 @@ object AMQP {
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
shutdownListener: Option[ShutdownListener] = None,
configurationArguments: Map[String, AnyRef] = Map(),
channelCallback: Option[ActorRef] = None)
configurationArguments: Map[String, AnyRef] = Map())
case class ProducerParameters(channelParameters: ChannelParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None)
case class ProducerParameters(exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ConsumerParameters(channelParameters: ChannelParameters,
case class ConsumerParameters(exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
@ -52,7 +55,9 @@ object AMQP {
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true) {
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
@ -79,6 +84,33 @@ object AMQP {
consumer
}
def newRpcClient(connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer(connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
requestHandler: PartialFunction[AnyRef, AnyRef],
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false))
}
private val supervisor = new AMQPSupervisor
class AMQPSupervisor extends Logging {

View file

@ -12,9 +12,11 @@ import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer}
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.Throwable
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) extends FaultTolerantChannelActor(consumerParameters.channelParameters) {
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
import consumerParameters._
import channelParameters._
import exchangeParameters._
var listenerTag: Option[String] = None

View file

@ -6,8 +6,10 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import Actor._
import se.scalablesolutions.akka.amqp.AMQP.{ConnectionParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.serialization.Serializer
import java.lang.Class
object ExampleSession {
def main(args: Array[String]) = {
@ -31,6 +33,11 @@ object ExampleSession {
TimeUnit.SECONDS.sleep(2)
println("==== RPC ===")
rpc
TimeUnit.SECONDS.sleep(2)
ActorRegistry.shutdownAll
System.exit(0)
}
@ -40,13 +47,13 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct)
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "some.routing", actor {
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "some.routing", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing")
}
@ -55,17 +62,17 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_fanout_exchange", ExchangeType.Fanout)
val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor {
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor {
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
}
@ -74,17 +81,17 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_topic_exchange", ExchangeType.Topic)
val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor {
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor {
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush")
producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama")
}
@ -107,16 +114,38 @@ object ExampleSession {
case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped")
}
val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct, channelCallback = Some(channelCallback))
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "callback.routing", actor {
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor {
case _ => () // not used
}))
}, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
// Wait until both channels (producer & consumer) are started before stopping the connection
channelCountdown.await(2, TimeUnit.SECONDS)
connection.stop
}
def rpc = {
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)
val stringSerializer = new Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
}
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, {
case "rpc_request" => "rpc_response"
case _ => error("unknown request")
})
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer)
val response = (rpcClient !! "rpc_request")
log.info("Response: " + response)
}
}

View file

@ -8,12 +8,14 @@ import collection.JavaConversions
import java.lang.Throwable
import se.scalablesolutions.akka.actor.Actor
import Actor._
import se.scalablesolutions.akka.amqp.AMQP.ChannelParameters
import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener}
import scala.PartialFunction
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
abstract private[amqp] class FaultTolerantChannelActor(channelParameters: ChannelParameters) extends Actor {
import channelParameters._
abstract private[amqp] class FaultTolerantChannelActor(
exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor {
import exchangeParameters._
protected[amqp] var channel: Option[Channel] = None
log.info("%s is started", toString)
@ -62,20 +64,20 @@ abstract private[amqp] class FaultTolerantChannelActor(channelParameters: Channe
protected def setupChannel(ch: Channel)
private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) {
log.info("Exchange declare")
if (exchangePassive) {
ch.exchangeDeclarePassive(exchangeName)
} else {
ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments))
if (exchangeName != "") {
if (exchangePassive) {
ch.exchangeDeclarePassive(exchangeName)
} else {
ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments))
}
}
ch.addShutdownListener(new ShutdownListener {
def shutdownCompleted(cause: ShutdownSignalException) = {
self ! ChannelShutdown(cause)
}
})
shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl))
channelParameters.foreach(_.shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl)))
log.info("shutdown listener added")
setupChannel(ch)
channel = Some(ch)
notifyCallback(Started)
@ -93,7 +95,7 @@ abstract private[amqp] class FaultTolerantChannelActor(channelParameters: Channe
}
private def notifyCallback(message: AMQPMessage) = {
channelCallback.foreach(cb => if (cb.isRunning) cb ! message)
channelParameters.foreach(_.channelCallback.foreach(cb => if (cb.isRunning) cb ! message))
}
override def preRestart(reason: Throwable) = {

View file

@ -6,18 +6,21 @@ package se.scalablesolutions.akka.amqp
import java.util.{TimerTask, Timer}
import java.io.IOException
import se.scalablesolutions.akka.util.Logging
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
import se.scalablesolutions.akka.config.OneForOneStrategy
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor with Logging {
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor {
import connectionParameters._
self.id = "amqp-connection-%s".format(host)
self.lifeCycle = Some(LifeCycle(Permanent))
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
val reconnectionTimer = new Timer("%s-timer".format(self.id))
val connectionFactory: ConnectionFactory = new ConnectionFactory()
@ -39,7 +42,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
}
case None => {
log.warning("Unable to create new channel - no connection")
reply(None)
self.reply(None)
}
}
}

View file

@ -7,9 +7,11 @@ package se.scalablesolutions.akka.amqp
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters
private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor(producerParameters.channelParameters) {
private[amqp] class ProducerActor(producerParameters: ProducerParameters)
extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) {
import producerParameters._
import channelParameters._
import exchangeParameters._
producerId.foreach(id => self.id = id)

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
import com.rabbitmq.client.{Channel, RpcClient}
class RpcClientActor(exchangeParameters: ExchangeParameters,
routingKey: String,
inSerializer: Serializer,
outSerializer: Serializer,
channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
import exchangeParameters._
var rpcClient: Option[RpcClient] = None
log.info("%s started", this)
def specificMessageHandler = {
case payload: AnyRef => {
rpcClient match {
case Some(client) =>
val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload))
self.reply(outSerializer.fromBinary(response, None))
case None => error("%s has no client to send messages with".format(this))
}
}
}
protected def setupChannel(ch: Channel) = {
rpcClient = Some(new RpcClient(ch, exchangeName, routingKey))
}
override def preRestart(reason: Throwable) = {
rpcClient = None
super.preRestart(reason)
}
override def toString(): String =
"AMQP.RpcClient[exchange=" +exchangeName +
", routingKey=" + routingKey+ "]"
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.serialization.Serializer
class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor {
log.info("%s started", this)
protected def receive = {
case Delivery(payload, _, tag, props, sender) => {
log.debug("%s handling delivery with tag %d", this, tag)
val request = inSerializer.fromBinary(payload, None)
val response: Array[Byte] = outSerializer.toBinary(requestHandler(request))
log.debug("%s sending reply to %s", this, props.getReplyTo)
val replyProps = new BasicProperties
replyProps.setCorrelationId(props.getCorrelationId)
producer ! new Message(response, props.getReplyTo, properties = Some(replyProps))
sender.foreach(_ ! Acknowledge(tag))
}
case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag)
}
override def toString(): String =
"AMQP.RpcServer[]"
}

View file

@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def connectionAndRecovery = {
@Test
def connectionAndRecovery = if (AMQPTest.enabled) {
val connectedLatch = new StandardLatch
val reconnectingLatch = new StandardLatch
val reconnectedLatch = new StandardLatch

View file

@ -11,20 +11,20 @@ import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerChannelRecovery = {
@Test
def consumerChannelRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
val producer = AMQP.newProducer(connection, ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct)))
ExchangeParameters("text_exchange", ExchangeType.Direct)))
val consumerStartedLatch = new StandardLatch
val consumerRestartedLatch = new StandardLatch
@ -41,10 +41,11 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with
}
val payloadLatch = new StandardLatch
val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor {
val consumerExchangeParameters = ExchangeParameters("text_exchange", ExchangeType.Direct)
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerExchangeParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
}, channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch

View file

@ -11,15 +11,15 @@ import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerConnectionRecovery = {
@Test
def consumerConnectionRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
@ -37,8 +37,9 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
case Stopped => ()
}
val channelParameters = ChannelParameters(channelCallback = Some(producerChannelCallback))
val producer = AMQP.newProducer(connection, ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerChannelCallback))))
ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters)))
producerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
@ -58,10 +59,11 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
val payloadLatch = new StandardLatch
val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor {
val consumerExchangeParameters = ExchangeParameters("text_exchange", ExchangeType.Direct)
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerExchangeParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
}, channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch

View file

@ -6,19 +6,19 @@ package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
import org.junit.{After, Test}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef}
import org.junit.Test
import se.scalablesolutions.akka.actor.ActorRef
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessageManualAcknowledge = {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
@ -27,19 +27,28 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit
case Restarting => ()
case Stopped => ()
}
val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback))
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val failLatch = new StandardLatch
val acknowledgeLatch = new StandardLatch
var deliveryTagCheck: Long = -1
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "manual.ack.this", actor {
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor {
case Delivery(payload, _, deliveryTag, _, sender) => {
deliveryTagCheck = deliveryTag
sender.foreach(_ ! Acknowledge(deliveryTag))
if (!failLatch.isOpen) {
failLatch.open
error("Make it fail!")
} else {
deliveryTagCheck = deliveryTag
sender.foreach(_ ! Acknowledge(deliveryTag))
}
}
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
}, selfAcknowledging = false))
}, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "manual.ack.this")

View file

@ -8,16 +8,16 @@ import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessage = {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
@ -28,14 +28,17 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging
case Stopped => ()
}
val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback))
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "non.interesting.routing.key", actor {
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
}, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
val producer = AMQP.newProducer(connection,
ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters)))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)

View file

@ -7,19 +7,18 @@ package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import junit.framework.Assert
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerChannelRecovery = {
@Test
def producerChannelRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
@ -40,8 +39,9 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with
case Stopped => ()
})
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback)))
ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters))
val producer = AMQP.newProducer(connection, producerParameters)
startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)

View file

@ -12,13 +12,13 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerConnectionRecovery = {
@Test
def producerConnectionRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
@ -38,8 +38,9 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
case Stopped => ()
})
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback)))
ExchangeParameters("text_exchange", ExchangeType.Direct), channelParameters = Some(channelParameters))
val producer = AMQP.newProducer(connection, producerParameters)
startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)

View file

@ -14,13 +14,13 @@ import se.scalablesolutions.akka.amqp._
import com.rabbitmq.client.ReturnListener
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.String
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters}
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters}
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerMessage = {
@Test
def producerMessage = if (AMQPTest.enabled) {
val connection: ActorRef = AMQP.newConnection()
try {
@ -31,8 +31,7 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging
}
}
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct),
returnListener = Some(returnListener))
ExchangeParameters("text_exchange", ExchangeType.Direct), returnListener = Some(returnListener))
val producer = AMQP.newProducer(connection, producerParameters)

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
import se.scalablesolutions.akka.serialization.Serializer
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(3)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val stringSerializer = new Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
}
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, {
case "some_payload" => "some_result"
case _ => error("Unhandled message")
}, channelParameters = Some(channelParameters))
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer
, channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
val response = rpcClient !! "some_payload"
response must be (Some("some_result"))
} finally {
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -0,0 +1,9 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
object AMQPTest {
def enabled = false
}

79
akka-core/.ensime Normal file
View file

@ -0,0 +1,79 @@
(
;; Where you unpacked the ENSIME distribution.
:server-root "/home/jboner/emacs-config/lib/ensime"
;; The command with which to invoke the ENSIME server. Change this to
;; "bin/server.bat" if your're on Windows.
:server-cmd "bin/server.bat"
;; The host to connect to. Connecting to remote ENSIME servers is not
;; currently supported.
;; ------------------------------
;; :server-host "localhost"
;; Assume a standard sbt directory structure. Look in default sbt
;; locations for dependencies, sources, target, etc.
;;
;; Note for sbt subprojects: Each subproject needs it's own .ensime
;; file.
;; -----------------------------
:use-sbt t
:sbt-compile-conf "compile"
;; Use an existing pom.xml to determine the dependencies
;; for the project. A Maven-style directory structure is assumed.
;; -----------------------------
;; :use-maven t
;; :maven-compile-scopes "compile"
;; :maven-runtime-scopes "runtime"
;; Use an existing ivy.xml to determine the dependencies
;; for the project. A Maven-style directory structure is assumed.
;; -----------------------------
;; :use-ivy t
;; :ivy-compile-conf "compile"
;; :ivy-runtime-conf "compile"
;; The home package for your project.
;; Used by ENSIME to populate the project outline view.
;; ------------------------------
:project-package "se.scalablesolutions.akka"
;; :sources ([dir | file]*)
;; Include source files by directory(recursively) or by filename.
;; ------------------------------
:sources ("src/main/")
;; :dependency-jars ([dir | file]*)
;; Include jars by directory(recursively) or by filename.
;; ------------------------------
;; :dependency-jars ("lib")
;; :dependency-dirs ([dir | file]*)
;; Include directories of .class files.
;; ------------------------------
;; :dependency-dirs ("target/classes")
;; :target dir
;; Specify the target of the project build process. Should be
;; the directory where .class files are written
;;
;; The target is used to populate the classpath when launching
;; the inferior scala repl.
;; ------------------------------
;; :target "target/classes"
)

View file

@ -408,22 +408,6 @@ trait Actor extends Logging {
*/
def initTransactionalState {}
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = self.reply(message)
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def reply_?(message: Any): Boolean = self.reply_?(message)
/**
* Is the actor able to handle the message passed in as arguments?
*/

View file

@ -197,10 +197,13 @@ trait ActorRef extends TransactionManagement {
*/
protected[akka] val dispatcherLock = new ReentrantLock
protected[akka] var _sender: Option[ActorRef] = None
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* This is a reference to the message currently being processed by the actor
*/
protected[akka] var _currentMessage: Option[MessageInvocation] = None
protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg }
protected[akka] def currentMessage = guard.withGuard { _currentMessage }
/**
* Returns the uuid for the actor.
@ -211,13 +214,27 @@ trait ActorRef extends TransactionManagement {
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
def sender: Option[ActorRef] = guard.withGuard { _sender }
def sender: Option[ActorRef] = {
//Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
val msg = currentMessage
if(msg.isEmpty)
None
else
msg.get.sender
}
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
def senderFuture: Option[CompletableFuture[Any]] = {
//Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
val msg = currentMessage
if(msg.isEmpty)
None
else
msg.get.senderFuture
}
/**
* Is the actor being restarted?
@ -404,13 +421,13 @@ trait ActorRef extends TransactionManagement {
* Returns the home address and port for this actor.
*/
def homeAddress: InetSocketAddress = _homeAddress
/**
* Set the home address and port for this actor.
*/
def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit =
homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2))
/**
* Set the home address and port for this actor.
*/
@ -531,7 +548,7 @@ trait ActorRef extends TransactionManagement {
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def restart(reason: Throwable): Unit
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
@ -609,8 +626,7 @@ sealed class LocalActorRef private[akka](
__format.asInstanceOf[SerializerBasedActorFormat[_]]
.serializer
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
else
actorClass.newInstance.asInstanceOf[Actor]
else actorClass.newInstance.asInstanceOf[Actor]
})
loader = Some(__loader)
isDeserialized = true
@ -993,14 +1009,15 @@ sealed class LocalActorRef private[akka](
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
currentMessage = Option(messageHandle)
try {
dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
} finally {
currentMessage = None //TODO: Don't reset this, we might want to resend the message
}
}
@ -1058,20 +1075,22 @@ sealed class LocalActorRef private[akka](
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) {
faultHandler.get match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) =>
restartLinkedActors(reason)
faultHandler match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) =>
dead.restart(reason)
}
} else throw new IllegalActorStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
dead.restart(reason)
case None =>
throw new IllegalActorStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
}
} else {
_supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle
_supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on
}
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap}
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set=>JSet}
import se.scalablesolutions.akka.util.ListenerManagement
@ -29,6 +29,11 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends ListenerManagement {
private val refComparator = new java.util.Comparator[ActorRef]{
def compare(a: ActorRef,b: ActorRef) = a.uuid.compareTo(b.uuid)
}
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]]
@ -36,9 +41,7 @@ object ActorRegistry extends ListenerManagement {
/**
* Returns all actors in the system.
*/
def actors: List[ActorRef] = {
filter(_=> true)
}
def actors: List[ActorRef] = filter(_ => true)
/**
* Invokes a function for all actors.
@ -52,7 +55,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message.
*/
def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): List[ActorRef] =
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))
/**
* Finds all actors that satisfy a predicate.
@ -73,7 +76,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] =
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass))
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass))
/**
* Finds any actor that matches T.
@ -119,7 +122,7 @@ object ActorRegistry extends ListenerManagement {
if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor)
if (actorsById.containsKey(id)) actorsById.get(id).add(actor)
else {
val set = new CopyOnWriteArraySet[ActorRef]
val set = new ConcurrentSkipListSet[ActorRef](refComparator)
set.add(actor)
actorsById.put(id, set)
}
@ -128,7 +131,7 @@ object ActorRegistry extends ListenerManagement {
val className = actor.actor.getClass.getName
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor)
else {
val set = new CopyOnWriteArraySet[ActorRef]
val set = new ConcurrentSkipListSet[ActorRef](refComparator)
set.add(actor)
actorsByClassName.put(className, set)
}

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.lang.Throwable
import Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import java.util.concurrent.{TimeUnit, CountDownLatch}
class SupervisorHierarchySpec extends JUnitSuite {
@Test
def killWorkerShouldRestartMangerAndOtherWorkers = {
val countDown = new CountDownLatch(4)
val workerOne = actorOf(new CountDownActor(countDown))
val workerTwo = actorOf(new CountDownActor(countDown))
val workerThree = actorOf(new CountDownActor( countDown))
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 1000))
protected def receive = { case _ => () }
}).start
val manager = actorOf(new CountDownActor(countDown))
boss.startLink(manager)
manager.startLink(workerOne)
manager.startLink(workerTwo)
manager.startLink(workerThree)
workerOne ! Exit(workerOne, new RuntimeException("Fire the worker!"))
// manager + all workers should be restarted by only killing a worker
// manager doesn't trap exits, so boss will restart manager
assert(countDown.await(4, TimeUnit.SECONDS))
}
class CountDownActor(countDown: CountDownLatch) extends Actor {
protected def receive = { case _ => () }
override def postRestart(reason: Throwable) = countDown.countDown
}
}

View file

@ -83,9 +83,7 @@ private[akka] object CassandraStorageBackend extends
if (column.isDefined) Some(column.get.getColumn.value)
else None
} catch {
case e =>
log.info("Could not retreive Ref from storage")
None
case e => None
}
}
@ -195,9 +193,7 @@ private[akka] object CassandraStorageBackend extends
if (column.isDefined) Some(column.get.getColumn.value)
else None
} catch {
case e =>
log.info("Could not retreive Map from storage")
None
case e => None
}
}

View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.7-SNAPSHOT-2.8.RC7</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -248,7 +248,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile"
val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}