From 9eb3f800170838914493617e86b7090caa70d00a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 12 Oct 2010 12:03:09 +0200 Subject: [PATCH] Removing anonymous actor methods as per discussion on ML --- akka-actor/src/main/scala/actor/Actor.scala | 93 ------------------- .../test/scala/actor/actor/AgentSpec.scala | 8 +- .../test/scala/actor/actor/HotSwapSpec.scala | 19 ++-- .../src/test/scala/misc/SchedulerSpec.scala | 12 +-- .../src/test/scala/routing/RoutingSpec.scala | 42 +++++---- .../se/scalablesolutions/akka/amqp/AMQP.scala | 14 ++- .../akka/amqp/ExampleSession.scala | 69 ++++++++------ ...MQPConnectionRecoveryTestIntegration.scala | 22 +++-- ...nsumerChannelRecoveryTestIntegration.scala | 29 +++--- ...merConnectionRecoveryTestIntegration.scala | 47 +++++----- ...umerManualAcknowledgeTestIntegration.scala | 36 +++---- ...PConsumerManualRejectTestIntegration.scala | 24 ++--- .../AMQPConsumerMessageTestIntegration.scala | 19 ++-- ...oducerChannelRecoveryTestIntegration.scala | 20 ++-- ...cerConnectionRecoveryTestIntegration.scala | 20 ++-- .../AMQPRpcClientServerTestIntegration.scala | 13 ++- .../ServerInitiatedRemoteActorSpec.scala | 2 +- 17 files changed, 217 insertions(+), 272 deletions(-) diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index a7ef729e2d..dab4afae8a 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -143,99 +143,6 @@ object Actor extends Logging { */ def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) - /** - * Use to create an anonymous event-driven actor. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = actor  {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def actor(body: Receive): ActorRef = - actorOf(new Actor() { - self.lifeCycle = Permanent - def receive: Receive = body - }).start - - /** - * Use to create an anonymous transactional event-driven actor. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = transactor  {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def transactor(body: Receive): ActorRef = - actorOf(new Transactor() { - self.lifeCycle = Permanent - def receive: Receive = body - }).start - - /** - * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, - * which means that if the actor is supervised and dies it will *not* be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = temporaryActor  {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def temporaryActor(body: Receive): ActorRef = - actorOf(new Actor() { - self.lifeCycle = Temporary - def receive = body - }).start - - /** - * Use to create an anonymous event-driven actor with both an init block and a message loop block. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * val a = Actor.init  {
-   *   ... // init stuff
-   * } receive   {
-   *   case msg => ... // handle message
-   * }
-   * 
- * - */ - def init[A](body: => Unit) = { - def handler[A](body: => Unit) = new { - def receive(handler: Receive) = - actorOf(new Actor() { - self.lifeCycle = Permanent - body - def receive = handler - }).start - } - handler(body) - } - /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when * the block has been executed. diff --git a/akka-actor/src/test/scala/actor/actor/AgentSpec.scala b/akka-actor/src/test/scala/actor/actor/AgentSpec.scala index 71911c3ad8..449fd89a7f 100644 --- a/akka-actor/src/test/scala/actor/actor/AgentSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/AgentSpec.scala @@ -1,9 +1,9 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.actor.Actor.transactor import org.scalatest.Suite import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.MustMatchers +import Actor._ import org.junit.runner.RunWith import org.junit.Test @@ -45,9 +45,9 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers { case object Go val agent = Agent(5) val latch = new CountDownLatch(1) - val tx = transactor { - case Go => agent send { e => latch.countDown; e + 1 } - } + val tx = actorOf( new Transactor { + def receive = { case Go => agent send { e => latch.countDown; e + 1 } } + } ).start tx ! Go assert(latch.await(5, TimeUnit.SECONDS)) val result = agent() diff --git a/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala b/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala index d8892a7bdf..7caa194dbe 100644 --- a/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala @@ -13,10 +13,9 @@ class HotSwapSpec extends WordSpec with MustMatchers { "be able to hotswap its behavior with HotSwap(..)" in { val barrier = new CyclicBarrier(2) @volatile var _log = "" - val a = actor { - case _ => - _log += "default" - } + val a = actorOf( new Actor { + def receive = { case _ => _log += "default" } + }).start a ! HotSwap { case _ => _log += "swapped" @@ -58,11 +57,13 @@ class HotSwapSpec extends WordSpec with MustMatchers { "be able to revert hotswap its behavior with RevertHotSwap(..)" in { val barrier = new CyclicBarrier(2) @volatile var _log = "" - val a = actor { - case "init" => - _log += "init" - barrier.await - } + val a = actorOf( new Actor { + def receive = { + case "init" => + _log += "init" + barrier.await + } + }).start a ! "init" barrier.await diff --git a/akka-actor/src/test/scala/misc/SchedulerSpec.scala b/akka-actor/src/test/scala/misc/SchedulerSpec.scala index 83daff2e01..2c7d43896c 100644 --- a/akka-actor/src/test/scala/misc/SchedulerSpec.scala +++ b/akka-actor/src/test/scala/misc/SchedulerSpec.scala @@ -20,9 +20,9 @@ class SchedulerSpec extends JUnitSuite { case object Tick val countDownLatch = new CountDownLatch(3) - val tickActor = actor { - case Tick => countDownLatch.countDown - } + val tickActor = actorOf(new Actor { + def receive = { case Tick => countDownLatch.countDown } + }).start // run every 50 millisec Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS) @@ -40,9 +40,9 @@ class SchedulerSpec extends JUnitSuite { @Test def schedulerShouldScheduleOnce = withCleanEndState { case object Tick val countDownLatch = new CountDownLatch(3) - val tickActor = actor { - case Tick => countDownLatch.countDown - } + val tickActor = actorOf(new Actor { + def receive = { case Tick => countDownLatch.countDown } + }).start // run every 50 millisec Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS) Scheduler.scheduleOnce( () => countDownLatch.countDown, 50, TimeUnit.MILLISECONDS) diff --git a/akka-actor/src/test/scala/routing/RoutingSpec.scala b/akka-actor/src/test/scala/routing/RoutingSpec.scala index b51fa11a0e..a7cf233fb0 100644 --- a/akka-actor/src/test/scala/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/routing/RoutingSpec.scala @@ -54,9 +54,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers @Test def testLogger = { val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] val latch = new CountDownLatch(2) - val t1 = actor { - case _ => - } + val t1 = actorOf(new Actor { def receive = { case _ => } }).start val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown }).start val foo : Any = "foo" val bar : Any = "bar" @@ -72,18 +70,22 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers @Test def testSmallestMailboxFirstDispatcher = { val t1ProcessedCount = new AtomicInteger(0) val latch = new CountDownLatch(500) - val t1 = actor { - case x => - Thread.sleep(50) // slow actor - t1ProcessedCount.incrementAndGet - latch.countDown - } + val t1 = actorOf(new Actor { + def receive = { + case x => + Thread.sleep(50) // slow actor + t1ProcessedCount.incrementAndGet + latch.countDown + } + }).start val t2ProcessedCount = new AtomicInteger(0) - val t2 = actor { - case x => t2ProcessedCount.incrementAndGet - latch.countDown - } + val t2 = actorOf(new Actor { + def receive = { + case x => t2ProcessedCount.incrementAndGet + latch.countDown + } + }).start val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) for (i <- 1 to 500) d ! i val done = latch.await(10,TimeUnit.SECONDS) @@ -103,12 +105,14 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers }) i.start - def newListener = actor { - case "bar" => - num.incrementAndGet - latch.countDown - case "foo" => foreachListener.countDown - } + def newListener = actorOf(new Actor { + def receive = { + case "bar" => + num.incrementAndGet + latch.countDown + case "foo" => foreachListener.countDown + } + }).start val a1 = newListener val a2 = newListener diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 73389f910b..9553aebf20 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -341,9 +341,9 @@ object AMQP { throw new IllegalArgumentException("Either exchange name or routing key is mandatory") } - val deliveryHandler = actor { - case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) - } + val deliveryHandler = actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) } + } ).start val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) @@ -431,11 +431,9 @@ object AMQP { throw new IllegalArgumentException("Either exchange name or routing key is mandatory") } - val deliveryHandler = actor { - case Delivery(payload, _, _, _, _) => { - handler.apply(createProtobufFromBytes[I](payload)) - } - } + val deliveryHandler = actorOf(new Actor { + def receive = { case Delivery(payload, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) } + }).start val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 00756aa959..f560acd807 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -69,9 +69,10 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - }, None, Some(exchangeParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => + log.info("@george_bush received message from: %s", new String(payload)) } + }).start, None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing") @@ -84,13 +85,15 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - }, None, Some(exchangeParameters))) + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => + log.info("@george_bush received message from: %s", new String(payload)) } + }).start, None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor { - case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) - }, None, Some(exchangeParameters))) + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => + log.info("@barack_obama received message from: %s", new String(payload)) } + }).start, None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") @@ -103,13 +106,15 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { - case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - }, None, Some(exchangeParameters))) + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => + log.info("@george_bush received message from: %s", new String(payload)) } + }).start, None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor { - case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) - }, None, Some(exchangeParameters))) + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => + log.info("@barack_obama received message from: %s", new String(payload)) } + }).start, None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush") @@ -120,27 +125,31 @@ object ExampleSession { val channelCountdown = new CountDownLatch(2) - val connectionCallback = actor { - case Connected => log.info("Connection callback: Connected!") - case Reconnecting => () // not used, sent when connection fails and initiates a reconnect - case Disconnected => log.info("Connection callback: Disconnected!") - } + val connectionCallback = actorOf( new Actor { + def receive = { + case Connected => log.info("Connection callback: Connected!") + case Reconnecting => () // not used, sent when connection fails and initiates a reconnect + case Disconnected => log.info("Connection callback: Disconnected!") + } + }).start val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback))) - val channelCallback = actor { - case Started => { - log.info("Channel callback: Started") - channelCountdown.countDown + val channelCallback = actorOf( new Actor { + def receive = { + case Started => { + log.info("Channel callback: Started") + channelCountdown.countDown + } + case Restarting => // not used, sent when channel or connection fails and initiates a restart + case Stopped => log.info("Channel callback: Stopped") } - case Restarting => // not used, sent when channel or connection fails and initiates a restart - case Stopped => log.info("Channel callback: Stopped") - } + }).start val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { - case _ => () // not used - }, None, Some(exchangeParameters), channelParameters = Some(channelParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", + actorOf( new Actor { def receive = { case _ => } }).start, + None, Some(exchangeParameters), channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala index abd1e4c498..0187ef0cbe 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala @@ -24,16 +24,18 @@ class AMQPConnectionRecoveryTestIntegration extends JUnitSuite with MustMatchers val reconnectedLatch = new StandardLatch val disconnectedLatch = new StandardLatch - val connectionCallback: ActorRef = Actor.actor({ - case Connected => - if (!connectedLatch.isOpen) { - connectedLatch.open - } else { - reconnectedLatch.open - } - case Reconnecting => reconnectingLatch.open - case Disconnected => disconnectedLatch.open - }) + val connectionCallback: ActorRef = Actor.actorOf( new Actor { + def receive = { + case Connected => + if (!connectedLatch.isOpen) { + connectedLatch.open + } else { + reconnectedLatch.open + } + case Reconnecting => reconnectingLatch.open + case Disconnected => disconnectedLatch.open + } + }).start val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50, connectionCallback = Some(connectionCallback))) try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala index 03d315187a..ba96439d58 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala @@ -9,11 +9,11 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.TimeUnit -import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{Actor, ActorRef} class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMatchers { @@ -27,24 +27,27 @@ class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMat val consumerStartedLatch = new StandardLatch val consumerRestartedLatch = new StandardLatch - val consumerChannelCallback: ActorRef = actor { - case Started => { - if (!consumerStartedLatch.isOpen) { - consumerStartedLatch.open - } else { - consumerRestartedLatch.open + val consumerChannelCallback: ActorRef = actorOf( new Actor { + def receive = { + case Started => { + if (!consumerStartedLatch.isOpen) { + consumerStartedLatch.open + } else { + consumerRestartedLatch.open + } } + case Restarting => () + case Stopped => () } - case Restarting => () - case Stopped => () - } + }).start val payloadLatch = new StandardLatch val consumerExchangeParameters = ExchangeParameters("text_exchange") val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor { - case Delivery(payload, _, _, _, _) => payloadLatch.open - }, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } + }).start, + exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) val listenerLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala index 90889d8dc4..1e8fc23ac6 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala @@ -24,17 +24,19 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must try { val producerStartedLatch = new StandardLatch val producerRestartedLatch = new StandardLatch - val producerChannelCallback: ActorRef = actor { - case Started => { - if (!producerStartedLatch.isOpen) { - producerStartedLatch.open - } else { - producerRestartedLatch.open + val producerChannelCallback: ActorRef = actorOf( new Actor { + def receive = { + case Started => { + if (!producerStartedLatch.isOpen) { + producerStartedLatch.open + } else { + producerRestartedLatch.open + } } + case Restarting => () + case Stopped => () } - case Restarting => () - case Stopped => () - } + }).start val channelParameters = ChannelParameters(channelCallback = Some(producerChannelCallback)) val producer = AMQP.newProducer(connection, ProducerParameters( @@ -44,25 +46,28 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must val consumerStartedLatch = new StandardLatch val consumerRestartedLatch = new StandardLatch - val consumerChannelCallback: ActorRef = actor { - case Started => { - if (!consumerStartedLatch.isOpen) { - consumerStartedLatch.open - } else { - consumerRestartedLatch.open + val consumerChannelCallback: ActorRef = actorOf( new Actor { + def receive = { + case Started => { + if (!consumerStartedLatch.isOpen) { + consumerStartedLatch.open + } else { + consumerRestartedLatch.open + } } + case Restarting => () + case Stopped => () } - case Restarting => () - case Stopped => () - } + }).start val payloadLatch = new StandardLatch val consumerExchangeParameters = ExchangeParameters("text_exchange") val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor { - case Delivery(payload, _, _, _, _) => payloadLatch.open - }, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor { + def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } + }).start, + exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters))) consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) val listenerLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala index cfb5c920d4..03f39a669d 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala @@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ import org.junit.Test -import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} import org.multiverse.api.latches.StandardLatch import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.amqp.AMQP._ +import se.scalablesolutions.akka.actor.{Actor, ActorRef} class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustMatchers { @@ -21,29 +21,33 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } + val channelCallback = actorOf( new Actor { + def receive = { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + }).start val exchangeParameters = ExchangeParameters("text_exchange") val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val failLatch = new StandardLatch val acknowledgeLatch = new StandardLatch var deliveryTagCheck: Long = -1 - val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actor { - case Delivery(payload, _, deliveryTag, _, sender) => { - if (!failLatch.isOpen) { - failLatch.open - error("Make it fail!") - } else { - deliveryTagCheck = deliveryTag - sender.foreach(_ ! Acknowledge(deliveryTag)) + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actorOf( new Actor { + def receive = { + case Delivery(payload, _, deliveryTag, _, sender) => { + if (!failLatch.isOpen) { + failLatch.open + error("Make it fail!") + } else { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } } + case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open } - case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open - }, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters), + }).start, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters), selfAcknowledging = false, channelParameters = Some(channelParameters), queueDeclaration = ActiveDeclaration(autoDelete = false))) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala index f0b21d1286..ecdb6fb785 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala @@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ import org.junit.Test -import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} import org.multiverse.api.latches.StandardLatch import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.{Actor, ActorRef} class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatchers { @@ -22,21 +22,23 @@ class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatche try { val countDown = new CountDownLatch(2) val restartingLatch = new StandardLatch - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => restartingLatch.open - case Stopped => () - } + val channelCallback = actorOf(new Actor { + def receive = { + case Started => countDown.countDown + case Restarting => restartingLatch.open + case Stopped => () + } + }).start val exchangeParameters = ExchangeParameters("text_exchange") val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val rejectedLatch = new StandardLatch - val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actor { - case Delivery(payload, _, deliveryTag, _, sender) => { - sender.foreach(_ ! Reject(deliveryTag)) + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actorOf( new Actor { + def receive = { + case Delivery(payload, _, deliveryTag, _, sender) => sender.foreach(_ ! Reject(deliveryTag)) + case Rejected(deliveryTag) => rejectedLatch.open } - case Rejected(deliveryTag) => rejectedLatch.open - }, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters), + }).start, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters), selfAcknowledging = false, channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala index baa6b4e551..5b057d25b2 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala @@ -12,6 +12,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} import org.scalatest.junit.JUnitSuite import org.junit.Test +import se.scalablesolutions.akka.actor.Actor class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers { @@ -19,19 +20,21 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers { def consumerMessage = AMQPTest.withCleanEndState { val connection = AMQP.newConnection() val countDown = new CountDownLatch(2) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } + val channelCallback = actorOf(new Actor { + def receive = { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + }).start val exchangeParameters = ExchangeParameters("text_exchange") val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val payloadLatch = new StandardLatch - val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor { - case Delivery(payload, _, _, _, _) => payloadLatch.open - }, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf(new Actor { + def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open } + }).start, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters))) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala index 81b9c29945..c51c134e40 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala @@ -26,17 +26,19 @@ class AMQPProducerChannelRecoveryTestIntegration extends JUnitSuite with MustMat val restartingLatch = new StandardLatch val restartedLatch = new StandardLatch - val producerCallback: ActorRef = Actor.actor({ - case Started => { - if (!startedLatch.isOpen) { - startedLatch.open - } else { - restartedLatch.open + val producerCallback: ActorRef = Actor.actorOf( new Actor { + def receive = { + case Started => { + if (!startedLatch.isOpen) { + startedLatch.open + } else { + restartedLatch.open + } } + case Restarting => restartingLatch.open + case Stopped => () } - case Restarting => restartingLatch.open - case Stopped => () - }) + }).start val channelParameters = ChannelParameters(channelCallback = Some(producerCallback)) val producerParameters = ProducerParameters( diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala index 9e74bbaf46..253a55f10b 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala @@ -25,17 +25,19 @@ class AMQPProducerConnectionRecoveryTestIntegration extends JUnitSuite with Must val restartingLatch = new StandardLatch val restartedLatch = new StandardLatch - val producerCallback: ActorRef = Actor.actor({ - case Started => { - if (!startedLatch.isOpen) { - startedLatch.open - } else { - restartedLatch.open + val producerCallback: ActorRef = Actor.actorOf(new Actor{ + def receive = { + case Started => { + if (!startedLatch.isOpen) { + startedLatch.open + } else { + restartedLatch.open + } } + case Restarting => restartingLatch.open + case Stopped => () } - case Restarting => restartingLatch.open - case Stopped => () - }) + }).start val channelParameters = ChannelParameters(channelCallback = Some(producerCallback)) val producerParameters = ProducerParameters( diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala index b4f2a49939..0e9b6a152d 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala @@ -13,6 +13,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ import org.scalatest.junit.JUnitSuite import org.junit.Test +import se.scalablesolutions.akka.actor.Actor class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers { @@ -22,11 +23,13 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers { val connection = AMQP.newConnection() val countDown = new CountDownLatch(3) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } + val channelCallback = actorOf( new Actor { + def receive = { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + }).start val exchangeName = "text_topic_exchange" val channelParameters = ChannelParameters(channelCallback diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index e961b500f2..b0cbc5ec08 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -143,7 +143,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { var found = RemoteServer.serverFor("localhost", 9990) assert(found.isDefined, "sever not found") - val a = actor { case _ => } + val a = actorOf( new Actor { def receive = { case _ => } } ).start found = RemoteServer.serverFor("localhost", 9990) assert(found.isDefined, "sever not found after creating an actor")