CamelServiceManager.service returns Option[CamelService] (Scala API)
CamelServiceManager.getService() returns Option[CamelService] (Java API) Re #457
This commit is contained in:
parent
a21077707e
commit
353d01cf05
8 changed files with 95 additions and 42 deletions
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
|
||||||
|
|
||||||
import se.scalablesolutions.akka.stm.Ref
|
import se.scalablesolutions.akka.stm.Ref
|
||||||
import se.scalablesolutions.akka.AkkaException
|
import se.scalablesolutions.akka.AkkaException
|
||||||
import se.scalablesolutions.akka.util.{ Function => JFunc, Procedure => JProc }
|
import se.scalablesolutions.akka.util.JavaAPI.{ Function => JFunc, Procedure => JProc }
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||||
import se.scalablesolutions.akka.actor.Actor._
|
import se.scalablesolutions.akka.actor.Actor._
|
||||||
import se.scalablesolutions.akka.dispatch.CompletableFuture
|
import se.scalablesolutions.akka.dispatch.CompletableFuture
|
||||||
import se.scalablesolutions.akka.AkkaException
|
import se.scalablesolutions.akka.AkkaException
|
||||||
import se.scalablesolutions.akka.util.{ Function, SideEffect }
|
import se.scalablesolutions.akka.util.JavaAPI.{ Function, SideEffect }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements Oz-style dataflow (single assignment) variables.
|
* Implements Oz-style dataflow (single assignment) variables.
|
||||||
|
|
|
||||||
|
|
@ -1,23 +1,70 @@
|
||||||
package se.scalablesolutions.akka.util
|
package se.scalablesolutions.akka.util
|
||||||
|
|
||||||
/** A Function interface
|
object JavaAPI {
|
||||||
* Used to create first-class-functions is Java (sort of)
|
/** A Function interface
|
||||||
* Java API
|
* Used to create first-class-functions is Java (sort of)
|
||||||
*/
|
* Java API
|
||||||
trait Function[T,R] {
|
*/
|
||||||
def apply(param: T): R
|
trait Function[T,R] {
|
||||||
|
def apply(param: T): R
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A Procedure is like a Function, but it doesn't produce a return value
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
trait Procedure[T] {
|
||||||
|
def apply(param: T): Unit
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An executable piece of code that takes no parameters and doesn't return any value
|
||||||
|
*/
|
||||||
|
trait SideEffect {
|
||||||
|
def apply: Unit
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents optional values. Instances of <code>Option</code>
|
||||||
|
* are either instances of case class <code>Some</code> or it is case
|
||||||
|
* object <code>None</code>.
|
||||||
|
* <p>
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
sealed abstract class Option[A] extends java.lang.Iterable[A] {
|
||||||
|
def get: A
|
||||||
|
def isDefined: Boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class <code>Some[A]</code> represents existing values of type
|
||||||
|
* <code>A</code>.
|
||||||
|
* <p>
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
final case class Some[A](v: A) extends Option[A] {
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
|
val sv = scala.Some(v)
|
||||||
|
|
||||||
|
def get = sv.get
|
||||||
|
def iterator = sv.iterator
|
||||||
|
def isDefined = true
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This case object represents non-existent values.
|
||||||
|
* <p>
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
case class None[A]() extends Option[A] {
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
|
def get = throw new NoSuchElementException("None.get")
|
||||||
|
def iterator = scala.None.iterator
|
||||||
|
def isDefined = false
|
||||||
|
}
|
||||||
|
|
||||||
|
def some[A](v: A) = Some(v)
|
||||||
|
def none[A] = None[A]
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A Procedure is like a Function, but it doesn't produce a return value
|
|
||||||
* Java API
|
|
||||||
*/
|
|
||||||
trait Procedure[T] {
|
|
||||||
def apply(param: T): Unit
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An executable piece of code that takes no parameters and doesn't return any value
|
|
||||||
*/
|
|
||||||
trait SideEffect {
|
|
||||||
def apply: Unit
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,8 @@ import org.apache.camel.CamelContext
|
||||||
import se.scalablesolutions.akka.actor.Actor._
|
import se.scalablesolutions.akka.actor.Actor._
|
||||||
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
|
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
|
||||||
import se.scalablesolutions.akka.config.Config._
|
import se.scalablesolutions.akka.config.Config._
|
||||||
import se.scalablesolutions.akka.util.{Bootable, Logging}
|
import se.scalablesolutions.akka.util.{Logging, Bootable}
|
||||||
|
import se.scalablesolutions.akka.util.JavaAPI.{Option => JOption, Some => JSome, None => JNone}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publishes (untyped) consumer actors and typed consumer actors via Camel endpoints. Actors
|
* Publishes (untyped) consumer actors and typed consumer actors via Camel endpoints. Actors
|
||||||
|
|
@ -73,7 +74,7 @@ trait CamelService extends Bootable with Logging {
|
||||||
|
|
||||||
// Register this instance as current CamelService and return it
|
// Register this instance as current CamelService and return it
|
||||||
CamelServiceManager.register(this)
|
CamelServiceManager.register(this)
|
||||||
CamelServiceManager.service
|
CamelServiceManager.service.get
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -136,16 +137,22 @@ object CamelServiceManager {
|
||||||
* @see CamelService#stop
|
* @see CamelService#stop
|
||||||
* @see CamelService#onUnload
|
* @see CamelService#onUnload
|
||||||
*/
|
*/
|
||||||
def stopCamelService = service.stop
|
def stopCamelService = for (s <- service) s.stop
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns <code>Some(CamelService)</code> if <code>CamelService</code>
|
||||||
|
* has been started, <code>None</code> otherwise.
|
||||||
|
*/
|
||||||
|
def service = _current
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the current CamelService.
|
* Returns the current CamelService.
|
||||||
*
|
*
|
||||||
* @throws IllegalStateException if there's no current CamelService.
|
* @throws IllegalStateException if there's no current CamelService.
|
||||||
*/
|
*/
|
||||||
def service =
|
def getService: JOption[CamelService] = {
|
||||||
if (_current.isDefined) _current.get
|
if (_current.isDefined) JSome(_current.get) else JNone[CamelService]
|
||||||
else throw new IllegalStateException("no current CamelService")
|
}
|
||||||
|
|
||||||
private[camel] def register(service: CamelService) =
|
private[camel] def register(service: CamelService) =
|
||||||
if (_current.isDefined) throw new IllegalStateException("current CamelService already registered")
|
if (_current.isDefined) throw new IllegalStateException("current CamelService already registered")
|
||||||
|
|
|
||||||
|
|
@ -10,19 +10,22 @@ import se.scalablesolutions.akka.actor.ActorRegistry
|
||||||
*/
|
*/
|
||||||
class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
||||||
|
|
||||||
override def afterAll = ActorRegistry.shutdownAll
|
override def afterAll = {
|
||||||
|
CamelServiceManager.stopCamelService
|
||||||
|
ActorRegistry.shutdownAll
|
||||||
|
}
|
||||||
|
|
||||||
"A CamelServiceManager" when {
|
"A CamelServiceManager" when {
|
||||||
"the startCamelService method been has been called" must {
|
"the startCamelService method been has been called" must {
|
||||||
"have registered the started CamelService instance" in {
|
"have registered the started CamelService instance" in {
|
||||||
val service = CamelServiceManager.startCamelService
|
val service = CamelServiceManager.startCamelService
|
||||||
CamelServiceManager.service must be theSameInstanceAs (service)
|
CamelServiceManager.service.get must be theSameInstanceAs (service)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"the stopCamelService method been has been called" must {
|
"the stopCamelService method been has been called" must {
|
||||||
"have unregistered the current CamelService instance" in {
|
"have unregistered the current CamelService instance" in {
|
||||||
val service = CamelServiceManager.stopCamelService
|
val service = CamelServiceManager.stopCamelService
|
||||||
intercept[IllegalStateException] { CamelServiceManager.service }
|
CamelServiceManager.service must be (None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -32,13 +35,13 @@ class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustM
|
||||||
"a CamelService instance has been started externally" must {
|
"a CamelService instance has been started externally" must {
|
||||||
"have registered the started CamelService instance" in {
|
"have registered the started CamelService instance" in {
|
||||||
service.start
|
service.start
|
||||||
CamelServiceManager.service must be theSameInstanceAs (service)
|
CamelServiceManager.service.get must be theSameInstanceAs (service)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"the current CamelService instance has been stopped externally" must {
|
"the current CamelService instance has been stopped externally" must {
|
||||||
"have unregistered the current CamelService instance" in {
|
"have unregistered the current CamelService instance" in {
|
||||||
service.stop
|
service.stop
|
||||||
intercept[IllegalStateException] { CamelServiceManager.service }
|
CamelServiceManager.service must be (None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -54,10 +57,6 @@ class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustM
|
||||||
"only allow the current CamelService instance to be stopped" in {
|
"only allow the current CamelService instance to be stopped" in {
|
||||||
intercept[IllegalStateException] { CamelServiceFactory.createCamelService.stop }
|
intercept[IllegalStateException] { CamelServiceFactory.createCamelService.stop }
|
||||||
}
|
}
|
||||||
"ensure that the current CamelService instance has been actually started" in {
|
|
||||||
CamelServiceManager.stopCamelService
|
|
||||||
intercept[IllegalStateException] { CamelServiceManager.stopCamelService }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
||||||
val consumer = actorOf[RemoteConsumer].start
|
val consumer = actorOf[RemoteConsumer].start
|
||||||
|
|
||||||
when("remote consumer publication is triggered")
|
when("remote consumer publication is triggered")
|
||||||
var latch = service.expectEndpointActivationCount(1)
|
var latch = service.get.expectEndpointActivationCount(1)
|
||||||
consumer !! "init"
|
consumer !! "init"
|
||||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||||
|
|
||||||
|
|
@ -61,7 +61,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
||||||
val consumer = TypedActor.newRemoteInstance(classOf[SampleRemoteTypedConsumer], classOf[SampleRemoteTypedConsumerImpl], host, port)
|
val consumer = TypedActor.newRemoteInstance(classOf[SampleRemoteTypedConsumer], classOf[SampleRemoteTypedConsumerImpl], host, port)
|
||||||
|
|
||||||
when("remote typed consumer publication is triggered")
|
when("remote typed consumer publication is triggered")
|
||||||
var latch = service.expectEndpointActivationCount(1)
|
var latch = service.get.expectEndpointActivationCount(1)
|
||||||
consumer.foo("init")
|
consumer.foo("init")
|
||||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||||
|
|
||||||
|
|
@ -77,7 +77,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
||||||
val consumer = UntypedActor.actorOf(classOf[SampleRemoteUntypedConsumer]).start
|
val consumer = UntypedActor.actorOf(classOf[SampleRemoteUntypedConsumer]).start
|
||||||
|
|
||||||
when("remote untyped consumer publication is triggered")
|
when("remote untyped consumer publication is triggered")
|
||||||
var latch = service.expectEndpointActivationCount(1)
|
var latch = service.get.expectEndpointActivationCount(1)
|
||||||
consumer.sendRequestReply(Message("init", Map("test" -> "init")))
|
consumer.sendRequestReply(Message("init", Map("test" -> "init")))
|
||||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ object StandaloneApplication extends Application {
|
||||||
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test", "msg1"))
|
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test", "msg1"))
|
||||||
|
|
||||||
// set expectations on upcoming endpoint activation
|
// set expectations on upcoming endpoint activation
|
||||||
val activation = service.expectEndpointActivationCount(1)
|
val activation = service.get.expectEndpointActivationCount(1)
|
||||||
|
|
||||||
// 'internally' register typed actor (requires CamelService)
|
// 'internally' register typed actor (requires CamelService)
|
||||||
TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl])
|
TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl])
|
||||||
|
|
@ -86,7 +86,7 @@ object StandaloneJmsApplication extends Application {
|
||||||
startCamelService
|
startCamelService
|
||||||
|
|
||||||
// Expect two consumer endpoints to be activated
|
// Expect two consumer endpoints to be activated
|
||||||
val completion = service.expectEndpointActivationCount(2)
|
val completion = service.get.expectEndpointActivationCount(2)
|
||||||
|
|
||||||
val jmsUri = "jms:topic:test"
|
val jmsUri = "jms:topic:test"
|
||||||
// Wire publisher and consumer using a JMS topic
|
// Wire publisher and consumer using a JMS topic
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ object HttpConcurrencyTestStress {
|
||||||
val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start
|
val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start
|
||||||
val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
|
val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
|
||||||
|
|
||||||
val completion = service.expectEndpointActivationCount(1)
|
val completion = service.get.expectEndpointActivationCount(1)
|
||||||
val server = actorOf(new HttpServerActor(balancer)).start
|
val server = actorOf(new HttpServerActor(balancer)).start
|
||||||
completion.await
|
completion.await
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue