diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala
index 3795b8a7fb..b8aaacfe5b 100644
--- a/akka-camel/src/main/scala/CamelService.scala
+++ b/akka-camel/src/main/scala/CamelService.scala
@@ -25,25 +25,22 @@ trait CamelService extends Bootable with Logging {
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
private[camel] val publishRequestor = actorOf[PublishRequestor]
- // add listener for actor registration events
- ActorRegistry.addListener(publishRequestor)
-
- // add listener for AspectInit registration events
- AspectInitRegistry.addListener(publishRequestor)
+ private val serviceEnabled = config.getBool("akka.camel.service", true)
/**
* Starts this CamelService unless akka.camel.service is set to false.
*/
abstract override def onLoad = {
+ if (serviceEnabled) registerPublishRequestor
super.onLoad
- if (config.getBool("akka.camel.service", true)) start
+ if (serviceEnabled) start
}
/**
* Stops this CamelService unless akka.camel.service is set to false.
*/
abstract override def onUnload = {
- if (config.getBool("akka.camel.service", true)) stop
+ if (serviceEnabled) stop
super.onUnload
}
@@ -62,6 +59,8 @@ trait CamelService extends Bootable with Logging {
* on a remote node).
*/
def start: CamelService = {
+ if (!publishRequestorRegistered) registerPublishRequestor
+
// Only init and start if not already done by application
if (!CamelContextManager.initialized) CamelContextManager.init
if (!CamelContextManager.started) CamelContextManager.start
@@ -86,8 +85,7 @@ trait CamelService extends Bootable with Logging {
CamelServiceManager.unregister(this)
// Remove related listeners from registry
- ActorRegistry.removeListener(publishRequestor)
- AspectInitRegistry.removeListener(publishRequestor)
+ unregisterPublishRequestor
// Stop related services
consumerPublisher.stop
@@ -109,6 +107,21 @@ trait CamelService extends Bootable with Logging {
*/
def expectEndpointDeactivationCount(count: Int): CountDownLatch =
(consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get
+
+ private[camel] def publishRequestorRegistered: Boolean = {
+ ActorRegistry.hasListener(publishRequestor) ||
+ AspectInitRegistry.hasListener(publishRequestor)
+ }
+
+ private[camel] def registerPublishRequestor: Unit = {
+ ActorRegistry.addListener(publishRequestor)
+ AspectInitRegistry.addListener(publishRequestor)
+ }
+
+ private[camel] def unregisterPublishRequestor: Unit = {
+ ActorRegistry.removeListener(publishRequestor)
+ AspectInitRegistry.removeListener(publishRequestor)
+ }
}
/**
diff --git a/akka-camel/src/test/scala/ConsumerTest.scala b/akka-camel/src/test/scala/ConsumerTest.scala
index 0af8aec7d5..1a126e3d52 100644
--- a/akka-camel/src/test/scala/ConsumerTest.scala
+++ b/akka-camel/src/test/scala/ConsumerTest.scala
@@ -23,6 +23,8 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
ActorRegistry.shutdownAll
// create new CamelService instance
service = CamelServiceFactory.createCamelService
+ // Register publish requestor as listener
+ service.registerPublishRequestor
// register test consumer before starting the CamelService
actorOf(new TestConsumer("direct:publish-test-1")).start
// start consumer publisher, otherwise we cannot set message