Java API for CamelServiceManager and CamelContextManager (usage of JavaAPI.Option)
This commit is contained in:
parent
353d01cf05
commit
77d5f39955
17 changed files with 188 additions and 126 deletions
|
|
@ -33,6 +33,7 @@ object JavaAPI {
|
|||
sealed abstract class Option[A] extends java.lang.Iterable[A] {
|
||||
def get: A
|
||||
def isDefined: Boolean
|
||||
def asScala: scala.Option[A]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -44,11 +45,10 @@ object JavaAPI {
|
|||
final case class Some[A](v: A) extends Option[A] {
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
val sv = scala.Some(v)
|
||||
|
||||
def get = sv.get
|
||||
def iterator = sv.iterator
|
||||
def get = v
|
||||
def iterator = Iterator.single(v)
|
||||
def isDefined = true
|
||||
def asScala = scala.Some(v)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -60,11 +60,16 @@ object JavaAPI {
|
|||
import scala.collection.JavaConversions._
|
||||
|
||||
def get = throw new NoSuchElementException("None.get")
|
||||
def iterator = scala.None.iterator
|
||||
def iterator = Iterator.empty
|
||||
def isDefined = false
|
||||
def asScala = scala.None
|
||||
}
|
||||
|
||||
def some[A](v: A) = Some(v)
|
||||
def none[A] = None[A]
|
||||
|
||||
implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala
|
||||
implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] =
|
||||
if (o.isDefined) Some(o.get) else None[A]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import org.apache.camel.impl.DefaultCamelContext
|
|||
|
||||
import se.scalablesolutions.akka.camel.component.TypedActorComponent
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.util.JavaAPI.{Option => JOption}
|
||||
|
||||
/**
|
||||
* Manages the lifecycle of a CamelContext. Allowed transitions are
|
||||
|
|
@ -18,12 +19,12 @@ import se.scalablesolutions.akka.util.Logging
|
|||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait CamelContextLifecycle extends Logging {
|
||||
trait CamelContextLifecycle extends CamelContextLifecycleJavaAPI with Logging {
|
||||
// TODO: enforce correct state transitions
|
||||
// valid: init -> start -> stop -> init ...
|
||||
|
||||
private var _context: CamelContext = _
|
||||
private var _template: ProducerTemplate = _
|
||||
private var _context: Option[CamelContext] = None
|
||||
private var _template: Option[ProducerTemplate] = None
|
||||
|
||||
private var _initialized = false
|
||||
private var _started = false
|
||||
|
|
@ -40,25 +41,25 @@ trait CamelContextLifecycle extends Logging {
|
|||
private[camel] var typedActorRegistry: Map[String, AnyRef] = _
|
||||
|
||||
/**
|
||||
* Returns the managed CamelContext.
|
||||
* Returns <code>Some(CamelContext)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
protected def context: CamelContext = _context
|
||||
protected def context: Option[CamelContext] = _context
|
||||
|
||||
/**
|
||||
* Returns the managed ProducerTemplate.
|
||||
* Returns <code>Some(ProducerTemplate)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
protected def template: ProducerTemplate = _template
|
||||
protected def template: Option[ProducerTemplate] = _template
|
||||
|
||||
/**
|
||||
* Sets the managed CamelContext.
|
||||
*/
|
||||
protected def context_= (context: CamelContext) { _context = context }
|
||||
|
||||
/**
|
||||
* Sets the managed ProducerTemplate.
|
||||
*/
|
||||
protected def template_= (template: ProducerTemplate) { _template = template }
|
||||
def mandatoryContext =
|
||||
if (context.isDefined) context.get
|
||||
else throw new IllegalStateException("no current CamelContext")
|
||||
|
||||
def mandatoryTemplate =
|
||||
if (template.isDefined) template.get
|
||||
else throw new IllegalStateException("no current ProducerTemplate")
|
||||
|
||||
def initialized = _initialized
|
||||
def started = _started
|
||||
|
||||
|
|
@ -66,21 +67,30 @@ trait CamelContextLifecycle extends Logging {
|
|||
* Starts the CamelContext and an associated ProducerTemplate.
|
||||
*/
|
||||
def start = {
|
||||
context.start
|
||||
template.start
|
||||
_started = true
|
||||
log.info("Camel context started")
|
||||
for {
|
||||
c <- context
|
||||
t <- template
|
||||
} {
|
||||
c.start
|
||||
t.start
|
||||
_started = true
|
||||
log.info("Camel context started")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the CamelContext and the associated ProducerTemplate.
|
||||
*/
|
||||
def stop = {
|
||||
template.stop
|
||||
context.stop
|
||||
_initialized = false
|
||||
_started = false
|
||||
log.info("Camel context stopped")
|
||||
for {
|
||||
t <- template
|
||||
c <- context
|
||||
} {
|
||||
t.stop
|
||||
c.stop
|
||||
_started = false
|
||||
log.info("Camel context stopped")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,19 +108,50 @@ trait CamelContextLifecycle extends Logging {
|
|||
def init(context: CamelContext) {
|
||||
this.typedActorComponent = new TypedActorComponent
|
||||
this.typedActorRegistry = typedActorComponent.typedActorRegistry
|
||||
this.context = context
|
||||
this.context.setStreamCaching(true)
|
||||
this.context.addComponent(TypedActorComponent.InternalSchema, typedActorComponent)
|
||||
this.template = context.createProducerTemplate
|
||||
|
||||
context.setStreamCaching(true)
|
||||
context.addComponent(TypedActorComponent.InternalSchema, typedActorComponent)
|
||||
|
||||
this._context = Some(context)
|
||||
this._template = Some(context.createProducerTemplate)
|
||||
|
||||
_initialized = true
|
||||
log.info("Camel context initialized")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API for CamelContextLifecycle.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait CamelContextLifecycleJavaAPI { this: CamelContextLifecycle =>
|
||||
/**
|
||||
* Returns <code>Some(CamelContext)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
def getContext: JOption[CamelContext] = context
|
||||
|
||||
/**
|
||||
* Returns <code>Some(ProducerTemplate)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
def getTemplate: JOption[ProducerTemplate] = template
|
||||
}
|
||||
|
||||
/**
|
||||
* Manages a global CamelContext and an associated ProducerTemplate.
|
||||
*/
|
||||
object CamelContextManager extends CamelContextLifecycle {
|
||||
override def context: CamelContext = super.context
|
||||
override def template: ProducerTemplate = super.template
|
||||
object CamelContextManager extends CamelContextLifecycle with CamelContextLifecycleJavaAPI {
|
||||
/**
|
||||
* Returns <code>Some(CamelContext)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
override def context: Option[CamelContext] = super.context
|
||||
|
||||
/**
|
||||
* Returns <code>Some(ProducerTemplate)</code> if <code>CamelContextLifecycle</code>
|
||||
* has been initialized, otherwise <code>None</code>.
|
||||
*/
|
||||
override def template: Option[ProducerTemplate] = super.template
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.Actor._
|
|||
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
|
||||
import se.scalablesolutions.akka.config.Config._
|
||||
import se.scalablesolutions.akka.util.{Logging, Bootable}
|
||||
import se.scalablesolutions.akka.util.JavaAPI.{Option => JOption, Some => JSome, None => JNone}
|
||||
import se.scalablesolutions.akka.util.JavaAPI.{Option => JOption}
|
||||
|
||||
/**
|
||||
* Publishes (untyped) consumer actors and typed consumer actors via Camel endpoints. Actors
|
||||
|
|
@ -116,7 +116,7 @@ trait CamelService extends Bootable with Logging {
|
|||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
object CamelServiceManager {
|
||||
object CamelServiceManager extends CamelServiceManagerJavaAPI {
|
||||
|
||||
/**
|
||||
* The current (optional) CamelService. Is defined when a CamelService has been started.
|
||||
|
|
@ -145,14 +145,7 @@ object CamelServiceManager {
|
|||
*/
|
||||
def service = _current
|
||||
|
||||
/**
|
||||
* Returns the current CamelService.
|
||||
*
|
||||
* @throws IllegalStateException if there's no current CamelService.
|
||||
*/
|
||||
def getService: JOption[CamelService] = {
|
||||
if (_current.isDefined) JSome(_current.get) else JNone[CamelService]
|
||||
}
|
||||
// TODO: add mandatoryService (throwing exception if service is not defined)
|
||||
|
||||
private[camel] def register(service: CamelService) =
|
||||
if (_current.isDefined) throw new IllegalStateException("current CamelService already registered")
|
||||
|
|
@ -163,6 +156,21 @@ object CamelServiceManager {
|
|||
else throw new IllegalStateException("only current CamelService can be unregistered")
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API for CamelServiceManager.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait CamelServiceManagerJavaAPI {
|
||||
/**
|
||||
* Returns <code>Some(CamelService)</code> if <code>CamelService</code>
|
||||
* has been started, <code>None</code> otherwise.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def getService: JOption[CamelService] = CamelServiceManager.service
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
* Creates a route to the registered consumer actor.
|
||||
*/
|
||||
def handleConsumerRegistered(event: ConsumerRegistered) {
|
||||
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
|
||||
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
|
||||
}
|
||||
|
||||
|
|
@ -31,7 +31,7 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
* Stops the route to the already un-registered consumer actor.
|
||||
*/
|
||||
def handleConsumerUnregistered(event: ConsumerUnregistered) {
|
||||
CamelContextManager.context.stopRoute(event.uuid.toString)
|
||||
CamelContextManager.mandatoryContext.stopRoute(event.uuid.toString)
|
||||
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
|
||||
}
|
||||
|
||||
|
|
@ -43,7 +43,7 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
|
||||
|
||||
CamelContextManager.typedActorRegistry.put(objectId, event.typedActor)
|
||||
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
|
||||
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri))
|
||||
}
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
|
||||
|
||||
CamelContextManager.typedActorRegistry.remove(objectId)
|
||||
CamelContextManager.context.stopRoute(objectId)
|
||||
CamelContextManager.mandatoryContext.stopRoute(objectId)
|
||||
log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.typedActor, event.uri))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
|
|||
* @see CamelContextManager.
|
||||
*/
|
||||
def bodyAs[T](clazz: Class[T]): T =
|
||||
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, body)
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
|
||||
|
||||
/**
|
||||
* Returns the body of the message converted to the type <code>T</code>. Conversion is done
|
||||
|
|
@ -35,7 +35,7 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
|
|||
* @see CamelContextManager.
|
||||
*/
|
||||
def bodyAs[T](implicit m: Manifest[T]): T =
|
||||
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], body)
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], body)
|
||||
|
||||
/**
|
||||
* Returns those headers from this message whose name is contained in <code>names</code>.
|
||||
|
|
@ -53,14 +53,14 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
|
|||
* <code>NoSuchElementException</code> if the header doesn't exist.
|
||||
*/
|
||||
def headerAs[T](name: String)(implicit m: Manifest[T]): T =
|
||||
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], header(name))
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], header(name))
|
||||
|
||||
/**
|
||||
* Returns the header with given <code>name</code> converted to type given by the <code>clazz</code>
|
||||
* argument. Throws <code>NoSuchElementException</code> if the header doesn't exist.
|
||||
*/
|
||||
def headerAs[T](name: String, clazz: Class[T]): T =
|
||||
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, header(name))
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, header(name))
|
||||
|
||||
/**
|
||||
* Creates a Message with a new <code>body</code> using a <code>transformer</code> function.
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ trait ProducerSupport { this: Actor =>
|
|||
* <code>Endpoint</code> object resolved from the current CamelContext with
|
||||
* <code>endpointUri</code>.
|
||||
*/
|
||||
private lazy val endpoint = CamelContextManager.context.getEndpoint(endpointUri)
|
||||
private lazy val endpoint = CamelContextManager.mandatoryContext.getEndpoint(endpointUri)
|
||||
|
||||
/**
|
||||
* <code>SendProcessor</code> for producing messages to <code>endpoint</code>.
|
||||
|
|
|
|||
|
|
@ -13,6 +13,6 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor {
|
|||
public void onReceiveAfterProduce(Object message) {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.bodyAs(String.class);
|
||||
CamelContextManager.template().sendBody("direct:forward-test-1", body);
|
||||
CamelContextManager.mandatoryTemplate().sendBody("direct:forward-test-1", body);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,22 +6,30 @@ import org.scalatest.junit.JUnitSuite
|
|||
|
||||
class CamelContextLifecycleTest extends JUnitSuite with CamelContextLifecycle {
|
||||
@Test def shouldManageCustomCamelContext {
|
||||
assert(context === null)
|
||||
assert(template === null)
|
||||
assert(context === None)
|
||||
assert(template === None)
|
||||
|
||||
intercept[IllegalStateException] { mandatoryContext }
|
||||
intercept[IllegalStateException] { mandatoryTemplate }
|
||||
|
||||
val ctx = new TestCamelContext
|
||||
assert(ctx.isStreamCaching === false)
|
||||
|
||||
init(ctx)
|
||||
assert(context.isStreamCaching === true)
|
||||
assert(!context.asInstanceOf[TestCamelContext].isStarted)
|
||||
// In Camel 2.3 CamelComtext.createProducerTemplate starts
|
||||
// the template before returning it (wasn't started in 2.2)
|
||||
assert(template.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
|
||||
assert(mandatoryContext.isStreamCaching === true)
|
||||
assert(!mandatoryContext.asInstanceOf[TestCamelContext].isStarted)
|
||||
assert(mandatoryTemplate.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
|
||||
start
|
||||
assert(context.asInstanceOf[TestCamelContext].isStarted)
|
||||
assert(template.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
|
||||
assert(mandatoryContext.asInstanceOf[TestCamelContext].isStarted)
|
||||
assert(mandatoryTemplate.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
|
||||
stop
|
||||
assert(!context.asInstanceOf[TestCamelContext].isStarted)
|
||||
assert(!template.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
|
||||
assert(!mandatoryContext.asInstanceOf[TestCamelContext].isStarted)
|
||||
assert(!mandatoryTemplate.asInstanceOf[DefaultProducerTemplate].isStarted)
|
||||
}
|
||||
|
||||
class TestCamelContext extends DefaultCamelContext
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import se.scalablesolutions.akka.actor._
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
||||
import CamelContextManager.template
|
||||
import CamelContextManager.mandatoryTemplate
|
||||
import ConsumerTest._
|
||||
|
||||
var service: CamelService = _
|
||||
|
|
@ -45,12 +45,12 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val consumer = actorOf(new TestConsumer("direct:publish-test-2"))
|
||||
"started before starting the CamelService" must {
|
||||
"support an in-out message exchange via its endpoint" in {
|
||||
template.requestBody("direct:publish-test-1", "msg1") must equal ("received msg1")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-1", "msg1") must equal ("received msg1")
|
||||
}
|
||||
}
|
||||
"not started" must {
|
||||
"not have an associated endpoint in the CamelContext" in {
|
||||
CamelContextManager.context.hasEndpoint("direct:publish-test-2") must be (null)
|
||||
CamelContextManager.mandatoryContext.hasEndpoint("direct:publish-test-2") must be (null)
|
||||
}
|
||||
}
|
||||
"started" must {
|
||||
|
|
@ -58,10 +58,10 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val latch = service.expectEndpointActivationCount(1)
|
||||
consumer.start
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
template.requestBody("direct:publish-test-2", "msg2") must equal ("received msg2")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") must equal ("received msg2")
|
||||
}
|
||||
"have an associated endpoint in the CamelContext" in {
|
||||
CamelContextManager.context.hasEndpoint("direct:publish-test-2") must not be (null)
|
||||
CamelContextManager.mandatoryContext.hasEndpoint("direct:publish-test-2") must not be (null)
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
|
|
@ -70,7 +70,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
consumer.stop
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBody("direct:publish-test-2", "msg2")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-2", "msg2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -83,9 +83,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val latch = service.expectEndpointActivationCount(3)
|
||||
actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
template.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal ("m2: x y")
|
||||
template.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal ("m3: x y")
|
||||
template.requestBodyAndHeader("direct:m4", "x", "test", "y") must equal ("m4: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal ("m2: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal ("m3: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m4", "x", "test", "y") must equal ("m4: x y")
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
|
|
@ -94,13 +94,13 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
TypedActor.stop(actor)
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBodyAndHeader("direct:m2", "x", "test", "y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y")
|
||||
}
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBodyAndHeader("direct:m3", "x", "test", "y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y")
|
||||
}
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBodyAndHeader("direct:m4", "x", "test", "y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m4", "x", "test", "y")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -113,8 +113,8 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val latch = service.expectEndpointActivationCount(2)
|
||||
actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl])
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
template.requestBody("direct:publish-test-3", "x") must equal ("foo: x")
|
||||
template.requestBody("direct:publish-test-4", "x") must equal ("bar: x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal ("foo: x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal ("bar: x")
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
|
|
@ -123,10 +123,10 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
TypedActor.stop(actor)
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBody("direct:publish-test-3", "x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-3", "x")
|
||||
}
|
||||
intercept[CamelExecutionException] {
|
||||
template.requestBody("direct:publish-test-4", "x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-4", "x")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -139,7 +139,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val latch = service.expectEndpointActivationCount(1)
|
||||
consumer.start
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
template.requestBodyAndHeader("direct:test-untyped-consumer", "x", "test", "y") must equal ("x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:test-untyped-consumer", "x", "test", "y") must equal ("x y")
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
|
|
@ -148,7 +148,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
consumer.stop
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
template.sendBodyAndHeader("direct:test-untyped-consumer", "blah", "test", "blub")
|
||||
mandatoryTemplate.sendBodyAndHeader("direct:test-untyped-consumer", "blah", "test", "blub")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
|
||||
try {
|
||||
template.requestBody("direct:publish-test-5", "msg3")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-5", "msg3")
|
||||
fail("expected TimoutException not thrown")
|
||||
} catch {
|
||||
case e => {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
|
|||
override protected def beforeAll = {
|
||||
ActorRegistry.shutdownAll
|
||||
CamelContextManager.init
|
||||
CamelContextManager.context.addRoutes(new TestRoute)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
|
||||
CamelContextManager.start
|
||||
}
|
||||
|
||||
|
|
@ -239,7 +239,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
|
|||
}
|
||||
}
|
||||
|
||||
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
private def mockEndpoint = CamelContextManager.mandatoryContext.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
}
|
||||
|
||||
object ProducerFeatureTest {
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
then("the published consumer is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.template.requestBody("direct:remote-consumer", "test")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test")
|
||||
assert(response === "remote actor: test")
|
||||
}
|
||||
}
|
||||
|
|
@ -66,7 +66,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
then("the published method is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.template.requestBody("direct:remote-typed-consumer", "test")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test")
|
||||
assert(response === "remote typed actor: test")
|
||||
}
|
||||
}
|
||||
|
|
@ -82,7 +82,7 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
then("the published untyped consumer is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.template.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b")
|
||||
assert(response === "a b")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
override protected def beforeAll = {
|
||||
ActorRegistry.shutdownAll
|
||||
CamelContextManager.init
|
||||
CamelContextManager.context.addRoutes(new TestRoute)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
|
||||
CamelContextManager.start
|
||||
}
|
||||
|
||||
|
|
@ -78,7 +78,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
|
||||
}
|
||||
|
||||
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
private def mockEndpoint = CamelContextManager.mandatoryContext.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
}
|
||||
|
||||
object UntypedProducerFeatureTest {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
override protected def beforeAll = {
|
||||
ActorRegistry.shutdownAll
|
||||
CamelContextManager.init
|
||||
CamelContextManager.context.addRoutes(new TestRoute)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
|
||||
CamelContextManager.start
|
||||
}
|
||||
|
||||
|
|
@ -30,12 +30,12 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
}
|
||||
|
||||
feature("Communicate with an actor via an actor:uuid endpoint") {
|
||||
import CamelContextManager.template
|
||||
import CamelContextManager.mandatoryTemplate
|
||||
|
||||
scenario("one-way communication") {
|
||||
val actor = actorOf[Tester1].start
|
||||
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
template.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
|
||||
mandatoryTemplate.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
|
||||
assert(reply.body === "Martin")
|
||||
|
|
@ -43,36 +43,36 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
|
||||
scenario("two-way communication") {
|
||||
val actor = actorOf[Tester2].start
|
||||
assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
|
||||
assert(mandatoryTemplate.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
|
||||
}
|
||||
|
||||
scenario("two-way communication with timeout") {
|
||||
val actor = actorOf[Tester3].start
|
||||
intercept[RuntimeCamelException] {
|
||||
template.requestBody("actor:uuid:%s?blocking=true" format actor.uuid, "Martin")
|
||||
mandatoryTemplate.requestBody("actor:uuid:%s?blocking=true" format actor.uuid, "Martin")
|
||||
}
|
||||
}
|
||||
|
||||
scenario("two-way communication via a custom route with failure response") {
|
||||
mockEndpoint.expectedBodiesReceived("whatever")
|
||||
template.requestBody("direct:failure-test-1", "whatever")
|
||||
mandatoryTemplate.requestBody("direct:failure-test-1", "whatever")
|
||||
mockEndpoint.assertIsSatisfied
|
||||
}
|
||||
|
||||
scenario("two-way communication via a custom route with exception") {
|
||||
mockEndpoint.expectedBodiesReceived("whatever")
|
||||
template.requestBody("direct:failure-test-2", "whatever")
|
||||
mandatoryTemplate.requestBody("direct:failure-test-2", "whatever")
|
||||
mockEndpoint.assertIsSatisfied
|
||||
}
|
||||
}
|
||||
|
||||
feature("Communicate with an actor via an actor:id endpoint") {
|
||||
import CamelContextManager.template
|
||||
import CamelContextManager.mandatoryTemplate
|
||||
|
||||
scenario("one-way communication") {
|
||||
val actor = actorOf[Tester1].start
|
||||
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
|
||||
template.sendBody("actor:%s" format actor.id, "Martin")
|
||||
mandatoryTemplate.sendBody("actor:%s" format actor.id, "Martin")
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
|
||||
assert(reply.body === "Martin")
|
||||
|
|
@ -80,17 +80,17 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
|
|||
|
||||
scenario("two-way communication") {
|
||||
val actor = actorOf[Tester2].start
|
||||
assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
|
||||
assert(mandatoryTemplate.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
|
||||
}
|
||||
|
||||
scenario("two-way communication via a custom route") {
|
||||
val actor = actorOf[CustomIdActor].start
|
||||
assert(template.requestBody("direct:custom-id-test-1", "Martin") === "Received Martin")
|
||||
assert(template.requestBody("direct:custom-id-test-2", "Martin") === "Received Martin")
|
||||
assert(mandatoryTemplate.requestBody("direct:custom-id-test-1", "Martin") === "Received Martin")
|
||||
assert(mandatoryTemplate.requestBody("direct:custom-id-test-2", "Martin") === "Received Martin")
|
||||
}
|
||||
}
|
||||
|
||||
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
private def mockEndpoint = CamelContextManager.mandatoryContext.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
}
|
||||
|
||||
object ActorComponentFeatureTest {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import se.scalablesolutions.akka.camel._
|
|||
*/
|
||||
class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||
import TypedActorComponentFeatureTest._
|
||||
import CamelContextManager.template
|
||||
import CamelContextManager.mandatoryTemplate
|
||||
|
||||
override protected def beforeAll = {
|
||||
val typedActor = TypedActor.newInstance(classOf[SampleTypedActor], classOf[SampleTypedActorImpl]) // not a consumer
|
||||
|
|
@ -24,7 +24,7 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
registry.put("ta", typedActor)
|
||||
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new CustomRouteBuilder)
|
||||
CamelContextManager.start
|
||||
|
||||
// Internal registration
|
||||
|
|
@ -41,19 +41,19 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
import ExchangePattern._
|
||||
|
||||
scenario("two-way communication with method returning String") {
|
||||
val result1 = template.requestBodyAndHeader("%s:tc?method=m2" format InternalSchema, "x", "test", "y")
|
||||
val result2 = template.requestBodyAndHeader("%s:tc?method=m4" format InternalSchema, "x", "test", "y")
|
||||
val result1 = mandatoryTemplate.requestBodyAndHeader("%s:tc?method=m2" format InternalSchema, "x", "test", "y")
|
||||
val result2 = mandatoryTemplate.requestBodyAndHeader("%s:tc?method=m4" format InternalSchema, "x", "test", "y")
|
||||
assert(result1 === "m2: x y")
|
||||
assert(result2 === "m4: x y")
|
||||
}
|
||||
|
||||
scenario("two-way communication with method returning void") {
|
||||
val result = template.requestBodyAndHeader("%s:tc?method=m5" format InternalSchema, "x", "test", "y")
|
||||
val result = mandatoryTemplate.requestBodyAndHeader("%s:tc?method=m5" format InternalSchema, "x", "test", "y")
|
||||
assert(result === "x") // returns initial body
|
||||
}
|
||||
|
||||
scenario("one-way communication with method returning String") {
|
||||
val result = template.send("%s:tc?method=m2" format InternalSchema, InOnly, new Processor {
|
||||
val result = mandatoryTemplate.send("%s:tc?method=m2" format InternalSchema, InOnly, new Processor {
|
||||
def process(exchange: Exchange) = {
|
||||
exchange.getIn.setBody("x")
|
||||
exchange.getIn.setHeader("test", "y")
|
||||
|
|
@ -65,7 +65,7 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
}
|
||||
|
||||
scenario("one-way communication with method returning void") {
|
||||
val result = template.send("%s:tc?method=m5" format InternalSchema, InOnly, new Processor {
|
||||
val result = mandatoryTemplate.send("%s:tc?method=m5" format InternalSchema, InOnly, new Processor {
|
||||
def process(exchange: Exchange) = {
|
||||
exchange.getIn.setBody("x")
|
||||
exchange.getIn.setHeader("test", "y")
|
||||
|
|
@ -81,19 +81,19 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
feature("Communicate with an internally-registered typed actor using typed-actor endpoint URIs") {
|
||||
scenario("communication not possible") {
|
||||
intercept[ResolveEndpointFailedException] {
|
||||
template.requestBodyAndHeader("typed-actor:tc?method=m2", "x", "test", "y")
|
||||
mandatoryTemplate.requestBodyAndHeader("typed-actor:tc?method=m2", "x", "test", "y")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
feature("Communicate with an externally-registered typed actor using typed-actor endpoint URIs") {
|
||||
scenario("two-way communication with method returning String") {
|
||||
val result = template.requestBody("typed-actor:ta?method=foo", "test")
|
||||
val result = mandatoryTemplate.requestBody("typed-actor:ta?method=foo", "test")
|
||||
assert(result === "foo: test")
|
||||
}
|
||||
|
||||
scenario("two-way communication with method returning String via custom route") {
|
||||
val result = template.requestBody("direct:test", "test")
|
||||
val result = mandatoryTemplate.requestBody("direct:test", "test")
|
||||
assert(result === "foo: test")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class Boot {
|
|||
|
||||
// Use a custom Camel context and a custom touter builder
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new CustomRouteBuilder)
|
||||
|
||||
val producer = actorOf[Producer1]
|
||||
val mediator = actorOf(new Transformer(producer))
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import se.scalablesolutions.akka.camel._
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
object StandaloneApplication extends Application {
|
||||
import CamelContextManager.context
|
||||
import CamelContextManager._
|
||||
import CamelServiceManager._
|
||||
|
||||
// 'externally' register typed actors
|
||||
|
|
@ -21,12 +21,12 @@ object StandaloneApplication extends Application {
|
|||
|
||||
// customize CamelContext
|
||||
CamelContextManager.init(new DefaultCamelContext(registry))
|
||||
CamelContextManager.context.addRoutes(new StandaloneApplicationRoute)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new StandaloneApplicationRoute)
|
||||
|
||||
startCamelService
|
||||
|
||||
// access 'externally' registered typed actors
|
||||
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test", "msg1"))
|
||||
assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1"))
|
||||
|
||||
// set expectations on upcoming endpoint activation
|
||||
val activation = service.get.expectEndpointActivationCount(1)
|
||||
|
|
@ -39,7 +39,7 @@ object StandaloneApplication extends Application {
|
|||
|
||||
// access 'internally' (automatically) registered typed-actors
|
||||
// (see @consume annotation value at TypedConsumer2.foo method)
|
||||
assert("default: msg3" == context.createProducerTemplate.requestBody("direct:default", "msg3"))
|
||||
assert("default: msg3" == mandatoryContext.createProducerTemplate.requestBody("direct:default", "msg3"))
|
||||
|
||||
stopCamelService
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ object StandaloneSpringApplication extends Application {
|
|||
val appctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
|
||||
|
||||
// access 'externally' registered typed actors with typed-actor component
|
||||
assert("hello msg3" == template.requestBody("direct:test3", "msg3"))
|
||||
assert("hello msg3" == mandatoryTemplate.requestBody("direct:test3", "msg3"))
|
||||
|
||||
appctx.close
|
||||
|
||||
|
|
@ -104,7 +104,7 @@ object StandaloneJmsApplication extends Application {
|
|||
|
||||
// Send 10 messages to JMS topic directly
|
||||
for(i <- 1 to 10) {
|
||||
CamelContextManager.template.sendBody(jmsUri, "Camel rocks (%d)" format i)
|
||||
CamelContextManager.mandatoryTemplate.sendBody(jmsUri, "Camel rocks (%d)" format i)
|
||||
}
|
||||
|
||||
stopCamelService
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class CamelServiceSpringFeatureTest extends FeatureSpec with BeforeAndAfterEach
|
|||
scenario("with a custom CamelContext and access a registered typed actor") {
|
||||
val appctx = new ClassPathXmlApplicationContext("/appContextCamelServiceCustom.xml")
|
||||
assert(context.isInstanceOf[SpringCamelContext])
|
||||
assert("hello sample" === template.requestBody("direct:test", "sample"))
|
||||
assert("hello sample" === mandatoryTemplate.requestBody("direct:test", "sample"))
|
||||
appctx.close
|
||||
}
|
||||
|
||||
|
|
@ -35,7 +35,7 @@ class CamelServiceSpringFeatureTest extends FeatureSpec with BeforeAndAfterEach
|
|||
assert(context.isInstanceOf[DefaultCamelContext])
|
||||
context.asInstanceOf[DefaultCamelContext].setRegistry(registry)
|
||||
// access registered typed actor
|
||||
assert("hello sample" === template.requestBody("typed-actor:custom?method=foo", "sample"))
|
||||
assert("hello sample" === mandatoryTemplate.requestBody("typed-actor:custom?method=foo", "sample"))
|
||||
appctx.close
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue