diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index a7ef729e2d..dab4afae8a 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -143,99 +143,6 @@ object Actor extends Logging { */ def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) - /** - * Use to create an anonymous event-driven actor. - *
- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - * - * The actor is started when created. - * Example: - *
- * import Actor._
- *
- * val a = actor {
- * case msg => ... // handle message
- * }
- *
- */
- def actor(body: Receive): ActorRef =
- actorOf(new Actor() {
- self.lifeCycle = Permanent
- def receive: Receive = body
- }).start
-
- /**
- * Use to create an anonymous transactional event-driven actor.
- *
- * The actor is created with a 'permanent' life-cycle configuration, which means that
- * if the actor is supervised and dies it will be restarted.
- *
- * The actor is started when created.
- * Example:
- *
- * import Actor._
- *
- * val a = transactor {
- * case msg => ... // handle message
- * }
- *
- */
- def transactor(body: Receive): ActorRef =
- actorOf(new Transactor() {
- self.lifeCycle = Permanent
- def receive: Receive = body
- }).start
-
- /**
- * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
- * which means that if the actor is supervised and dies it will *not* be restarted.
- *
- * The actor is started when created.
- * Example:
- *
- * import Actor._
- *
- * val a = temporaryActor {
- * case msg => ... // handle message
- * }
- *
- */
- def temporaryActor(body: Receive): ActorRef =
- actorOf(new Actor() {
- self.lifeCycle = Temporary
- def receive = body
- }).start
-
- /**
- * Use to create an anonymous event-driven actor with both an init block and a message loop block.
- *
- * The actor is created with a 'permanent' life-cycle configuration, which means that
- * if the actor is supervised and dies it will be restarted.
- *
- * The actor is started when created.
- * Example:
- *
- * val a = Actor.init {
- * ... // init stuff
- * } receive {
- * case msg => ... // handle message
- * }
- *
- *
- */
- def init[A](body: => Unit) = {
- def handler[A](body: => Unit) = new {
- def receive(handler: Receive) =
- actorOf(new Actor() {
- self.lifeCycle = Permanent
- body
- def receive = handler
- }).start
- }
- handler(body)
- }
-
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
diff --git a/akka-actor/src/test/scala/actor/actor/AgentSpec.scala b/akka-actor/src/test/scala/actor/actor/AgentSpec.scala
index 71911c3ad8..449fd89a7f 100644
--- a/akka-actor/src/test/scala/actor/actor/AgentSpec.scala
+++ b/akka-actor/src/test/scala/actor/actor/AgentSpec.scala
@@ -1,9 +1,9 @@
package se.scalablesolutions.akka.actor
-import se.scalablesolutions.akka.actor.Actor.transactor
import org.scalatest.Suite
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
+import Actor._
import org.junit.runner.RunWith
import org.junit.Test
@@ -45,9 +45,9 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
case object Go
val agent = Agent(5)
val latch = new CountDownLatch(1)
- val tx = transactor {
- case Go => agent send { e => latch.countDown; e + 1 }
- }
+ val tx = actorOf( new Transactor {
+ def receive = { case Go => agent send { e => latch.countDown; e + 1 } }
+ } ).start
tx ! Go
assert(latch.await(5, TimeUnit.SECONDS))
val result = agent()
diff --git a/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala b/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala
index d8892a7bdf..7caa194dbe 100644
--- a/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala
+++ b/akka-actor/src/test/scala/actor/actor/HotSwapSpec.scala
@@ -13,10 +13,9 @@ class HotSwapSpec extends WordSpec with MustMatchers {
"be able to hotswap its behavior with HotSwap(..)" in {
val barrier = new CyclicBarrier(2)
@volatile var _log = ""
- val a = actor {
- case _ =>
- _log += "default"
- }
+ val a = actorOf( new Actor {
+ def receive = { case _ => _log += "default" }
+ }).start
a ! HotSwap {
case _ =>
_log += "swapped"
@@ -58,11 +57,13 @@ class HotSwapSpec extends WordSpec with MustMatchers {
"be able to revert hotswap its behavior with RevertHotSwap(..)" in {
val barrier = new CyclicBarrier(2)
@volatile var _log = ""
- val a = actor {
- case "init" =>
- _log += "init"
- barrier.await
- }
+ val a = actorOf( new Actor {
+ def receive = {
+ case "init" =>
+ _log += "init"
+ barrier.await
+ }
+ }).start
a ! "init"
barrier.await
diff --git a/akka-actor/src/test/scala/misc/SchedulerSpec.scala b/akka-actor/src/test/scala/misc/SchedulerSpec.scala
index 83daff2e01..2c7d43896c 100644
--- a/akka-actor/src/test/scala/misc/SchedulerSpec.scala
+++ b/akka-actor/src/test/scala/misc/SchedulerSpec.scala
@@ -20,9 +20,9 @@ class SchedulerSpec extends JUnitSuite {
case object Tick
val countDownLatch = new CountDownLatch(3)
- val tickActor = actor {
- case Tick => countDownLatch.countDown
- }
+ val tickActor = actorOf(new Actor {
+ def receive = { case Tick => countDownLatch.countDown }
+ }).start
// run every 50 millisec
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
@@ -40,9 +40,9 @@ class SchedulerSpec extends JUnitSuite {
@Test def schedulerShouldScheduleOnce = withCleanEndState {
case object Tick
val countDownLatch = new CountDownLatch(3)
- val tickActor = actor {
- case Tick => countDownLatch.countDown
- }
+ val tickActor = actorOf(new Actor {
+ def receive = { case Tick => countDownLatch.countDown }
+ }).start
// run every 50 millisec
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
Scheduler.scheduleOnce( () => countDownLatch.countDown, 50, TimeUnit.MILLISECONDS)
diff --git a/akka-actor/src/test/scala/routing/RoutingSpec.scala b/akka-actor/src/test/scala/routing/RoutingSpec.scala
index b51fa11a0e..a7cf233fb0 100644
--- a/akka-actor/src/test/scala/routing/RoutingSpec.scala
+++ b/akka-actor/src/test/scala/routing/RoutingSpec.scala
@@ -54,9 +54,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
@Test def testLogger = {
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
val latch = new CountDownLatch(2)
- val t1 = actor {
- case _ =>
- }
+ val t1 = actorOf(new Actor { def receive = { case _ => } }).start
val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown }).start
val foo : Any = "foo"
val bar : Any = "bar"
@@ -72,18 +70,22 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
@Test def testSmallestMailboxFirstDispatcher = {
val t1ProcessedCount = new AtomicInteger(0)
val latch = new CountDownLatch(500)
- val t1 = actor {
- case x =>
- Thread.sleep(50) // slow actor
- t1ProcessedCount.incrementAndGet
- latch.countDown
- }
+ val t1 = actorOf(new Actor {
+ def receive = {
+ case x =>
+ Thread.sleep(50) // slow actor
+ t1ProcessedCount.incrementAndGet
+ latch.countDown
+ }
+ }).start
val t2ProcessedCount = new AtomicInteger(0)
- val t2 = actor {
- case x => t2ProcessedCount.incrementAndGet
- latch.countDown
- }
+ val t2 = actorOf(new Actor {
+ def receive = {
+ case x => t2ProcessedCount.incrementAndGet
+ latch.countDown
+ }
+ }).start
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
for (i <- 1 to 500) d ! i
val done = latch.await(10,TimeUnit.SECONDS)
@@ -103,12 +105,14 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
})
i.start
- def newListener = actor {
- case "bar" =>
- num.incrementAndGet
- latch.countDown
- case "foo" => foreachListener.countDown
- }
+ def newListener = actorOf(new Actor {
+ def receive = {
+ case "bar" =>
+ num.incrementAndGet
+ latch.countDown
+ case "foo" => foreachListener.countDown
+ }
+ }).start
val a1 = newListener
val a2 = newListener
diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala
index 73389f910b..9553aebf20 100644
--- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala
+++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala
@@ -341,9 +341,9 @@ object AMQP {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
- val deliveryHandler = actor {
- case Delivery(payload, _, _, _, _) => handler.apply(new String(payload))
- }
+ val deliveryHandler = actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) }
+ } ).start
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
@@ -431,11 +431,9 @@ object AMQP {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
- val deliveryHandler = actor {
- case Delivery(payload, _, _, _, _) => {
- handler.apply(createProtobufFromBytes[I](payload))
- }
- }
+ val deliveryHandler = actorOf(new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) }
+ }).start
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala
index 00756aa959..f560acd807 100644
--- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala
+++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala
@@ -69,9 +69,10 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
- val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor {
- case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
- }, None, Some(exchangeParameters)))
+ val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) =>
+ log.info("@george_bush received message from: %s", new String(payload)) }
+ }).start, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing")
@@ -84,13 +85,15 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout)
- val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
- case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
- }, None, Some(exchangeParameters)))
+ val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) =>
+ log.info("@george_bush received message from: %s", new String(payload)) }
+ }).start, None, Some(exchangeParameters)))
- val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
- case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
- }, None, Some(exchangeParameters)))
+ val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) =>
+ log.info("@barack_obama received message from: %s", new String(payload)) }
+ }).start, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
@@ -103,13 +106,15 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
- val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
- case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
- }, None, Some(exchangeParameters)))
+ val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) =>
+ log.info("@george_bush received message from: %s", new String(payload)) }
+ }).start, None, Some(exchangeParameters)))
- val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
- case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
- }, None, Some(exchangeParameters)))
+ val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) =>
+ log.info("@barack_obama received message from: %s", new String(payload)) }
+ }).start, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush")
@@ -120,27 +125,31 @@ object ExampleSession {
val channelCountdown = new CountDownLatch(2)
- val connectionCallback = actor {
- case Connected => log.info("Connection callback: Connected!")
- case Reconnecting => () // not used, sent when connection fails and initiates a reconnect
- case Disconnected => log.info("Connection callback: Disconnected!")
- }
+ val connectionCallback = actorOf( new Actor {
+ def receive = {
+ case Connected => log.info("Connection callback: Connected!")
+ case Reconnecting => () // not used, sent when connection fails and initiates a reconnect
+ case Disconnected => log.info("Connection callback: Disconnected!")
+ }
+ }).start
val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback)))
- val channelCallback = actor {
- case Started => {
- log.info("Channel callback: Started")
- channelCountdown.countDown
+ val channelCallback = actorOf( new Actor {
+ def receive = {
+ case Started => {
+ log.info("Channel callback: Started")
+ channelCountdown.countDown
+ }
+ case Restarting => // not used, sent when channel or connection fails and initiates a restart
+ case Stopped => log.info("Channel callback: Stopped")
}
- case Restarting => // not used, sent when channel or connection fails and initiates a restart
- case Stopped => log.info("Channel callback: Stopped")
- }
+ }).start
val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
- val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor {
- case _ => () // not used
- }, None, Some(exchangeParameters), channelParameters = Some(channelParameters)))
+ val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing",
+ actorOf( new Actor { def receive = { case _ => } }).start,
+ None, Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala
index abd1e4c498..0187ef0cbe 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTestIntegration.scala
@@ -24,16 +24,18 @@ class AMQPConnectionRecoveryTestIntegration extends JUnitSuite with MustMatchers
val reconnectedLatch = new StandardLatch
val disconnectedLatch = new StandardLatch
- val connectionCallback: ActorRef = Actor.actor({
- case Connected =>
- if (!connectedLatch.isOpen) {
- connectedLatch.open
- } else {
- reconnectedLatch.open
- }
- case Reconnecting => reconnectingLatch.open
- case Disconnected => disconnectedLatch.open
- })
+ val connectionCallback: ActorRef = Actor.actorOf( new Actor {
+ def receive = {
+ case Connected =>
+ if (!connectedLatch.isOpen) {
+ connectedLatch.open
+ } else {
+ reconnectedLatch.open
+ }
+ case Reconnecting => reconnectingLatch.open
+ case Disconnected => disconnectedLatch.open
+ }
+ }).start
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50, connectionCallback = Some(connectionCallback)))
try {
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala
index 03d315187a..ba96439d58 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTestIntegration.scala
@@ -9,11 +9,11 @@ import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.TimeUnit
-import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
import se.scalablesolutions.akka.amqp.AMQP._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMatchers {
@@ -27,24 +27,27 @@ class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMat
val consumerStartedLatch = new StandardLatch
val consumerRestartedLatch = new StandardLatch
- val consumerChannelCallback: ActorRef = actor {
- case Started => {
- if (!consumerStartedLatch.isOpen) {
- consumerStartedLatch.open
- } else {
- consumerRestartedLatch.open
+ val consumerChannelCallback: ActorRef = actorOf( new Actor {
+ def receive = {
+ case Started => {
+ if (!consumerStartedLatch.isOpen) {
+ consumerStartedLatch.open
+ } else {
+ consumerRestartedLatch.open
+ }
}
+ case Restarting => ()
+ case Stopped => ()
}
- case Restarting => ()
- case Stopped => ()
- }
+ }).start
val payloadLatch = new StandardLatch
val consumerExchangeParameters = ExchangeParameters("text_exchange")
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
- val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
- case Delivery(payload, _, _, _, _) => payloadLatch.open
- }, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
+ val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
+ }).start,
+ exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala
index 90889d8dc4..1e8fc23ac6 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTestIntegration.scala
@@ -24,17 +24,19 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must
try {
val producerStartedLatch = new StandardLatch
val producerRestartedLatch = new StandardLatch
- val producerChannelCallback: ActorRef = actor {
- case Started => {
- if (!producerStartedLatch.isOpen) {
- producerStartedLatch.open
- } else {
- producerRestartedLatch.open
+ val producerChannelCallback: ActorRef = actorOf( new Actor {
+ def receive = {
+ case Started => {
+ if (!producerStartedLatch.isOpen) {
+ producerStartedLatch.open
+ } else {
+ producerRestartedLatch.open
+ }
}
+ case Restarting => ()
+ case Stopped => ()
}
- case Restarting => ()
- case Stopped => ()
- }
+ }).start
val channelParameters = ChannelParameters(channelCallback = Some(producerChannelCallback))
val producer = AMQP.newProducer(connection, ProducerParameters(
@@ -44,25 +46,28 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must
val consumerStartedLatch = new StandardLatch
val consumerRestartedLatch = new StandardLatch
- val consumerChannelCallback: ActorRef = actor {
- case Started => {
- if (!consumerStartedLatch.isOpen) {
- consumerStartedLatch.open
- } else {
- consumerRestartedLatch.open
+ val consumerChannelCallback: ActorRef = actorOf( new Actor {
+ def receive = {
+ case Started => {
+ if (!consumerStartedLatch.isOpen) {
+ consumerStartedLatch.open
+ } else {
+ consumerRestartedLatch.open
+ }
}
+ case Restarting => ()
+ case Stopped => ()
}
- case Restarting => ()
- case Stopped => ()
- }
+ }).start
val payloadLatch = new StandardLatch
val consumerExchangeParameters = ExchangeParameters("text_exchange")
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
- val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
- case Delivery(payload, _, _, _, _) => payloadLatch.open
- }, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
+ val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
+ }).start,
+ exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala
index cfb5c920d4..03f39a669d 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTestIntegration.scala
@@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
import org.junit.Test
-import se.scalablesolutions.akka.actor.ActorRef
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.multiverse.api.latches.StandardLatch
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP._
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustMatchers {
@@ -21,29 +21,33 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
- val channelCallback = actor {
- case Started => countDown.countDown
- case Restarting => ()
- case Stopped => ()
- }
+ val channelCallback = actorOf( new Actor {
+ def receive = {
+ case Started => countDown.countDown
+ case Restarting => ()
+ case Stopped => ()
+ }
+ }).start
val exchangeParameters = ExchangeParameters("text_exchange")
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val failLatch = new StandardLatch
val acknowledgeLatch = new StandardLatch
var deliveryTagCheck: Long = -1
- val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actor {
- case Delivery(payload, _, deliveryTag, _, sender) => {
- if (!failLatch.isOpen) {
- failLatch.open
- error("Make it fail!")
- } else {
- deliveryTagCheck = deliveryTag
- sender.foreach(_ ! Acknowledge(deliveryTag))
+ val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actorOf( new Actor {
+ def receive = {
+ case Delivery(payload, _, deliveryTag, _, sender) => {
+ if (!failLatch.isOpen) {
+ failLatch.open
+ error("Make it fail!")
+ } else {
+ deliveryTagCheck = deliveryTag
+ sender.foreach(_ ! Acknowledge(deliveryTag))
+ }
}
+ case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
}
- case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
- }, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
+ }).start, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
selfAcknowledging = false, channelParameters = Some(channelParameters),
queueDeclaration = ActiveDeclaration(autoDelete = false)))
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala
index f0b21d1286..ecdb6fb785 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTestIntegration.scala
@@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
import org.junit.Test
-import se.scalablesolutions.akka.actor.ActorRef
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import org.scalatest.junit.JUnitSuite
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatchers {
@@ -22,21 +22,23 @@ class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatche
try {
val countDown = new CountDownLatch(2)
val restartingLatch = new StandardLatch
- val channelCallback = actor {
- case Started => countDown.countDown
- case Restarting => restartingLatch.open
- case Stopped => ()
- }
+ val channelCallback = actorOf(new Actor {
+ def receive = {
+ case Started => countDown.countDown
+ case Restarting => restartingLatch.open
+ case Stopped => ()
+ }
+ }).start
val exchangeParameters = ExchangeParameters("text_exchange")
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val rejectedLatch = new StandardLatch
- val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actor {
- case Delivery(payload, _, deliveryTag, _, sender) => {
- sender.foreach(_ ! Reject(deliveryTag))
+ val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actorOf( new Actor {
+ def receive = {
+ case Delivery(payload, _, deliveryTag, _, sender) => sender.foreach(_ ! Reject(deliveryTag))
+ case Rejected(deliveryTag) => rejectedLatch.open
}
- case Rejected(deliveryTag) => rejectedLatch.open
- }, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
+ }).start, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
selfAcknowledging = false, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala
index baa6b4e551..5b057d25b2 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala
@@ -12,6 +12,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import se.scalablesolutions.akka.actor.Actor
class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
@@ -19,19 +20,21 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
def consumerMessage = AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val countDown = new CountDownLatch(2)
- val channelCallback = actor {
- case Started => countDown.countDown
- case Restarting => ()
- case Stopped => ()
- }
+ val channelCallback = actorOf(new Actor {
+ def receive = {
+ case Started => countDown.countDown
+ case Restarting => ()
+ case Stopped => ()
+ }
+ }).start
val exchangeParameters = ExchangeParameters("text_exchange")
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
- val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
- case Delivery(payload, _, _, _, _) => payloadLatch.open
- }, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
+ val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf(new Actor {
+ def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
+ }).start, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters)))
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala
index 81b9c29945..c51c134e40 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTestIntegration.scala
@@ -26,17 +26,19 @@ class AMQPProducerChannelRecoveryTestIntegration extends JUnitSuite with MustMat
val restartingLatch = new StandardLatch
val restartedLatch = new StandardLatch
- val producerCallback: ActorRef = Actor.actor({
- case Started => {
- if (!startedLatch.isOpen) {
- startedLatch.open
- } else {
- restartedLatch.open
+ val producerCallback: ActorRef = Actor.actorOf( new Actor {
+ def receive = {
+ case Started => {
+ if (!startedLatch.isOpen) {
+ startedLatch.open
+ } else {
+ restartedLatch.open
+ }
}
+ case Restarting => restartingLatch.open
+ case Stopped => ()
}
- case Restarting => restartingLatch.open
- case Stopped => ()
- })
+ }).start
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
val producerParameters = ProducerParameters(
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala
index 9e74bbaf46..253a55f10b 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTestIntegration.scala
@@ -25,17 +25,19 @@ class AMQPProducerConnectionRecoveryTestIntegration extends JUnitSuite with Must
val restartingLatch = new StandardLatch
val restartedLatch = new StandardLatch
- val producerCallback: ActorRef = Actor.actor({
- case Started => {
- if (!startedLatch.isOpen) {
- startedLatch.open
- } else {
- restartedLatch.open
+ val producerCallback: ActorRef = Actor.actorOf(new Actor{
+ def receive = {
+ case Started => {
+ if (!startedLatch.isOpen) {
+ startedLatch.open
+ } else {
+ restartedLatch.open
+ }
}
+ case Restarting => restartingLatch.open
+ case Stopped => ()
}
- case Restarting => restartingLatch.open
- case Stopped => ()
- })
+ }).start
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
val producerParameters = ProducerParameters(
diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala
index b4f2a49939..0e9b6a152d 100644
--- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala
+++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala
@@ -13,6 +13,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import se.scalablesolutions.akka.actor.Actor
class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
@@ -22,11 +23,13 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
val connection = AMQP.newConnection()
val countDown = new CountDownLatch(3)
- val channelCallback = actor {
- case Started => countDown.countDown
- case Restarting => ()
- case Stopped => ()
- }
+ val channelCallback = actorOf( new Actor {
+ def receive = {
+ case Started => countDown.countDown
+ case Restarting => ()
+ case Stopped => ()
+ }
+ }).start
val exchangeName = "text_topic_exchange"
val channelParameters = ChannelParameters(channelCallback
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
index e961b500f2..b0cbc5ec08 100644
--- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
@@ -143,7 +143,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
var found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found")
- val a = actor { case _ => }
+ val a = actorOf( new Actor { def receive = { case _ => } } ).start
found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found after creating an actor")