initial support for publishing ActiveObject methods at Camel endpoints
This commit is contained in:
parent
2d2bdda330
commit
1d2a6c559b
14 changed files with 335 additions and 74 deletions
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.camel.component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.camel.Endpoint;
|
||||
import org.apache.camel.Processor;
|
||||
import org.apache.camel.component.bean.BeanComponent;
|
||||
import org.apache.camel.component.bean.BeanEndpoint;
|
||||
import org.apache.camel.component.bean.BeanHolder;
|
||||
|
||||
/**
|
||||
* Camel component for accessing active objects.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class ActiveObjectComponent extends BeanComponent {
|
||||
|
||||
public static final String DEFAULT_SCHEMA = "actobj";
|
||||
|
||||
private Map<String, Object> registry = new ConcurrentHashMap<String, Object>();
|
||||
|
||||
public Map<String, Object> getActiveObjectRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
|
||||
BeanEndpoint beanEndpoint = new BeanEndpoint(uri, this);
|
||||
beanEndpoint.setBeanName(remaining);
|
||||
beanEndpoint.setBeanHolder(createBeanHolder(remaining));
|
||||
Processor processor = beanEndpoint.getProcessor();
|
||||
setProperties(processor, parameters);
|
||||
return beanEndpoint;
|
||||
}
|
||||
|
||||
private BeanHolder createBeanHolder(String beanName) throws Exception {
|
||||
BeanHolder holder = new ActiveObjectHolder(registry, getCamelContext(), beanName).createCacheHolder();
|
||||
registry.remove(beanName);
|
||||
return holder;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.camel.component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.NoSuchBeanException;
|
||||
import org.apache.camel.component.bean.BeanInfo;
|
||||
import org.apache.camel.component.bean.RegistryBean;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class ActiveObjectHolder extends RegistryBean {
|
||||
|
||||
private Map<String, Object> activeObjectRegistry;
|
||||
|
||||
public ActiveObjectHolder(Map<String, Object> activeObjectRegistry, CamelContext context, String name) {
|
||||
super(context, name);
|
||||
this.activeObjectRegistry = activeObjectRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BeanInfo getBeanInfo() {
|
||||
return new BeanInfo(getContext(), getBean().getClass(), getParameterMappingStrategy());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getBean() throws NoSuchBeanException {
|
||||
return activeObjectRegistry.get(getName());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ package se.scalablesolutions.akka.camel
|
|||
import org.apache.camel.{ProducerTemplate, CamelContext}
|
||||
import org.apache.camel.impl.DefaultCamelContext
|
||||
|
||||
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
/**
|
||||
|
|
@ -26,7 +27,18 @@ trait CamelContextLifecycle extends Logging {
|
|||
private var _started = false
|
||||
|
||||
/**
|
||||
* Returns the managed CamelContext.
|
||||
* Camel component for accessing active objects.
|
||||
*/
|
||||
private[camel] var activeObjectComponent: ActiveObjectComponent = _
|
||||
|
||||
/**
|
||||
* Registry in which active objects are TEMPORARILY registered during
|
||||
* creation of Camel routes to active objects.
|
||||
*/
|
||||
private[camel] var activeObjectRegistry: java.util.Map[String, AnyRef] = _
|
||||
|
||||
/**
|
||||
* Returns the managed CamelContext.
|
||||
*/
|
||||
protected def context: CamelContext = _context
|
||||
|
||||
|
|
@ -80,8 +92,11 @@ trait CamelContextLifecycle extends Logging {
|
|||
* caching they can do so after this method returned and prior to calling start.
|
||||
*/
|
||||
def init(context: CamelContext) {
|
||||
this.activeObjectComponent = new ActiveObjectComponent
|
||||
this.activeObjectRegistry = activeObjectComponent.getActiveObjectRegistry
|
||||
this.context = context
|
||||
this.context.setStreamCaching(true)
|
||||
this.context.addComponent(ActiveObjectComponent.DEFAULT_SCHEMA, activeObjectComponent)
|
||||
this.template = context.createProducerTemplate
|
||||
_initialized = true
|
||||
log.info("Camel context initialized")
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@
|
|||
|
||||
package se.scalablesolutions.akka.camel.service
|
||||
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry}
|
||||
import se.scalablesolutions.akka.camel.CamelContextManager
|
||||
import se.scalablesolutions.akka.util.{Bootable, Logging}
|
||||
|
||||
|
|
@ -21,7 +21,13 @@ trait CamelService extends Bootable with Logging {
|
|||
import CamelContextManager._
|
||||
|
||||
private[camel] val consumerPublisher = actorOf[ConsumerPublisher]
|
||||
private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher))
|
||||
private[camel] val publishRequestor = actorOf[PublishRequestor]
|
||||
|
||||
// add listener for actor registration events
|
||||
ActorRegistry.addListener(publishRequestor)
|
||||
|
||||
// add listener for AspectInit registration events
|
||||
AspectInitRegistry.addListener(publishRequestor)
|
||||
|
||||
/**
|
||||
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
|
||||
|
|
@ -38,19 +44,16 @@ trait CamelService extends Bootable with Logging {
|
|||
// start actor that exposes consumer actors via Camel endpoints
|
||||
consumerPublisher.start
|
||||
|
||||
// add listener for actor registration events
|
||||
ActorRegistry.addRegistrationListener(publishRequestor.start)
|
||||
|
||||
// publish already registered consumer actors
|
||||
for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
|
||||
// init publishRequestor so that buffered and future events are delivered to consumerPublisher
|
||||
publishRequestor ! PublishRequestorInit(consumerPublisher)
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the CamelService.
|
||||
*/
|
||||
abstract override def onUnload = {
|
||||
ActorRegistry.removeRegistrationListener(publishRequestor)
|
||||
publishRequestor.stop
|
||||
ActorRegistry.removeListener(publishRequestor)
|
||||
AspectInitRegistry.removeListener(publishRequestor)
|
||||
consumerPublisher.stop
|
||||
stop
|
||||
super.onUnload
|
||||
|
|
@ -82,5 +85,14 @@ object CamelService {
|
|||
/**
|
||||
* Creates a new CamelService instance.
|
||||
*/
|
||||
def newInstance: CamelService = new CamelService {}
|
||||
def newInstance: CamelService = new DefaultCamelService
|
||||
}
|
||||
|
||||
/**
|
||||
* Default CamelService implementation to be created in Java applications with
|
||||
* <pre>
|
||||
* CamelService service = new DefaultCamelService()
|
||||
* </pre>
|
||||
*/
|
||||
class DefaultCamelService extends CamelService {
|
||||
}
|
||||
|
|
@ -4,14 +4,17 @@
|
|||
package se.scalablesolutions.akka.camel.service
|
||||
|
||||
import java.io.InputStream
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
|
||||
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.actor.annotation.consume
|
||||
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
|
||||
import collection.mutable.ListBuffer
|
||||
|
||||
/**
|
||||
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
|
||||
|
|
@ -22,20 +25,29 @@ import se.scalablesolutions.akka.util.Logging
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
class ConsumerPublisher extends Actor with Logging {
|
||||
@volatile private var publishLatch = new CountDownLatch(0)
|
||||
@volatile private var unpublishLatch = new CountDownLatch(0)
|
||||
|
||||
// TODO: redesign waiting (actor testing) mechanism
|
||||
// - do not use custom methods on actor
|
||||
// - only interact by passing messages
|
||||
// TODO: factor out code that is needed for testing
|
||||
|
||||
@volatile private var publishActorLatch = new CountDownLatch(0)
|
||||
@volatile private var unpublishActorLatch = new CountDownLatch(0)
|
||||
|
||||
/**
|
||||
* Adds a route to the actor identified by a Publish message to the global CamelContext.
|
||||
* Adds a route to the actor identified by a Publish message to the global CamelContext.
|
||||
*/
|
||||
protected def receive = {
|
||||
case r: ConsumerRegistered => {
|
||||
handleConsumerRegistered(r)
|
||||
publishLatch.countDown // needed for testing only.
|
||||
publishActorLatch.countDown // needed for testing only.
|
||||
}
|
||||
case u: ConsumerUnregistered => {
|
||||
handleConsumerUnregistered(u)
|
||||
unpublishLatch.countDown // needed for testing only.
|
||||
unpublishActorLatch.countDown // needed for testing only.
|
||||
}
|
||||
case d: ConsumerMethodRegistered => {
|
||||
handleConsumerMethodDetected(d)
|
||||
}
|
||||
case _ => { /* ignore */}
|
||||
}
|
||||
|
|
@ -43,24 +55,24 @@ class ConsumerPublisher extends Actor with Logging {
|
|||
/**
|
||||
* Sets the expected number of actors to be published. Used for testing only.
|
||||
*/
|
||||
private[camel] def expectPublishCount(count: Int): Unit =
|
||||
publishLatch = new CountDownLatch(count)
|
||||
private[camel] def expectPublishActorCount(count: Int): Unit =
|
||||
publishActorLatch = new CountDownLatch(count)
|
||||
|
||||
/**
|
||||
* Sets the expected number of actors to be unpublished. Used for testing only.
|
||||
*/
|
||||
private[camel] def expectUnpublishCount(count: Int): Unit =
|
||||
unpublishLatch = new CountDownLatch(count)
|
||||
private[camel] def expectUnpublishActorCount(count: Int): Unit =
|
||||
unpublishActorLatch = new CountDownLatch(count)
|
||||
|
||||
/**
|
||||
* Waits for the expected number of actors to be published. Used for testing only.
|
||||
*/
|
||||
private[camel] def awaitPublish = publishLatch.await
|
||||
private[camel] def awaitPublishActor = publishActorLatch.await
|
||||
|
||||
/**
|
||||
* Waits for the expected number of actors to be unpublished. Used for testing only.
|
||||
*/
|
||||
private[camel] def awaitUnpublish = unpublishLatch.await
|
||||
private[camel] def awaitUnpublishActor = unpublishActorLatch.await
|
||||
|
||||
/**
|
||||
* Creates a route to the registered consumer actor.
|
||||
|
|
@ -77,6 +89,17 @@ class ConsumerPublisher extends Actor with Logging {
|
|||
CamelContextManager.context.stopRoute(event.id)
|
||||
log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri))
|
||||
}
|
||||
|
||||
private def handleConsumerMethodDetected(event: ConsumerMethodRegistered) {
|
||||
// using the actor uuid is highly experimental
|
||||
val objectId = event.init.actorRef.uuid
|
||||
val targetClass = event.init.target.getName
|
||||
val targetMethod = event.method.getName
|
||||
|
||||
CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
|
||||
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
|
||||
log.info("published method %s.%s (%s) at endpoint %s" format (targetClass, targetMethod, objectId, event.uri))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -90,6 +113,12 @@ class ConsumerPublisher extends Actor with Logging {
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
|
||||
//
|
||||
//
|
||||
// TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute
|
||||
//
|
||||
//
|
||||
|
||||
// TODO: make conversions configurable
|
||||
private val bodyConversions = Map(
|
||||
"file" -> classOf[InputStream]
|
||||
|
|
@ -106,20 +135,69 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends
|
|||
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
|
||||
}
|
||||
|
||||
class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends RouteBuilder {
|
||||
//
|
||||
//
|
||||
// TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute
|
||||
//
|
||||
//
|
||||
|
||||
// TODO: make conversions configurable
|
||||
private val bodyConversions = Map(
|
||||
"file" -> classOf[InputStream]
|
||||
)
|
||||
|
||||
def configure = {
|
||||
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
|
||||
bodyConversions.get(schema) match {
|
||||
case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(activeObjectUri)
|
||||
case None => from(endpointUri).to(activeObjectUri)
|
||||
}
|
||||
}
|
||||
|
||||
private def activeObjectUri = "%s:%s?method=%s" format (ActiveObjectComponent.DEFAULT_SCHEMA, id, method)
|
||||
}
|
||||
|
||||
/**
|
||||
* A registration listener that triggers publication and un-publication of consumer actors.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
|
||||
class PublishRequestor extends Actor {
|
||||
private val events = ListBuffer[ConsumerEvent]()
|
||||
private var publisher: Option[ActorRef] = None
|
||||
|
||||
protected def receive = {
|
||||
case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event
|
||||
case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event
|
||||
case ActorRegistered(actor) =>
|
||||
for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(actor) =>
|
||||
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
case AspectInitRegistered(proxy, init) =>
|
||||
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
|
||||
case PublishRequestorInit(pub) => {
|
||||
publisher = Some(pub)
|
||||
deliverBufferedEvents
|
||||
}
|
||||
case _ => { /* ignore */ }
|
||||
}
|
||||
|
||||
private def deliverCurrentEvent(event: ConsumerEvent) = {
|
||||
publisher match {
|
||||
case Some(pub) => pub ! event
|
||||
case None => events += event
|
||||
}
|
||||
}
|
||||
|
||||
private def deliverBufferedEvents = {
|
||||
for (event <- events) deliverCurrentEvent(event)
|
||||
events.clear
|
||||
}
|
||||
}
|
||||
|
||||
case class PublishRequestorInit(consumerPublisher: ActorRef)
|
||||
|
||||
/**
|
||||
* Consumer actor lifecycle event.
|
||||
* A consumer event.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
@ -151,6 +229,8 @@ case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Bool
|
|||
*/
|
||||
case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
|
||||
|
||||
case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
@ -179,6 +259,17 @@ private[camel] object ConsumerUnregistered {
|
|||
}
|
||||
}
|
||||
|
||||
private[camel] object ConsumerMethodRegistered {
|
||||
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
|
||||
// TODO: support/test annotations on interface methods
|
||||
// TODO: support/test annotations on superclass methods
|
||||
// TODO: document that overloaded methods are not supported
|
||||
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
|
||||
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
|
||||
yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Describes a consumer actor with elements that are relevant for publishing an actor at a
|
||||
* Camel endpoint (or unpublishing an actor from an endpoint).
|
||||
|
|
|
|||
|
|
@ -33,21 +33,23 @@ object CamelServiceFeatureTest {
|
|||
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
|
||||
import CamelServiceFeatureTest._
|
||||
|
||||
var service: CamelService = CamelService.newInstance
|
||||
var service: CamelService = _
|
||||
|
||||
override protected def beforeAll = {
|
||||
ActorRegistry.shutdownAll
|
||||
// create new CamelService instance
|
||||
service = CamelService.newInstance
|
||||
// register test consumer before starting the CamelService
|
||||
actorOf(new TestConsumer("direct:publish-test-1")).start
|
||||
// Consigure a custom camel route
|
||||
// Configure a custom camel route
|
||||
CamelContextManager.init
|
||||
CamelContextManager.context.addRoutes(new TestRoute)
|
||||
// set expectations for testing purposes
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1)
|
||||
// start the CamelService
|
||||
service.load
|
||||
// await publication of first test consumer
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor
|
||||
}
|
||||
|
||||
override protected def afterAll = {
|
||||
|
|
@ -60,9 +62,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
scenario("access registered consumer actors via Camel direct-endpoints") {
|
||||
|
||||
given("two consumer actors registered before and after CamelService startup")
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1)
|
||||
actorOf(new TestConsumer("direct:publish-test-2")).start
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor
|
||||
|
||||
when("requests are sent to these actors")
|
||||
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
|
||||
|
|
@ -81,14 +83,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
|
|||
|
||||
given("a consumer actor that has been stopped")
|
||||
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1)
|
||||
val consumer = actorOf(new TestConsumer(endpointUri)).start
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor
|
||||
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
|
||||
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1)
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishActorCount(1)
|
||||
consumer.stop
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish
|
||||
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublishActor
|
||||
// endpoint is still there but the route has been stopped
|
||||
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,10 +3,11 @@ package se.scalablesolutions.akka.camel.service
|
|||
import _root_.org.junit.{Before, After, Test}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
|
||||
import se.scalablesolutions.akka.camel.Consumer
|
||||
import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
|
||||
import Actor._
|
||||
|
||||
object PublishRequestorTest {
|
||||
class PublisherMock extends Actor with Receive[ConsumerEvent] {
|
||||
|
|
@ -28,11 +29,13 @@ class PublishRequestorTest extends JUnitSuite {
|
|||
|
||||
@Before def setUp = {
|
||||
publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start
|
||||
requestor = actorOf(new PublishRequestor(publisher)).start
|
||||
requestor = actorOf(new PublishRequestor).start
|
||||
requestor ! PublishRequestorInit(publisher)
|
||||
consumer = actorOf(new Actor with Consumer {
|
||||
def endpointUri = "mock:test"
|
||||
protected def receive = null
|
||||
}).start
|
||||
|
||||
}
|
||||
|
||||
@After def tearDown = {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
|
|||
import java.lang.annotation.Target;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
@Target({ElementType.TYPE, ElementType.METHOD})
|
||||
public @interface consume {
|
||||
|
||||
public abstract String value();
|
||||
|
|
|
|||
|
|
@ -476,7 +476,7 @@ object ActiveObject extends Logging {
|
|||
|
||||
}
|
||||
|
||||
private[akka] object AspectInitRegistry {
|
||||
private[akka] object AspectInitRegistry extends ListenerManagement {
|
||||
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
|
||||
|
||||
def initFor(target: AnyRef) = {
|
||||
|
|
@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry {
|
|||
init
|
||||
}
|
||||
|
||||
def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
|
||||
def register(target: AnyRef, init: AspectInit) = {
|
||||
val res = initializations.put(target, init)
|
||||
foreachListener(_ ! AspectInitRegistered(target, init))
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] sealed trait AspectInitRegistryEvent
|
||||
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
|
||||
|
||||
private[akka] sealed case class AspectInit(
|
||||
val target: Class[_],
|
||||
val actorRef: ActorRef,
|
||||
|
|
|
|||
|
|
@ -4,11 +4,10 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.reflect.Manifest
|
||||
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
|
||||
import se.scalablesolutions.akka.util.ListenerManagement
|
||||
|
||||
sealed trait ActorRegistryEvent
|
||||
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
|
||||
|
|
@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActorRegistry extends Logging {
|
||||
object ActorRegistry extends ListenerManagement {
|
||||
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
|
||||
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
|
||||
private val registrationListeners = new CopyOnWriteArrayList[ActorRef]
|
||||
|
||||
/**
|
||||
* Returns all actors in the system.
|
||||
|
|
@ -135,29 +133,4 @@ object ActorRegistry extends Logging {
|
|||
actorsByClassName.clear
|
||||
log.info("All actors have been shut down and unregistered from ActorRegistry")
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the registration <code>listener</code> this this registry's listener list.
|
||||
*/
|
||||
def addRegistrationListener(listener: ActorRef) = {
|
||||
listener.start
|
||||
registrationListeners.add(listener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the registration <code>listener</code> this this registry's listener list.
|
||||
*/
|
||||
def removeRegistrationListener(listener: ActorRef) = {
|
||||
listener.stop
|
||||
registrationListeners.remove(listener)
|
||||
}
|
||||
|
||||
private def foreachListener(f: (ActorRef) => Unit) {
|
||||
val iterator = registrationListeners.iterator
|
||||
while (iterator.hasNext) {
|
||||
val listener = iterator.next
|
||||
if (listener.isRunning) f(listener)
|
||||
else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
49
akka-core/src/main/scala/util/ListenerManagement.scala
Normal file
49
akka-core/src/main/scala/util/ListenerManagement.scala
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.util
|
||||
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
|
||||
/**
|
||||
* A manager for listener actors. Intended for mixin by observables.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait ListenerManagement extends Logging {
|
||||
|
||||
private val listeners = new CopyOnWriteArrayList[ActorRef]
|
||||
|
||||
/**
|
||||
* Adds the <code>listener</code> this this registry's listener list.
|
||||
* The <code>listener</code> is started by this method.
|
||||
*/
|
||||
def addListener(listener: ActorRef) = {
|
||||
listener.start
|
||||
listeners.add(listener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the <code>listener</code> this this registry's listener list.
|
||||
* The <code>listener</code> is stopped by this method.
|
||||
*/
|
||||
def removeListener(listener: ActorRef) = {
|
||||
listener.stop
|
||||
listeners.remove(listener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute <code>f</code> with each listener as argument.
|
||||
*/
|
||||
protected def foreachListener(f: (ActorRef) => Unit) {
|
||||
val iterator = listeners.iterator
|
||||
while (iterator.hasNext) {
|
||||
val listener = iterator.next
|
||||
if (listener.isRunning) f(listener)
|
||||
else log.warning("Can't notify [%s] since it is not running.", listener)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package sample.camel;
|
||||
|
||||
import org.apache.camel.Body;
|
||||
import org.apache.camel.Header;
|
||||
|
||||
import se.scalablesolutions.akka.actor.annotation.consume;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class Consumer10 {
|
||||
|
||||
@consume("file:data/input2")
|
||||
public void foo(String body) {
|
||||
System.out.println("Received message:");
|
||||
System.out.println(body);
|
||||
}
|
||||
|
||||
@consume("jetty:http://0.0.0.0:8877/camel/active")
|
||||
public String bar(@Body String body, @Header("name") String header) {
|
||||
return String.format("%s %s", body, header);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -37,7 +37,7 @@ class Producer1 extends Actor with Producer {
|
|||
}
|
||||
|
||||
class Consumer1 extends Actor with Consumer with Logging {
|
||||
def endpointUri = "file:data/input"
|
||||
def endpointUri = "file:data/input1"
|
||||
|
||||
def receive = {
|
||||
case msg: Message => log.info("received %s" format msg.bodyAs[String])
|
||||
|
|
|
|||
|
|
@ -6,10 +6,10 @@ import org.apache.camel.impl.DefaultCamelContext
|
|||
import org.apache.camel.spring.spi.ApplicationContextRegistry
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
|
||||
import se.scalablesolutions.akka.actor.SupervisorFactory
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.camel.CamelContextManager
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.actor.{ActiveObject, SupervisorFactory}
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -62,6 +62,9 @@ class Boot {
|
|||
|
||||
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor
|
||||
actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again.
|
||||
|
||||
// Publish active object methods on endpoints
|
||||
ActiveObject.newInstance(classOf[Consumer10])
|
||||
}
|
||||
|
||||
class CustomRouteBuilder extends RouteBuilder {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue