re-enable akka-camel and akka-camel-typed and fix compile and test errors

This commit is contained in:
Martin Krasser 2011-06-28 16:53:11 +02:00
parent 86cfc8672e
commit d25b8ce92c
8 changed files with 63 additions and 66 deletions

View file

@ -167,7 +167,7 @@ trait CamelService extends Bootable {
* activations that occurred in the past are not considered.
*/
private def expectEndpointActivationCount(count: Int): CountDownLatch =
(activationTracker !! SetExpectedActivationCount(count)).as[CountDownLatch].get
(activationTracker ? SetExpectedActivationCount(count)).as[CountDownLatch].get
/**
* Sets an expectation on the number of upcoming endpoint de-activations and returns
@ -175,7 +175,7 @@ trait CamelService extends Bootable {
* de-activations that occurred in the past are not considered.
*/
private def expectEndpointDeactivationCount(count: Int): CountDownLatch =
(activationTracker !! SetExpectedDeactivationCount(count)).as[CountDownLatch].get
(activationTracker ? SetExpectedDeactivationCount(count)).as[CountDownLatch].get
private[camel] def registerPublishRequestor: Unit =
registry.addListener(publishRequestor)

View file

@ -35,20 +35,20 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer.address, consumer, None)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer]))
}
@Test
def shouldReceiveOneConsumerUnregisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorUnregistered(consumer.address, consumer, None)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer]))
}
}

View file

@ -170,10 +170,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
"A supervised consumer" must {
"be able to reply during receive" in {
val consumer = Actor.actorOf(new SupervisedConsumer("reply-channel-test-1")).start
(consumer !! "succeed") match {
case Some(r) r must equal("ok")
case None fail("reply expected")
}
(consumer ? "succeed").get must equal("ok")
}
"be able to reply on failure during preRestart" in {

View file

@ -34,13 +34,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val producer = actorOf(new TestProducer("direct:producer-test-2", true))
producer.start
when("a test message is sent to the producer with !!")
when("a test message is sent to the producer with ?")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer !! message
val result = (producer ? message).get
then("a normal response should have been returned by the producer")
val expected = Message("received TEST", Map(Message.MessageExchangeId -> "123"))
assert(result === Some(expected))
assert(result === expected)
}
scenario("produce message and receive failure response") {
@ -48,9 +48,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val producer = actorOf(new TestProducer("direct:producer-test-2"))
producer.start
when("a test message causing an exception is sent to the producer with !!")
when("a test message causing an exception is sent to the producer with ?")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = (producer !! message).as[Failure]
val result = (producer ? message).as[Failure]
then("a failure response should have been returned by the producer")
val expectedFailureText = result.get.cause.getMessage
@ -93,13 +93,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val producer = actorOf(new TestProducer("direct:producer-test-3"))
producer.start
when("a test message is sent to the producer with !!")
when("a test message is sent to the producer with ?")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer !! message
val result = (producer ? message).get
then("a normal response should have been returned by the producer")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
assert(result === Some(expected))
assert(result === expected)
}
scenario("produce message and receive failure response") {
@ -107,9 +107,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val producer = actorOf(new TestProducer("direct:producer-test-3"))
producer.start
when("a test message causing an exception is sent to the producer with !!")
when("a test message causing an exception is sent to the producer with ?")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = (producer !! message).as[Failure]
val result = (producer ? message).as[Failure]
then("a failure response should have been returned by the producer")
val expectedFailureText = result.get.cause.getMessage
@ -126,13 +126,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val target = actorOf[ReplyingForwardTarget].start
val producer = actorOf(new TestForwarder("direct:producer-test-2", target)).start
when("a test message is sent to the producer with !!")
when("a test message is sent to the producer with ?")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer !! message
val result = (producer ? message).get
then("a normal response should have been returned by the forward target")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result"))
assert(result === Some(expected))
assert(result === expected)
}
scenario("produce message, forward failure response to a replying target actor and receive response") {
@ -140,13 +140,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val target = actorOf[ReplyingForwardTarget].start
val producer = actorOf(new TestForwarder("direct:producer-test-2", target)).start
when("a test message causing an exception is sent to the producer with !!")
when("a test message causing an exception is sent to the producer with ?")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = (producer !! message).as[Failure]
val result = (producer ? message).as[Failure].get
then("a failure response should have been returned by the forward target")
val expectedFailureText = result.get.cause.getMessage
val expectedHeaders = result.get.headers
val expectedFailureText = result.cause.getMessage
val expectedHeaders = result.headers
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(Message.MessageExchangeId -> "123", "test" -> "failure"))
}
@ -186,13 +186,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val target = actorOf[ReplyingForwardTarget].start
val producer = actorOf(new TestForwarder("direct:producer-test-3", target)).start
when("a test message is sent to the producer with !!")
when("a test message is sent to the producer with ?")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer !! message
val result = (producer ? message).get
then("a normal response should have been returned by the forward target")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result"))
assert(result === Some(expected))
assert(result === expected)
}
scenario("produce message, forward failure response to a replying target actor and receive response") {
@ -200,9 +200,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
val target = actorOf[ReplyingForwardTarget].start
val producer = actorOf(new TestForwarder("direct:producer-test-3", target)).start
when("a test message causing an exception is sent to the producer with !!")
when("a test message causing an exception is sent to the producer with ?")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = (producer !! message).as[Failure]
val result = (producer ? message).as[Failure]
then("a failure response should have been returned by the forward target")
val expectedFailureText = result.get.cause.getMessage

View file

@ -33,9 +33,9 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
val producer = actorOf(classOf[SampleUntypedReplyingProducer])
producer.start
when("a test message is sent to the producer with !!")
when("a test message is sent to the producer with ?")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer.sendRequestReply(message)
val result = producer.ask(message).get
then("a normal response should have been returned by the producer")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
@ -47,9 +47,9 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
val producer = actorOf(classOf[SampleUntypedReplyingProducer])
producer.start
when("a test message causing an exception is sent to the producer with !!")
when("a test message causing an exception is sent to the producer with ?")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = producer.sendRequestReply(message).asInstanceOf[Failure]
val result = producer.ask(message).as[Failure].get
then("a failure response should have been returned by the producer")
val expectedFailureText = result.cause.getMessage

View file

@ -34,10 +34,10 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("one-way communication") {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
mandatoryTemplate.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}
@ -71,10 +71,10 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("one-way communication") {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
mandatoryTemplate.sendBody("actor:%s" format actor.address, "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}

View file

@ -23,14 +23,14 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@Test
def shouldSendMessageToActorWithSyncProcessor = {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOnly)
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
actorProducer(endpoint).process(exchange)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
}
@ -38,14 +38,14 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@Test
def shouldSendMessageToActorWithAsyncProcessor = {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOnly)
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
actorAsyncProducer(endpoint).process(exchange, expectSyncCompletion)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1"))
}
@ -118,8 +118,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
val actor2 = actorOf[Tester1]("y")
actor1.start
actor2.start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:id:%s" format actor1.address)
val exchange1 = endpoint.createExchange(ExchangePattern.InOnly)
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
@ -130,8 +130,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
assert(latch2.await(5, TimeUnit.SECONDS))
val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply1.body === "Test1")
assert(reply2.body === "Test2")
}
@ -142,8 +142,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
val actor2 = actorOf[Tester1]("y")
actor1.start
actor2.start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:id:")
val exchange1 = endpoint.createExchange(ExchangePattern.InOnly)
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
@ -155,8 +155,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
assert(latch2.await(5, TimeUnit.SECONDS))
val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply1.body === "Test1")
assert(reply2.body === "Test2")
}
@ -165,8 +165,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
def shouldDynamicallyRouteMessageToActorWithDefaultUuid = {
val actor1 = actorOf[Tester1].start
val actor2 = actorOf[Tester1].start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:uuid:%s" format actor1.uuid)
val exchange1 = endpoint.createExchange(ExchangePattern.InOnly)
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
@ -177,8 +177,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
assert(latch2.await(5, TimeUnit.SECONDS))
val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply1.body === "Test1")
assert(reply2.body === "Test2")
}
@ -187,8 +187,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
def shouldDynamicallyRouteMessageToActorWithoutDefaultUuid = {
val actor1 = actorOf[Tester1].start
val actor2 = actorOf[Tester1].start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:uuid:")
val exchange1 = endpoint.createExchange(ExchangePattern.InOnly)
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
@ -200,8 +200,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
assert(latch2.await(5, TimeUnit.SECONDS))
val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message]
val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message]
val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message]
assert(reply1.body === "Test1")
assert(reply2.body === "Test2")
}
@ -209,7 +209,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@Test
def shouldThrowExceptionWhenIdNotSet: Unit = {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:id:")
intercept[ActorIdentifierNotSetException] {
actorProducer(endpoint).process(endpoint.createExchange(ExchangePattern.InOnly))
@ -219,7 +219,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@Test
def shouldThrowExceptionWhenUuidNotSet: Unit = {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:uuid:")
intercept[ActorIdentifierNotSetException] {
actorProducer(endpoint).process(endpoint.createExchange(ExchangePattern.InOnly))

View file

@ -167,8 +167,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_stm, akka_actor_tests)
lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_cluster)
//lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j)
//lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel)
lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j)
lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel)
//lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_cluster, akka_camel)
//lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), akka_cluster, akka_http, akka_slf4j, akka_camel)