- moved all into package folder structure
- added simple protobuf based rpc convenience
This commit is contained in:
parent
09d7cc75eb
commit
be4be45a8d
23 changed files with 238 additions and 196 deletions
|
|
@ -1,15 +1,16 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
|
||||
import java.lang.IllegalArgumentException
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import Actor._
|
||||
|
||||
/**
|
||||
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
|
||||
*
|
||||
|
|
@ -80,33 +81,6 @@ object AMQP {
|
|||
consumer
|
||||
}
|
||||
|
||||
def newRpcClient[O,I](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[O,I],
|
||||
channelParameters: Option[ChannelParameters] = None): ActorRef = {
|
||||
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
|
||||
connection.startLink(rpcActor)
|
||||
rpcActor ! Start
|
||||
rpcActor
|
||||
}
|
||||
|
||||
def newRpcServer[I,O](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcServerSerializer[I,O],
|
||||
requestHandler: I => O,
|
||||
queueName: Option[String] = None,
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
|
||||
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
|
||||
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
|
||||
, channelParameters = channelParameters
|
||||
, selfAcknowledging = false
|
||||
, queueName = queueName))
|
||||
|
||||
}
|
||||
|
||||
private val supervisor = new AMQPSupervisor
|
||||
|
||||
class AMQPSupervisor extends Logging {
|
||||
|
|
@ -129,17 +103,4 @@ object AMQP {
|
|||
connectionActor
|
||||
}
|
||||
}
|
||||
|
||||
trait FromBinary[T] {
|
||||
def fromBinary(bytes: Array[Byte]): T
|
||||
}
|
||||
|
||||
trait ToBinary[T] {
|
||||
def toBinary(t: T): Array[Byte]
|
||||
}
|
||||
|
||||
|
||||
case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
|
||||
|
||||
case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
|
||||
}
|
||||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package se.scalablesolutions.akka.amqp
|
||||
|
||||
import rpc.RPC
|
||||
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, ToBinary, FromBinary}
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
|
||||
import Actor._
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
|
@ -146,7 +148,7 @@ object ExampleSession {
|
|||
|
||||
def requestHandler(request: String) = 3
|
||||
|
||||
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
|
||||
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
|
||||
requestHandler, queueName = Some("rpc.in.key.queue"))
|
||||
|
||||
|
||||
|
|
@ -159,7 +161,7 @@ object ExampleSession {
|
|||
}
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
|
||||
|
||||
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
|
||||
val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
|
||||
|
||||
val response = (rpcClient !! "rpc_request")
|
||||
log.info("Response: " + response)
|
||||
|
|
@ -0,0 +1,110 @@
|
|||
package se.scalablesolutions.akka.amqp.rpc
|
||||
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import com.google.protobuf.Message
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
|
||||
object RPC {
|
||||
|
||||
def newRpcClient[O, I](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[O, I],
|
||||
channelParameters: Option[ChannelParameters] = None): ActorRef = {
|
||||
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters))
|
||||
connection.startLink(rpcActor)
|
||||
rpcActor ! Start
|
||||
rpcActor
|
||||
}
|
||||
|
||||
def newRpcServer[I, O](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcServerSerializer[I, O],
|
||||
requestHandler: I => O,
|
||||
queueName: Option[String] = None,
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
|
||||
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
|
||||
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
|
||||
, channelParameters = channelParameters
|
||||
, selfAcknowledging = false
|
||||
, queueName = queueName))
|
||||
|
||||
}
|
||||
|
||||
trait FromBinary[T] {
|
||||
def fromBinary(bytes: Array[Byte]): T
|
||||
}
|
||||
|
||||
trait ToBinary[T] {
|
||||
def toBinary(t: T): Array[Byte]
|
||||
}
|
||||
|
||||
|
||||
case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
|
||||
|
||||
case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
|
||||
|
||||
|
||||
/**
|
||||
* RPC convenience
|
||||
*/
|
||||
trait RpcClient[O, I] {
|
||||
def callService(request: O, timeout: Long = 5000): Option[I]
|
||||
}
|
||||
|
||||
private class RpcServiceClient[O, I](client: ActorRef) extends RpcClient[O, I] {
|
||||
def callService(request: O, timeout: Long = 5000): Option[I] = {
|
||||
(client.!!(request, timeout)).as[I]
|
||||
}
|
||||
}
|
||||
|
||||
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
||||
|
||||
def startProtobufServer[I <: Message, O <: Message](
|
||||
connection: ActorRef, serviceName: String, requestHandler: I => O)(implicit manifest: Manifest[I]) = {
|
||||
|
||||
val serializer = new RpcServerSerializer[I, O](
|
||||
new FromBinary[I] {
|
||||
def fromBinary(bytes: Array[Byte]): I = {
|
||||
createProtobufFromBytes[I](bytes)
|
||||
}
|
||||
}, new ToBinary[O] {
|
||||
def toBinary(t: O) = t.toByteArray
|
||||
})
|
||||
|
||||
val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic)
|
||||
val routingKey = "%s.request".format(serviceName)
|
||||
val queueName = "%s.in".format(routingKey)
|
||||
|
||||
newRpcServer[I, O](connection, exchangeParameters, routingKey, serializer, requestHandler,
|
||||
queueName = Some(queueName))
|
||||
}
|
||||
|
||||
def startProtobufClient[O <: Message, I <: Message](
|
||||
connection: ActorRef, serviceName: String)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
|
||||
|
||||
val serializer = new RpcClientSerializer[O, I](
|
||||
new ToBinary[O] {
|
||||
def toBinary(t: O) = t.toByteArray
|
||||
}, new FromBinary[I] {
|
||||
def fromBinary(bytes: Array[Byte]): I = {
|
||||
createProtobufFromBytes[I](bytes)
|
||||
}
|
||||
})
|
||||
|
||||
val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic)
|
||||
val routingKey = "%s.request".format(serviceName)
|
||||
val queueName = "%s.in".format(routingKey)
|
||||
|
||||
val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer)
|
||||
new RpcServiceClient[O, I](client)
|
||||
}
|
||||
|
||||
private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = {
|
||||
manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I]
|
||||
}
|
||||
}
|
||||
|
|
@ -4,11 +4,9 @@
|
|||
|
||||
package se.scalablesolutions.akka.amqp
|
||||
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
|
||||
|
||||
import com.rabbitmq.client.{Channel, RpcClient}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
|
||||
import rpc.RPC.RpcClientSerializer
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
|
||||
|
||||
class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
|
|
@ -4,9 +4,9 @@
|
|||
|
||||
package se.scalablesolutions.akka.amqp
|
||||
|
||||
import rpc.RPC.RpcServerSerializer
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
|
||||
import com.rabbitmq.client.AMQP.BasicProperties
|
||||
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
|
||||
|
||||
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor {
|
||||
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
|
@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException
|
|||
import se.scalablesolutions.akka.amqp._
|
||||
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def connectionAndRecovery = if (AMQPTest.enabled) {
|
||||
|
|
@ -50,10 +49,4 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import com.rabbitmq.client.ShutdownSignalException
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
|
|
@ -15,8 +12,10 @@ import java.util.concurrent.TimeUnit
|
|||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
|
||||
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerChannelRecovery = if (AMQPTest.enabled) {
|
||||
|
|
@ -60,11 +59,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,22 +1,21 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import com.rabbitmq.client.ShutdownSignalException
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import Actor._
|
||||
|
||||
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerConnectionRecovery = if (AMQPTest.enabled) {
|
||||
|
|
@ -79,11 +78,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
|
|
@ -14,8 +11,10 @@ import org.junit.Test
|
|||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
|
||||
|
|
@ -57,11 +56,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,20 +1,19 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerMessage = if (AMQPTest.enabled) {
|
||||
|
|
@ -46,11 +45,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
|
@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException
|
|||
import se.scalablesolutions.akka.amqp._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def producerChannelRecovery = if (AMQPTest.enabled) {
|
||||
|
|
@ -53,11 +52,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
|
@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException
|
|||
import se.scalablesolutions.akka.amqp._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def producerConnectionRecovery = if (AMQPTest.enabled) {
|
||||
|
|
@ -52,11 +51,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
|
@ -16,8 +13,10 @@ import com.rabbitmq.client.AMQP.BasicProperties
|
|||
import java.lang.String
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def producerMessage = if (AMQPTest.enabled) {
|
||||
|
|
@ -41,11 +40,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,19 +1,21 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import rpc.RPC
|
||||
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, FromBinary, ToBinary}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
|
||||
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerMessage = if (AMQPTest.enabled) {
|
||||
val connection = AMQP.newConnection()
|
||||
|
|
@ -39,7 +41,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
|
|||
|
||||
def requestHandler(request: String) = 3
|
||||
|
||||
val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
|
||||
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
|
||||
requestHandler, channelParameters = Some(channelParameters))
|
||||
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int](
|
||||
|
|
@ -49,7 +51,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
|
|||
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
|
||||
})
|
||||
|
||||
val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
|
||||
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
|
||||
channelParameters = Some(channelParameters))
|
||||
|
||||
countDown.await(2, TimeUnit.SECONDS) must be(true)
|
||||
|
|
@ -59,11 +61,4 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
|
|||
connection.stop
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def dummy {
|
||||
// amqp tests need local rabbitmq server running, so a disabled by default.
|
||||
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.amqp.AMQP
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp.rpc.RPC
|
||||
|
||||
class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers {
|
||||
|
||||
@Test
|
||||
def consumerMessage = if (AMQPTest.enabled) {
|
||||
|
||||
val connection = AMQP.newConnection()
|
||||
|
||||
RPC.startProtobufServer(connection, "protoservice", requestHandler)
|
||||
|
||||
val protobufClient = RPC.startProtobufClient[AddressProtocol, AddressProtocol](connection, "protoservice")
|
||||
|
||||
val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build
|
||||
|
||||
protobufClient.callService(request) match {
|
||||
case Some(response) => assert(response.getHostname == request.getHostname.reverse)
|
||||
case None => fail("no response")
|
||||
}
|
||||
}
|
||||
|
||||
def requestHandler(request: AddressProtocol): AddressProtocol = {
|
||||
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
|
||||
}
|
||||
}
|
||||
|
|
@ -5,5 +5,5 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
object AMQPTest {
|
||||
def enabled = false
|
||||
def enabled = true
|
||||
}
|
||||
|
|
@ -41,15 +41,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
object Repositories {
|
||||
lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
|
||||
// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
// lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
|
||||
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
|
||||
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
|
||||
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
|
||||
// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -60,23 +60,26 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
import Repositories._
|
||||
lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
// lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
|
||||
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
|
||||
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
|
||||
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo)
|
||||
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
// lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
|
||||
// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
|
||||
// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
// lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
|
||||
// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo)
|
||||
// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
|
||||
val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository"
|
||||
|
||||
val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public"
|
||||
val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots"
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// Versions
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -358,6 +361,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
|
||||
val commons_io = Dependencies.commons_io
|
||||
val rabbit = Dependencies.rabbit
|
||||
val protobuf = Dependencies.protobuf
|
||||
|
||||
// testing
|
||||
val junit = Dependencies.junit
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue