cosmetic and disable the tests
This commit is contained in:
parent
f1b090ce14
commit
6022c63535
11 changed files with 22 additions and 15 deletions
|
|
@ -23,9 +23,11 @@ class RpcClientActor(exchangeParameters: ExchangeParameters,
|
|||
def specificMessageHandler = {
|
||||
case payload: AnyRef => {
|
||||
|
||||
rpcClient.foreach {client =>
|
||||
val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload))
|
||||
reply(outSerializer.fromBinary(response, None))
|
||||
rpcClient match {
|
||||
case Some(client) =>
|
||||
val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload))
|
||||
reply(outSerializer.fromBinary(response, None))
|
||||
case None => error("%s has no client to send messages with".format(this))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -34,6 +36,12 @@ class RpcClientActor(exchangeParameters: ExchangeParameters,
|
|||
rpcClient = Some(new RpcClient(ch, exchangeName, routingKey))
|
||||
}
|
||||
|
||||
override def preRestart(reason: Throwable) = {
|
||||
rpcClient = None
|
||||
super.preRestart(reason)
|
||||
}
|
||||
|
||||
|
||||
override def toString(): String =
|
||||
"AMQP.RpcClient[exchange=" +exchangeName +
|
||||
", routingKey=" + routingKey+ "]"
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer
|
|||
val request = inSerializer.fromBinary(payload, None)
|
||||
val response: Array[Byte] = outSerializer.toBinary(requestHandler(request))
|
||||
|
||||
log.info("%s sending reply to %s", this, props.getReplyTo)
|
||||
log.debug("%s sending reply to %s", this, props.getReplyTo)
|
||||
val replyProps = new BasicProperties
|
||||
replyProps.setCorrelationId(props.getCorrelationId)
|
||||
producer ! new Message(response, props.getReplyTo, properties = Some(replyProps))
|
||||
|
|
@ -30,5 +30,5 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer
|
|||
}
|
||||
|
||||
override def toString(): String =
|
||||
"AMQP.RpcServer[producerId=" + producer.id + "]"
|
||||
"AMQP.RpcServer[]"
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
|
||||
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def connectionAndRecovery = {
|
||||
val connectedLatch = new StandardLatch
|
||||
val reconnectingLatch = new StandardLatch
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import se.scalablesolutions.akka.amqp.AMQP._
|
|||
|
||||
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def consumerChannelRecovery = {
|
||||
|
||||
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import se.scalablesolutions.akka.amqp.AMQP._
|
|||
|
||||
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def consumerConnectionRecovery = {
|
||||
|
||||
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete
|
|||
|
||||
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def consumerMessageManualAcknowledge = {
|
||||
val connection = AMQP.newConnection()
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete
|
|||
|
||||
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def consumerMessage = {
|
||||
val connection = AMQP.newConnection()
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter
|
|||
|
||||
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def producerChannelRecovery = {
|
||||
|
||||
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter
|
|||
|
||||
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def producerConnectionRecovery = {
|
||||
|
||||
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParamete
|
|||
|
||||
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def producerMessage = {
|
||||
|
||||
val connection: ActorRef = AMQP.newConnection()
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import se.scalablesolutions.akka.util.Logging
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
|
@ -17,7 +16,7 @@ import se.scalablesolutions.akka.serialization.Serializer
|
|||
|
||||
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
def consumerMessage = {
|
||||
val connection = AMQP.newConnection()
|
||||
try {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue