diff --git a/akka-camel/src/main/scala/akka/camel/CamelService.scala b/akka-camel/src/main/scala/akka/camel/CamelService.scala index ee0d6c2afd..9aa1c77043 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelService.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelService.scala @@ -167,7 +167,7 @@ trait CamelService extends Bootable { * activations that occurred in the past are not considered. */ private def expectEndpointActivationCount(count: Int): CountDownLatch = - (activationTracker !! SetExpectedActivationCount(count)).as[CountDownLatch].get + (activationTracker ? SetExpectedActivationCount(count)).as[CountDownLatch].get /** * Sets an expectation on the number of upcoming endpoint de-activations and returns @@ -175,7 +175,7 @@ trait CamelService extends Bootable { * de-activations that occurred in the past are not considered. */ private def expectEndpointDeactivationCount(count: Int): CountDownLatch = - (activationTracker !! SetExpectedDeactivationCount(count)).as[CountDownLatch].get + (activationTracker ? SetExpectedDeactivationCount(count)).as[CountDownLatch].get private[camel] def registerPublishRequestor: Unit = registry.addListener(publishRequestor) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index 2cf4e3400f..655ce0adc2 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -35,20 +35,20 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerRegisteredEvent = { - val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get + val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get requestor ! ActorRegistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - assert((publisher !! GetRetainedMessage) === - Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer]))) + assert((publisher ? GetRetainedMessage).get === + ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])) } @Test def shouldReceiveOneConsumerUnregisteredEvent = { - val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get + val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get requestor ! ActorUnregistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - assert((publisher !! GetRetainedMessage) === - Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer]))) + assert((publisher ? GetRetainedMessage).get === + ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])) } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index d80e847efa..61909b0db0 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -170,10 +170,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher "A supervised consumer" must { "be able to reply during receive" in { val consumer = Actor.actorOf(new SupervisedConsumer("reply-channel-test-1")).start - (consumer !! "succeed") match { - case Some(r) ⇒ r must equal("ok") - case None ⇒ fail("reply expected") - } + (consumer ? "succeed").get must equal("ok") } "be able to reply on failure during preRestart" in { diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 8b22ef46d8..a7755f1346 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -34,13 +34,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val producer = actorOf(new TestProducer("direct:producer-test-2", true)) producer.start - when("a test message is sent to the producer with !!") + when("a test message is sent to the producer with ?") val message = Message("test", Map(Message.MessageExchangeId -> "123")) - val result = producer !! message + val result = (producer ? message).get then("a normal response should have been returned by the producer") val expected = Message("received TEST", Map(Message.MessageExchangeId -> "123")) - assert(result === Some(expected)) + assert(result === expected) } scenario("produce message and receive failure response") { @@ -48,9 +48,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val producer = actorOf(new TestProducer("direct:producer-test-2")) producer.start - when("a test message causing an exception is sent to the producer with !!") + when("a test message causing an exception is sent to the producer with ?") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) - val result = (producer !! message).as[Failure] + val result = (producer ? message).as[Failure] then("a failure response should have been returned by the producer") val expectedFailureText = result.get.cause.getMessage @@ -93,13 +93,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val producer = actorOf(new TestProducer("direct:producer-test-3")) producer.start - when("a test message is sent to the producer with !!") + when("a test message is sent to the producer with ?") val message = Message("test", Map(Message.MessageExchangeId -> "123")) - val result = producer !! message + val result = (producer ? message).get then("a normal response should have been returned by the producer") val expected = Message("received test", Map(Message.MessageExchangeId -> "123")) - assert(result === Some(expected)) + assert(result === expected) } scenario("produce message and receive failure response") { @@ -107,9 +107,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val producer = actorOf(new TestProducer("direct:producer-test-3")) producer.start - when("a test message causing an exception is sent to the producer with !!") + when("a test message causing an exception is sent to the producer with ?") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) - val result = (producer !! message).as[Failure] + val result = (producer ? message).as[Failure] then("a failure response should have been returned by the producer") val expectedFailureText = result.get.cause.getMessage @@ -126,13 +126,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val target = actorOf[ReplyingForwardTarget].start val producer = actorOf(new TestForwarder("direct:producer-test-2", target)).start - when("a test message is sent to the producer with !!") + when("a test message is sent to the producer with ?") val message = Message("test", Map(Message.MessageExchangeId -> "123")) - val result = producer !! message + val result = (producer ? message).get then("a normal response should have been returned by the forward target") val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result")) - assert(result === Some(expected)) + assert(result === expected) } scenario("produce message, forward failure response to a replying target actor and receive response") { @@ -140,13 +140,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val target = actorOf[ReplyingForwardTarget].start val producer = actorOf(new TestForwarder("direct:producer-test-2", target)).start - when("a test message causing an exception is sent to the producer with !!") + when("a test message causing an exception is sent to the producer with ?") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) - val result = (producer !! message).as[Failure] + val result = (producer ? message).as[Failure].get then("a failure response should have been returned by the forward target") - val expectedFailureText = result.get.cause.getMessage - val expectedHeaders = result.get.headers + val expectedFailureText = result.cause.getMessage + val expectedHeaders = result.headers assert(expectedFailureText === "failure") assert(expectedHeaders === Map(Message.MessageExchangeId -> "123", "test" -> "failure")) } @@ -186,13 +186,13 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val target = actorOf[ReplyingForwardTarget].start val producer = actorOf(new TestForwarder("direct:producer-test-3", target)).start - when("a test message is sent to the producer with !!") + when("a test message is sent to the producer with ?") val message = Message("test", Map(Message.MessageExchangeId -> "123")) - val result = producer !! message + val result = (producer ? message).get then("a normal response should have been returned by the forward target") val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result")) - assert(result === Some(expected)) + assert(result === expected) } scenario("produce message, forward failure response to a replying target actor and receive response") { @@ -200,9 +200,9 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before val target = actorOf[ReplyingForwardTarget].start val producer = actorOf(new TestForwarder("direct:producer-test-3", target)).start - when("a test message causing an exception is sent to the producer with !!") + when("a test message causing an exception is sent to the producer with ?") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) - val result = (producer !! message).as[Failure] + val result = (producer ? message).as[Failure] then("a failure response should have been returned by the forward target") val expectedFailureText = result.get.cause.getMessage diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala index 3c66f3de6a..96510ad415 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala @@ -33,9 +33,9 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with val producer = actorOf(classOf[SampleUntypedReplyingProducer]) producer.start - when("a test message is sent to the producer with !!") + when("a test message is sent to the producer with ?") val message = Message("test", Map(Message.MessageExchangeId -> "123")) - val result = producer.sendRequestReply(message) + val result = producer.ask(message).get then("a normal response should have been returned by the producer") val expected = Message("received test", Map(Message.MessageExchangeId -> "123")) @@ -47,9 +47,9 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with val producer = actorOf(classOf[SampleUntypedReplyingProducer]) producer.start - when("a test message causing an exception is sent to the producer with !!") + when("a test message causing an exception is sent to the producer with ?") val message = Message("fail", Map(Message.MessageExchangeId -> "123")) - val result = producer.sendRequestReply(message).asInstanceOf[Failure] + val result = producer.ask(message).as[Failure].get then("a failure response should have been returned by the producer") val expectedFailureText = result.cause.getMessage diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala index c35ddb114a..3c1ce039b1 100644 --- a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala @@ -34,10 +34,10 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("one-way communication") { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get mandatoryTemplate.sendBody("actor:uuid:%s" format actor.uuid, "Martin") assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply.body === "Martin") } @@ -71,10 +71,10 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("one-way communication") { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get mandatoryTemplate.sendBody("actor:%s" format actor.address, "Martin") assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply.body === "Martin") } diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala index b9096e0523..d1ebb5357b 100644 --- a/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala @@ -23,14 +23,14 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldSendMessageToActorWithSyncProcessor = { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOnly) exchange.getIn.setBody("Martin") exchange.getIn.setHeader("k1", "v1") actorProducer(endpoint).process(exchange) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply.body === "Martin") assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1")) } @@ -38,14 +38,14 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldSendMessageToActorWithAsyncProcessor = { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:uuid:%s" format actor.uuid) val exchange = endpoint.createExchange(ExchangePattern.InOnly) exchange.getIn.setBody("Martin") exchange.getIn.setHeader("k1", "v1") actorAsyncProducer(endpoint).process(exchange, expectSyncCompletion) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] + val reply = (actor ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply.body === "Martin") assert(reply.headers === Map(Message.MessageExchangeId -> exchange.getExchangeId, "k1" -> "v1")) } @@ -118,8 +118,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { val actor2 = actorOf[Tester1]("y") actor1.start actor2.start - val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get - val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:id:%s" format actor1.address) val exchange1 = endpoint.createExchange(ExchangePattern.InOnly) val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) @@ -130,8 +130,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) assert(latch2.await(5, TimeUnit.SECONDS)) - val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message] - val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message] + val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message] + val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply1.body === "Test1") assert(reply2.body === "Test2") } @@ -142,8 +142,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { val actor2 = actorOf[Tester1]("y") actor1.start actor2.start - val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get - val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:id:") val exchange1 = endpoint.createExchange(ExchangePattern.InOnly) val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) @@ -155,8 +155,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) assert(latch2.await(5, TimeUnit.SECONDS)) - val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message] - val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message] + val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message] + val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply1.body === "Test1") assert(reply2.body === "Test2") } @@ -165,8 +165,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { def shouldDynamicallyRouteMessageToActorWithDefaultUuid = { val actor1 = actorOf[Tester1].start val actor2 = actorOf[Tester1].start - val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get - val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:uuid:%s" format actor1.uuid) val exchange1 = endpoint.createExchange(ExchangePattern.InOnly) val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) @@ -177,8 +177,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) assert(latch2.await(5, TimeUnit.SECONDS)) - val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message] - val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message] + val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message] + val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply1.body === "Test1") assert(reply2.body === "Test2") } @@ -187,8 +187,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { def shouldDynamicallyRouteMessageToActorWithoutDefaultUuid = { val actor1 = actorOf[Tester1].start val actor2 = actorOf[Tester1].start - val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get - val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch1 = (actor1 ? SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch2 = (actor2 ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:uuid:") val exchange1 = endpoint.createExchange(ExchangePattern.InOnly) val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) @@ -200,8 +200,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) assert(latch2.await(5, TimeUnit.SECONDS)) - val reply1 = (actor1 !! GetRetainedMessage).get.asInstanceOf[Message] - val reply2 = (actor2 !! GetRetainedMessage).get.asInstanceOf[Message] + val reply1 = (actor1 ? GetRetainedMessage).get.asInstanceOf[Message] + val reply2 = (actor2 ? GetRetainedMessage).get.asInstanceOf[Message] assert(reply1.body === "Test1") assert(reply2.body === "Test2") } @@ -209,7 +209,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldThrowExceptionWhenIdNotSet: Unit = { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:id:") intercept[ActorIdentifierNotSetException] { actorProducer(endpoint).process(endpoint.createExchange(ExchangePattern.InOnly)) @@ -219,7 +219,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldThrowExceptionWhenUuidNotSet: Unit = { val actor = actorOf[Tester1].start - val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get + val latch = (actor ? SetExpectedMessageCount(1)).as[CountDownLatch].get val endpoint = actorEndpoint("actor:uuid:") intercept[ActorIdentifierNotSetException] { actorProducer(endpoint).process(endpoint.createExchange(ExchangePattern.InOnly)) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 8dc37fac60..a662c21e6d 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -167,8 +167,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_stm, akka_actor_tests) lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_cluster) - //lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j) - //lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel) + lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j) + lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel) //lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_cluster, akka_camel) //lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), akka_cluster, akka_http, akka_slf4j, akka_camel)