Removing anonymous actor methods as per discussion on ML
This commit is contained in:
parent
f0d581ebb9
commit
9eb3f80017
17 changed files with 217 additions and 272 deletions
|
|
@ -143,99 +143,6 @@ object Actor extends Logging {
|
|||
*/
|
||||
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
|
||||
|
||||
/**
|
||||
* Use to create an anonymous event-driven actor.
|
||||
* <p/>
|
||||
* The actor is created with a 'permanent' life-cycle configuration, which means that
|
||||
* if the actor is supervised and dies it will be restarted.
|
||||
* <p/>
|
||||
* The actor is started when created.
|
||||
* Example:
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
* val a = actor {
|
||||
* case msg => ... // handle message
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
def actor(body: Receive): ActorRef =
|
||||
actorOf(new Actor() {
|
||||
self.lifeCycle = Permanent
|
||||
def receive: Receive = body
|
||||
}).start
|
||||
|
||||
/**
|
||||
* Use to create an anonymous transactional event-driven actor.
|
||||
* <p/>
|
||||
* The actor is created with a 'permanent' life-cycle configuration, which means that
|
||||
* if the actor is supervised and dies it will be restarted.
|
||||
* <p/>
|
||||
* The actor is started when created.
|
||||
* Example:
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
* val a = transactor {
|
||||
* case msg => ... // handle message
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
def transactor(body: Receive): ActorRef =
|
||||
actorOf(new Transactor() {
|
||||
self.lifeCycle = Permanent
|
||||
def receive: Receive = body
|
||||
}).start
|
||||
|
||||
/**
|
||||
* Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
|
||||
* which means that if the actor is supervised and dies it will *not* be restarted.
|
||||
* <p/>
|
||||
* The actor is started when created.
|
||||
* Example:
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
* val a = temporaryActor {
|
||||
* case msg => ... // handle message
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
def temporaryActor(body: Receive): ActorRef =
|
||||
actorOf(new Actor() {
|
||||
self.lifeCycle = Temporary
|
||||
def receive = body
|
||||
}).start
|
||||
|
||||
/**
|
||||
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
|
||||
* <p/>
|
||||
* The actor is created with a 'permanent' life-cycle configuration, which means that
|
||||
* if the actor is supervised and dies it will be restarted.
|
||||
* <p/>
|
||||
* The actor is started when created.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val a = Actor.init {
|
||||
* ... // init stuff
|
||||
* } receive {
|
||||
* case msg => ... // handle message
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
*/
|
||||
def init[A](body: => Unit) = {
|
||||
def handler[A](body: => Unit) = new {
|
||||
def receive(handler: Receive) =
|
||||
actorOf(new Actor() {
|
||||
self.lifeCycle = Permanent
|
||||
body
|
||||
def receive = handler
|
||||
}).start
|
||||
}
|
||||
handler(body)
|
||||
}
|
||||
|
||||
/**
|
||||
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
|
||||
* the block has been executed.
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor.transactor
|
||||
import org.scalatest.Suite
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import Actor._
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.Test
|
||||
|
|
@ -45,9 +45,9 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
|
|||
case object Go
|
||||
val agent = Agent(5)
|
||||
val latch = new CountDownLatch(1)
|
||||
val tx = transactor {
|
||||
case Go => agent send { e => latch.countDown; e + 1 }
|
||||
}
|
||||
val tx = actorOf( new Transactor {
|
||||
def receive = { case Go => agent send { e => latch.countDown; e + 1 } }
|
||||
} ).start
|
||||
tx ! Go
|
||||
assert(latch.await(5, TimeUnit.SECONDS))
|
||||
val result = agent()
|
||||
|
|
|
|||
|
|
@ -13,10 +13,9 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
"be able to hotswap its behavior with HotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actor {
|
||||
case _ =>
|
||||
_log += "default"
|
||||
}
|
||||
val a = actorOf( new Actor {
|
||||
def receive = { case _ => _log += "default" }
|
||||
}).start
|
||||
a ! HotSwap {
|
||||
case _ =>
|
||||
_log += "swapped"
|
||||
|
|
@ -58,11 +57,13 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
"be able to revert hotswap its behavior with RevertHotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actor {
|
||||
case "init" =>
|
||||
_log += "init"
|
||||
barrier.await
|
||||
}
|
||||
val a = actorOf( new Actor {
|
||||
def receive = {
|
||||
case "init" =>
|
||||
_log += "init"
|
||||
barrier.await
|
||||
}
|
||||
}).start
|
||||
|
||||
a ! "init"
|
||||
barrier.await
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ class SchedulerSpec extends JUnitSuite {
|
|||
|
||||
case object Tick
|
||||
val countDownLatch = new CountDownLatch(3)
|
||||
val tickActor = actor {
|
||||
case Tick => countDownLatch.countDown
|
||||
}
|
||||
val tickActor = actorOf(new Actor {
|
||||
def receive = { case Tick => countDownLatch.countDown }
|
||||
}).start
|
||||
// run every 50 millisec
|
||||
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
|
||||
|
||||
|
|
@ -40,9 +40,9 @@ class SchedulerSpec extends JUnitSuite {
|
|||
@Test def schedulerShouldScheduleOnce = withCleanEndState {
|
||||
case object Tick
|
||||
val countDownLatch = new CountDownLatch(3)
|
||||
val tickActor = actor {
|
||||
case Tick => countDownLatch.countDown
|
||||
}
|
||||
val tickActor = actorOf(new Actor {
|
||||
def receive = { case Tick => countDownLatch.countDown }
|
||||
}).start
|
||||
// run every 50 millisec
|
||||
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
|
||||
Scheduler.scheduleOnce( () => countDownLatch.countDown, 50, TimeUnit.MILLISECONDS)
|
||||
|
|
|
|||
|
|
@ -54,9 +54,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
@Test def testLogger = {
|
||||
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
|
||||
val latch = new CountDownLatch(2)
|
||||
val t1 = actor {
|
||||
case _ =>
|
||||
}
|
||||
val t1 = actorOf(new Actor { def receive = { case _ => } }).start
|
||||
val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown }).start
|
||||
val foo : Any = "foo"
|
||||
val bar : Any = "bar"
|
||||
|
|
@ -72,18 +70,22 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
@Test def testSmallestMailboxFirstDispatcher = {
|
||||
val t1ProcessedCount = new AtomicInteger(0)
|
||||
val latch = new CountDownLatch(500)
|
||||
val t1 = actor {
|
||||
case x =>
|
||||
Thread.sleep(50) // slow actor
|
||||
t1ProcessedCount.incrementAndGet
|
||||
latch.countDown
|
||||
}
|
||||
val t1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x =>
|
||||
Thread.sleep(50) // slow actor
|
||||
t1ProcessedCount.incrementAndGet
|
||||
latch.countDown
|
||||
}
|
||||
}).start
|
||||
|
||||
val t2ProcessedCount = new AtomicInteger(0)
|
||||
val t2 = actor {
|
||||
case x => t2ProcessedCount.incrementAndGet
|
||||
latch.countDown
|
||||
}
|
||||
val t2 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x => t2ProcessedCount.incrementAndGet
|
||||
latch.countDown
|
||||
}
|
||||
}).start
|
||||
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
|
||||
for (i <- 1 to 500) d ! i
|
||||
val done = latch.await(10,TimeUnit.SECONDS)
|
||||
|
|
@ -103,12 +105,14 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
})
|
||||
i.start
|
||||
|
||||
def newListener = actor {
|
||||
case "bar" =>
|
||||
num.incrementAndGet
|
||||
latch.countDown
|
||||
case "foo" => foreachListener.countDown
|
||||
}
|
||||
def newListener = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "bar" =>
|
||||
num.incrementAndGet
|
||||
latch.countDown
|
||||
case "foo" => foreachListener.countDown
|
||||
}
|
||||
}).start
|
||||
|
||||
val a1 = newListener
|
||||
val a2 = newListener
|
||||
|
|
|
|||
|
|
@ -341,9 +341,9 @@ object AMQP {
|
|||
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
|
||||
}
|
||||
|
||||
val deliveryHandler = actor {
|
||||
case Delivery(payload, _, _, _, _) => handler.apply(new String(payload))
|
||||
}
|
||||
val deliveryHandler = actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) }
|
||||
} ).start
|
||||
|
||||
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
|
||||
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
|
||||
|
|
@ -431,11 +431,9 @@ object AMQP {
|
|||
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
|
||||
}
|
||||
|
||||
val deliveryHandler = actor {
|
||||
case Delivery(payload, _, _, _, _) => {
|
||||
handler.apply(createProtobufFromBytes[I](payload))
|
||||
}
|
||||
}
|
||||
val deliveryHandler = actorOf(new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) => handler.apply(createProtobufFromBytes[I](payload)) }
|
||||
}).start
|
||||
|
||||
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
|
||||
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
|
||||
|
|
|
|||
|
|
@ -69,9 +69,10 @@ object ExampleSession {
|
|||
|
||||
val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
|
||||
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor {
|
||||
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
|
||||
}, None, Some(exchangeParameters)))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) =>
|
||||
log.info("@george_bush received message from: %s", new String(payload)) }
|
||||
}).start, None, Some(exchangeParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
|
||||
producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing")
|
||||
|
|
@ -84,13 +85,15 @@ object ExampleSession {
|
|||
|
||||
val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout)
|
||||
|
||||
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
|
||||
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
|
||||
}, None, Some(exchangeParameters)))
|
||||
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) =>
|
||||
log.info("@george_bush received message from: %s", new String(payload)) }
|
||||
}).start, None, Some(exchangeParameters)))
|
||||
|
||||
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
|
||||
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
|
||||
}, None, Some(exchangeParameters)))
|
||||
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) =>
|
||||
log.info("@barack_obama received message from: %s", new String(payload)) }
|
||||
}).start, None, Some(exchangeParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
|
||||
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
|
||||
|
|
@ -103,13 +106,15 @@ object ExampleSession {
|
|||
|
||||
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
|
||||
|
||||
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
|
||||
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
|
||||
}, None, Some(exchangeParameters)))
|
||||
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) =>
|
||||
log.info("@george_bush received message from: %s", new String(payload)) }
|
||||
}).start, None, Some(exchangeParameters)))
|
||||
|
||||
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
|
||||
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
|
||||
}, None, Some(exchangeParameters)))
|
||||
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) =>
|
||||
log.info("@barack_obama received message from: %s", new String(payload)) }
|
||||
}).start, None, Some(exchangeParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
|
||||
producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush")
|
||||
|
|
@ -120,27 +125,31 @@ object ExampleSession {
|
|||
|
||||
val channelCountdown = new CountDownLatch(2)
|
||||
|
||||
val connectionCallback = actor {
|
||||
case Connected => log.info("Connection callback: Connected!")
|
||||
case Reconnecting => () // not used, sent when connection fails and initiates a reconnect
|
||||
case Disconnected => log.info("Connection callback: Disconnected!")
|
||||
}
|
||||
val connectionCallback = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Connected => log.info("Connection callback: Connected!")
|
||||
case Reconnecting => () // not used, sent when connection fails and initiates a reconnect
|
||||
case Disconnected => log.info("Connection callback: Disconnected!")
|
||||
}
|
||||
}).start
|
||||
val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback)))
|
||||
|
||||
val channelCallback = actor {
|
||||
case Started => {
|
||||
log.info("Channel callback: Started")
|
||||
channelCountdown.countDown
|
||||
val channelCallback = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => {
|
||||
log.info("Channel callback: Started")
|
||||
channelCountdown.countDown
|
||||
}
|
||||
case Restarting => // not used, sent when channel or connection fails and initiates a restart
|
||||
case Stopped => log.info("Channel callback: Stopped")
|
||||
}
|
||||
case Restarting => // not used, sent when channel or connection fails and initiates a restart
|
||||
case Stopped => log.info("Channel callback: Stopped")
|
||||
}
|
||||
}).start
|
||||
val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct)
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
|
||||
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor {
|
||||
case _ => () // not used
|
||||
}, None, Some(exchangeParameters), channelParameters = Some(channelParameters)))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing",
|
||||
actorOf( new Actor { def receive = { case _ => } }).start,
|
||||
None, Some(exchangeParameters), channelParameters = Some(channelParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
|
||||
|
||||
|
|
|
|||
|
|
@ -24,16 +24,18 @@ class AMQPConnectionRecoveryTestIntegration extends JUnitSuite with MustMatchers
|
|||
val reconnectedLatch = new StandardLatch
|
||||
val disconnectedLatch = new StandardLatch
|
||||
|
||||
val connectionCallback: ActorRef = Actor.actor({
|
||||
case Connected =>
|
||||
if (!connectedLatch.isOpen) {
|
||||
connectedLatch.open
|
||||
} else {
|
||||
reconnectedLatch.open
|
||||
}
|
||||
case Reconnecting => reconnectingLatch.open
|
||||
case Disconnected => disconnectedLatch.open
|
||||
})
|
||||
val connectionCallback: ActorRef = Actor.actorOf( new Actor {
|
||||
def receive = {
|
||||
case Connected =>
|
||||
if (!connectedLatch.isOpen) {
|
||||
connectedLatch.open
|
||||
} else {
|
||||
reconnectedLatch.open
|
||||
}
|
||||
case Reconnecting => reconnectingLatch.open
|
||||
case Disconnected => disconnectedLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50, connectionCallback = Some(connectionCallback)))
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -9,11 +9,11 @@ import com.rabbitmq.client.ShutdownSignalException
|
|||
import se.scalablesolutions.akka.amqp._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
|
||||
class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMatchers {
|
||||
|
||||
|
|
@ -27,24 +27,27 @@ class AMQPConsumerChannelRecoveryTestIntegration extends JUnitSuite with MustMat
|
|||
|
||||
val consumerStartedLatch = new StandardLatch
|
||||
val consumerRestartedLatch = new StandardLatch
|
||||
val consumerChannelCallback: ActorRef = actor {
|
||||
case Started => {
|
||||
if (!consumerStartedLatch.isOpen) {
|
||||
consumerStartedLatch.open
|
||||
} else {
|
||||
consumerRestartedLatch.open
|
||||
val consumerChannelCallback: ActorRef = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => {
|
||||
if (!consumerStartedLatch.isOpen) {
|
||||
consumerStartedLatch.open
|
||||
} else {
|
||||
consumerRestartedLatch.open
|
||||
}
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
|
||||
val payloadLatch = new StandardLatch
|
||||
val consumerExchangeParameters = ExchangeParameters("text_exchange")
|
||||
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
|
||||
case Delivery(payload, _, _, _, _) => payloadLatch.open
|
||||
}, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
|
||||
}).start,
|
||||
exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
|
||||
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
|
||||
|
||||
val listenerLatch = new StandardLatch
|
||||
|
|
|
|||
|
|
@ -24,17 +24,19 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must
|
|||
try {
|
||||
val producerStartedLatch = new StandardLatch
|
||||
val producerRestartedLatch = new StandardLatch
|
||||
val producerChannelCallback: ActorRef = actor {
|
||||
case Started => {
|
||||
if (!producerStartedLatch.isOpen) {
|
||||
producerStartedLatch.open
|
||||
} else {
|
||||
producerRestartedLatch.open
|
||||
val producerChannelCallback: ActorRef = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => {
|
||||
if (!producerStartedLatch.isOpen) {
|
||||
producerStartedLatch.open
|
||||
} else {
|
||||
producerRestartedLatch.open
|
||||
}
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(producerChannelCallback))
|
||||
val producer = AMQP.newProducer(connection, ProducerParameters(
|
||||
|
|
@ -44,25 +46,28 @@ class AMQPConsumerConnectionRecoveryTestIntegration extends JUnitSuite with Must
|
|||
|
||||
val consumerStartedLatch = new StandardLatch
|
||||
val consumerRestartedLatch = new StandardLatch
|
||||
val consumerChannelCallback: ActorRef = actor {
|
||||
case Started => {
|
||||
if (!consumerStartedLatch.isOpen) {
|
||||
consumerStartedLatch.open
|
||||
} else {
|
||||
consumerRestartedLatch.open
|
||||
val consumerChannelCallback: ActorRef = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => {
|
||||
if (!consumerStartedLatch.isOpen) {
|
||||
consumerStartedLatch.open
|
||||
} else {
|
||||
consumerRestartedLatch.open
|
||||
}
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
|
||||
|
||||
val payloadLatch = new StandardLatch
|
||||
val consumerExchangeParameters = ExchangeParameters("text_exchange")
|
||||
val consumerChannelParameters = ChannelParameters(channelCallback = Some(consumerChannelCallback))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
|
||||
case Delivery(payload, _, _, _, _) => payloadLatch.open
|
||||
}, exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf( new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
|
||||
}).start,
|
||||
exchangeParameters = Some(consumerExchangeParameters), channelParameters = Some(consumerChannelParameters)))
|
||||
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
|
||||
|
||||
val listenerLatch = new StandardLatch
|
||||
|
|
|
|||
|
|
@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._
|
|||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
|
||||
class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustMatchers {
|
||||
|
||||
|
|
@ -21,29 +21,33 @@ class AMQPConsumerManualAcknowledgeTestIntegration extends JUnitSuite with MustM
|
|||
val connection = AMQP.newConnection()
|
||||
try {
|
||||
val countDown = new CountDownLatch(2)
|
||||
val channelCallback = actor {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
val channelCallback = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
val exchangeParameters = ExchangeParameters("text_exchange")
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
|
||||
|
||||
val failLatch = new StandardLatch
|
||||
val acknowledgeLatch = new StandardLatch
|
||||
var deliveryTagCheck: Long = -1
|
||||
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actor {
|
||||
case Delivery(payload, _, deliveryTag, _, sender) => {
|
||||
if (!failLatch.isOpen) {
|
||||
failLatch.open
|
||||
error("Make it fail!")
|
||||
} else {
|
||||
deliveryTagCheck = deliveryTag
|
||||
sender.foreach(_ ! Acknowledge(deliveryTag))
|
||||
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.ack.this", actorOf( new Actor {
|
||||
def receive = {
|
||||
case Delivery(payload, _, deliveryTag, _, sender) => {
|
||||
if (!failLatch.isOpen) {
|
||||
failLatch.open
|
||||
error("Make it fail!")
|
||||
} else {
|
||||
deliveryTagCheck = deliveryTag
|
||||
sender.foreach(_ ! Acknowledge(deliveryTag))
|
||||
}
|
||||
}
|
||||
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
|
||||
}
|
||||
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
|
||||
}, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
|
||||
}).start, queueName = Some("self.ack.queue"), exchangeParameters = Some(exchangeParameters),
|
||||
selfAcknowledging = false, channelParameters = Some(channelParameters),
|
||||
queueDeclaration = ActiveDeclaration(autoDelete = false)))
|
||||
|
||||
|
|
|
|||
|
|
@ -8,11 +8,11 @@ import se.scalablesolutions.akka.actor.Actor._
|
|||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.amqp._
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
|
||||
class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatchers {
|
||||
|
||||
|
|
@ -22,21 +22,23 @@ class AMQPConsumerManualRejectTestIntegration extends JUnitSuite with MustMatche
|
|||
try {
|
||||
val countDown = new CountDownLatch(2)
|
||||
val restartingLatch = new StandardLatch
|
||||
val channelCallback = actor {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
}
|
||||
val channelCallback = actorOf(new Actor {
|
||||
def receive = {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
val exchangeParameters = ExchangeParameters("text_exchange")
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
|
||||
|
||||
val rejectedLatch = new StandardLatch
|
||||
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actor {
|
||||
case Delivery(payload, _, deliveryTag, _, sender) => {
|
||||
sender.foreach(_ ! Reject(deliveryTag))
|
||||
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters("manual.reject.this", actorOf( new Actor {
|
||||
def receive = {
|
||||
case Delivery(payload, _, deliveryTag, _, sender) => sender.foreach(_ ! Reject(deliveryTag))
|
||||
case Rejected(deliveryTag) => rejectedLatch.open
|
||||
}
|
||||
case Rejected(deliveryTag) => rejectedLatch.open
|
||||
}, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
|
||||
}).start, queueName = Some("self.reject.queue"), exchangeParameters = Some(exchangeParameters),
|
||||
selfAcknowledging = false, channelParameters = Some(channelParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection,
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
|
|||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
|
||||
|
||||
|
|
@ -19,19 +20,21 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
|
|||
def consumerMessage = AMQPTest.withCleanEndState {
|
||||
val connection = AMQP.newConnection()
|
||||
val countDown = new CountDownLatch(2)
|
||||
val channelCallback = actor {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
val channelCallback = actorOf(new Actor {
|
||||
def receive = {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
|
||||
val exchangeParameters = ExchangeParameters("text_exchange")
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
|
||||
|
||||
val payloadLatch = new StandardLatch
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
|
||||
case Delivery(payload, _, _, _, _) => payloadLatch.open
|
||||
}, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
|
||||
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actorOf(new Actor {
|
||||
def receive = { case Delivery(payload, _, _, _, _) => payloadLatch.open }
|
||||
}).start, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
|
||||
|
||||
val producer = AMQP.newProducer(connection,
|
||||
ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters)))
|
||||
|
|
|
|||
|
|
@ -26,17 +26,19 @@ class AMQPProducerChannelRecoveryTestIntegration extends JUnitSuite with MustMat
|
|||
val restartingLatch = new StandardLatch
|
||||
val restartedLatch = new StandardLatch
|
||||
|
||||
val producerCallback: ActorRef = Actor.actor({
|
||||
case Started => {
|
||||
if (!startedLatch.isOpen) {
|
||||
startedLatch.open
|
||||
} else {
|
||||
restartedLatch.open
|
||||
val producerCallback: ActorRef = Actor.actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => {
|
||||
if (!startedLatch.isOpen) {
|
||||
startedLatch.open
|
||||
} else {
|
||||
restartedLatch.open
|
||||
}
|
||||
}
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
}
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
})
|
||||
}).start
|
||||
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
|
||||
val producerParameters = ProducerParameters(
|
||||
|
|
|
|||
|
|
@ -25,17 +25,19 @@ class AMQPProducerConnectionRecoveryTestIntegration extends JUnitSuite with Must
|
|||
val restartingLatch = new StandardLatch
|
||||
val restartedLatch = new StandardLatch
|
||||
|
||||
val producerCallback: ActorRef = Actor.actor({
|
||||
case Started => {
|
||||
if (!startedLatch.isOpen) {
|
||||
startedLatch.open
|
||||
} else {
|
||||
restartedLatch.open
|
||||
val producerCallback: ActorRef = Actor.actorOf(new Actor{
|
||||
def receive = {
|
||||
case Started => {
|
||||
if (!startedLatch.isOpen) {
|
||||
startedLatch.open
|
||||
} else {
|
||||
restartedLatch.open
|
||||
}
|
||||
}
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
}
|
||||
case Restarting => restartingLatch.open
|
||||
case Stopped => ()
|
||||
})
|
||||
}).start
|
||||
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(producerCallback))
|
||||
val producerParameters = ProducerParameters(
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
|
|||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
|
||||
|
||||
|
|
@ -22,11 +23,13 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
|
|||
val connection = AMQP.newConnection()
|
||||
|
||||
val countDown = new CountDownLatch(3)
|
||||
val channelCallback = actor {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
val channelCallback = actorOf( new Actor {
|
||||
def receive = {
|
||||
case Started => countDown.countDown
|
||||
case Restarting => ()
|
||||
case Stopped => ()
|
||||
}
|
||||
}).start
|
||||
|
||||
val exchangeName = "text_topic_exchange"
|
||||
val channelParameters = ChannelParameters(channelCallback
|
||||
|
|
|
|||
|
|
@ -143,7 +143,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
var found = RemoteServer.serverFor("localhost", 9990)
|
||||
assert(found.isDefined, "sever not found")
|
||||
|
||||
val a = actor { case _ => }
|
||||
val a = actorOf( new Actor { def receive = { case _ => } } ).start
|
||||
|
||||
found = RemoteServer.serverFor("localhost", 9990)
|
||||
assert(found.isDefined, "sever not found after creating an actor")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue