added spring dispatcher configuration

This commit is contained in:
Michael Kober 2010-03-30 10:41:06 +02:00
parent e33c5862bf
commit 8ae90fdfda
24 changed files with 1408 additions and 480 deletions

View file

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
file:////Users/michaelkober/akka/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd">
<akka:active-object id="active-object-with-dispatcher" target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000">
<akka:dispatcher type="executor-based-event-driven" name="test"/>
</akka:active-object>
<akka:active-object id="active-object-with-dispatcher-ref" target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000">
<akka:dispatcher ref="executor-event-driven-dispatcher-1"/>
</akka:active-object>
<!-- executor-event-driven-dispatcher with bounded-array-blocking-queue -->
<akka:dispatcher id="executor-event-driven-dispatcher-1" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="bounded-array-blocking-queue"
capacity="100"
fairness="true"
core-pool-size="1"
max-pool-size="20"
keep-alive="3000"/>
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with bounded-linked-blocking-queue with unbounded capacity-->
<akka:dispatcher id="executor-event-driven-dispatcher-2" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="bounded-linked-blocking-queue"
bound="10" />
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity-->
<akka:dispatcher id="executor-event-driven-dispatcher-4" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="unbounded-linked-blocking-queue"
capacity="55" />
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with unbounded-linked-blocking-queue with unbounded capacity-->
<akka:dispatcher id="executor-event-driven-dispatcher-5" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="unbounded-linked-blocking-queue" />
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with synchronous-queue -->
<akka:dispatcher id="executor-event-driven-dispatcher-6" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="synchronous-queue" fairness="true" />
</akka:dispatcher>
<!-- reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue -->
<akka:dispatcher id="reactor-based-thread-pool-event-driven-dispatcher" type="reactor-based-thread-pool-event-driven" name="myDispatcher">
<akka:thread-pool queue="synchronous-queue" fairness="true" />
</akka:dispatcher>
<akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher" type="reactor-based-single-thread-event-driven" name="myDispatcher" />
<akka:supervision id="supervision1">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.io.IOException</akka:trap-exit>
<akka:trap-exit>java.lang.NullPointerException</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000"/>
<akka:active-object interface="se.scalablesolutions.akka.spring.foo.IBar"
target="se.scalablesolutions.akka.spring.foo.Bar" lifecycle="permanent" timeout="1000"/>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
</beans>

View file

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
file:////Users/michaelkober/akka/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd">
<akka:supervision id="supervision1">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.io.IOException</akka:trap-exit>
<akka:trap-exit>java.lang.NullPointerException</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000"/>
<akka:active-object interface="se.scalablesolutions.akka.spring.foo.IBar"
target="se.scalablesolutions.akka.spring.foo.Bar" lifecycle="permanent" timeout="1000"/>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
<akka:supervision id="supervision2">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="5000">
<akka:trap-exits>
<akka:trap-exit>java.lang.Exception</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.StatefulPojo" lifecycle="permanent"
timeout="10000" transactional="true"/>
</akka:active-objects>
</akka:supervision>
<akka:dispatcher id="mydispatcher" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="bounded-array-blocking-queue"
capacity="100"
fairness="true"
core-pool-size="1"
max-pool-size="20"
keep-alive="3000"/>
</akka:dispatcher>
<akka:supervision id="supervision-with-dispatcher">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.lang.Exception</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000">
<akka:dispatcher ref="mydispatcher" />
</akka:active-object>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
<akka:dispatcher ref="mydispatcher" />
</akka:active-object>
</akka:active-objects>
</akka:supervision>
<akka:supervision id="parent-supervision">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.lang.Exception</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000" />
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000" />
</akka:active-objects>
<akka:supervision id="nested-supervision">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.lang.IOException</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Bar" lifecycle="permanent" timeout="1000" />
</akka:active-objects>
</akka:supervision>
</akka:supervision>
</beans>

View file

@ -1,76 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
file:////Users/michaelkober/akka/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd">
<bean id="wrappedService"
class="se.scalablesolutions.akka.actor.ActiveObject"
factory-method="newInstance">
<constructor-arg index="0" type="java.lang.Class" value="se.scalablesolutions.akka.spring.foo.MyPojo"/>
<constructor-arg index="1" value="1000"/>
</bean>
<akka:active-object id="simple-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000"/>
<akka:active-object id="simple-active-object-long-timeout"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="10000"/>
<akka:active-object id="transactional-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true"/>
<akka:active-object id="active-object-callbacks"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
<akka:active-object id="remote-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
<akka:remote host="localhost" port="9999" />
</akka:active-object>
<akka:active-object id="remote-service1" target="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="1000">
<akka:remote host="localhost" port="9999" />
</akka:active-object>
<akka:supervision id="supervision1">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.io.IOException</akka:trap-exit>
<akka:trap-exit>java.lang.NullPointerException</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000"/>
<akka:active-object interface="se.scalablesolutions.akka.spring.foo.IBar" target="se.scalablesolutions.akka.spring.foo.Bar" lifecycle="permanent" timeout="1000"/>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
<bean id="wrappedService"
class="se.scalablesolutions.akka.actor.ActiveObject"
factory-method="newInstance">
<constructor-arg index="0" type="java.lang.Class" value="se.scalablesolutions.akka.spring.foo.MyPojo"/>
<constructor-arg index="1" value="1000"/>
</bean>
<akka:active-object id="simple-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000"/>
<akka:active-object id="simple-active-object-long-timeout"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="10000"/>
<akka:active-object id="transactional-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true"/>
<akka:active-object id="active-object-callbacks"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
<akka:active-object id="remote-active-object"
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
<akka:remote host="localhost" port="9999"/>
</akka:active-object>
<akka:active-object id="remote-service1" target="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="1000">
<akka:remote host="localhost" port="9999"/>
</akka:active-object>
<akka:dispatcher id="dispatcher1" type="executor-based-event-driven" name="myDispatcher">
<akka:thread-pool queue="bounded-array-blocking-queue"
capacity="100"
fairness="true"
core-pool-size="3"
max-pool-size="40"
keep-alive="2000"/>
</akka:dispatcher>
<akka:supervision id="supervision1">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">
<akka:trap-exits>
<akka:trap-exit>java.io.IOException</akka:trap-exit>
<akka:trap-exit>java.lang.NullPointerException</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.Foo" lifecycle="permanent" timeout="1000"/>
<akka:active-object interface="se.scalablesolutions.akka.spring.foo.IBar"
target="se.scalablesolutions.akka.spring.foo.Bar" lifecycle="permanent" timeout="1000"/>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
<akka:supervision id="supervision2">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="5000">
<akka:trap-exits>
<akka:trap-exit>java.lang.Exception</akka:trap-exit>
</akka:trap-exits>
</akka:restart-strategy>
<akka:active-objects>
<akka:active-object target="se.scalablesolutions.akka.spring.foo.StatefulPojo" lifecycle="permanent" timeout="10000" transactional="true"/>
</akka:active-objects>
</akka:supervision>
</beans>

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import se.scalablesolutions.akka.config.Config;
import se.scalablesolutions.akka.dispatch.FutureTimeoutException;
import se.scalablesolutions.akka.remote.RemoteNode;
import se.scalablesolutions.akka.spring.foo.MyPojo;
/**
* Tests for spring configuration of active objects and supervisor configuration.
* @author michaelkober
*/
public class ActiveObjectConfigurationTest {
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/test-config.xml");
}
/**
* Tests that the &lt;akka:active-object/&gt; and &lt;akka:supervision/&gt; and &lt;akka:dispatcher/&gt; element
* can be used as a top level element.
*/
@Test
public void testParse() throws Exception {
final Resource CONTEXT = new ClassPathResource("se/scalablesolutions/akka/spring/foo/test-config.xml");
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory);
reader.loadBeanDefinitions(CONTEXT);
assertTrue(beanFactory.containsBeanDefinition("simple-active-object"));
assertTrue(beanFactory.containsBeanDefinition("remote-active-object"));
assertTrue(beanFactory.containsBeanDefinition("supervision1"));
assertTrue(beanFactory.containsBeanDefinition("dispatcher1"));
}
@Test
public void testSimpleActiveObject() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object");
String msg = myPojo.getFoo();
msg += myPojo.getBar();
assertEquals("wrong invocation order", "foobar", msg);
}
@Test(expected = FutureTimeoutException.class)
public void testSimpleActiveObject_Timeout() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object");
myPojo.longRunning();
}
@Test
public void testSimpleActiveObject_NoTimeout() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object-long-timeout");
String msg = myPojo.longRunning();
assertEquals("this took long", msg);
}
@Test
public void testTransactionalActiveObject() {
MyPojo myPojo = (MyPojo) context.getBean("transactional-active-object");
String msg = myPojo.getFoo();
msg += myPojo.getBar();
assertEquals("wrong invocation order", "foobar", msg);
}
@Test
public void testRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
Config.config();
MyPojo myPojo = (MyPojo) context.getBean("remote-active-object");
assertEquals("foo", myPojo.getFoo());
}
}

View file

@ -0,0 +1,143 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenDispatcher;
import se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolEventDrivenDispatcher;
import se.scalablesolutions.akka.dispatch.ReactorBasedSingleThreadEventDrivenDispatcher;
import se.scalablesolutions.akka.dispatch.MessageDispatcher;
import se.scalablesolutions.akka.dispatch.ThreadPoolBuilder;
import se.scalablesolutions.akka.spring.foo.MyPojo;
/**
* Tests for spring configuration of dispatcher configuration.
* @author michaelkober
*/
public class DispatcherConfigurationTest {
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/dispatcher-config.xml");
}
/**
* test for executor-event-driven-dispatcher with array-blocking-queue
*/
@Test
public void testDispatcher() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-1");
ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher);
assertEquals("wrong core pool size", 1, executor.getCorePoolSize());
assertEquals("wrong max pool size", 20, executor.getMaximumPoolSize());
assertEquals("wrong keep alive", 3000, executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
assertTrue("wrong queue type",executor.getQueue() instanceof ArrayBlockingQueue);
assertEquals("wrong capacity", 100, executor.getQueue().remainingCapacity());
}
/**
* test for dispatcher via ref
*/
@Test
public void testDispatcherRef() {
MyPojo pojo = (MyPojo) context.getBean("active-object-with-dispatcher-ref");
assertNotNull(pojo);
}
/**
* test for executor-event-driven-dispatcher with bounded-linked-blocking-queue with unbounded capacity
*/
@Test
public void testDispatcherWithBoundedLinkedBlockingQueueWithUnboundedCapacity() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-2");
ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher);
assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue);
assertEquals("wrong capacity", Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
}
/**
* test for executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity
*/
@Test
public void testDispatcherWithLinkedBlockingQueueWithBoundedCapacity() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-4");
ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher);
assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue);
assertEquals("wrong capacity", 55, executor.getQueue().remainingCapacity());
}
/**
* test for executor-event-driven-dispatcher with unbounded-linked-blocking-queue with unbounded capacity
*/
@Test
public void testDispatcherWithLinkedBlockingQueueWithUnboundedCapacity() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-5");
ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher);
assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue);
assertEquals("wrong capacity", Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
}
/**
* test for executor-event-driven-dispatcher with synchronous-queue
*/
@Test
public void testDispatcherWithSynchronousQueue() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-6");
ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher);
assertTrue("wrong queue type", executor.getQueue() instanceof SynchronousQueue);
}
/**
* test for reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue
*/
@Test
public void testReactorBasedThreadPoolDispatcherWithSynchronousQueue() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("reactor-based-thread-pool-event-driven-dispatcher");
assertTrue(dispatcher instanceof ReactorBasedThreadPoolEventDrivenDispatcher);
assertTrue(dispatcher instanceof ThreadPoolBuilder);
ThreadPoolBuilder pool = (ThreadPoolBuilder) dispatcher;
ThreadPoolExecutor executor = pool.se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder();
assertNotNull(executor);
assertTrue("wrong queue type", executor.getQueue() instanceof SynchronousQueue);
}
/**
* test for reactor-based-single-thread-event-driven-dispatcher with synchronous-queue
*/
@Test
public void testReactorBasedSingleThreadDispatcherWithSynchronousQueue() {
MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("reactor-based-single-thread-event-driven-dispatcher");
assertTrue(dispatcher instanceof ReactorBasedSingleThreadEventDrivenDispatcher);
}
/**
* Assert that dispatcher is correct type and get executor.
*/
private ThreadPoolExecutor getThreadPoolExecutorAndAssert(MessageDispatcher dispatcher) {
assertTrue(dispatcher instanceof ExecutorBasedEventDrivenDispatcher);
assertTrue(dispatcher instanceof ThreadPoolBuilder);
ThreadPoolBuilder pool = (ThreadPoolBuilder) dispatcher;
ThreadPoolExecutor executor = pool.se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder();
assertNotNull(executor);
return executor;
}
}

View file

@ -1,120 +0,0 @@
package se.scalablesolutions.akka.spring;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import se.scalablesolutions.akka.dispatch.FutureTimeoutException;
import se.scalablesolutions.akka.config.Config;
import se.scalablesolutions.akka.remote.RemoteNode;
import se.scalablesolutions.akka.spring.foo.Foo;
import se.scalablesolutions.akka.spring.foo.IBar;
import se.scalablesolutions.akka.spring.foo.MyPojo;
import se.scalablesolutions.akka.spring.foo.StatefulPojo;
/**
* Tests for spring configuration of active objects and supervisor configuration.
*/
public class SpringConfigurationTest {
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/test-config.xml");
}
/**
* Tests that the &lt;akka:active-object/&gt; and &lt;akka:supervision/&gt; element
* can be used as a top level element.
*/
@Test
public void testParse() throws Exception {
final Resource CONTEXT = new ClassPathResource("se/scalablesolutions/akka/spring/foo/test-config.xml");
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory);
reader.loadBeanDefinitions(CONTEXT);
assertTrue(beanFactory.containsBeanDefinition("simple-active-object"));
assertTrue(beanFactory.containsBeanDefinition("remote-active-object"));
assertTrue(beanFactory.containsBeanDefinition("supervision1"));
}
@Test
public void testSimpleActiveObject() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object");
String msg = myPojo.getFoo();
msg += myPojo.getBar();
assertEquals("wrong invocation order", "foobar", msg);
}
@Test(expected=FutureTimeoutException.class)
public void testSimpleActiveObject_Timeout() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object");
myPojo.longRunning();
}
@Test
public void testSimpleActiveObject_NoTimeout() {
MyPojo myPojo = (MyPojo) context.getBean("simple-active-object-long-timeout");
String msg = myPojo.longRunning();
assertEquals("this took long", msg);
}
@Test
public void testTransactionalActiveObject() {
MyPojo myPojo = (MyPojo) context.getBean("transactional-active-object");
String msg = myPojo.getFoo();
msg += myPojo.getBar();
assertEquals("wrong invocation order", "foobar", msg);
}
@Test
public void testRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
Config.config();
MyPojo myPojo = (MyPojo) context.getBean("remote-active-object");
assertEquals("foo", myPojo.getFoo());
}
@Test
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.init();
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
}

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import se.scalablesolutions.akka.spring.foo.Foo;
import se.scalablesolutions.akka.spring.foo.IBar;
import se.scalablesolutions.akka.spring.foo.MyPojo;
import se.scalablesolutions.akka.spring.foo.StatefulPojo;
/**
* Testclass for supervisor configuration.
* @author michaelkober
*
*/
public class SupervisorConfigurationTest {
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/supervisor-config.xml");
}
@Test
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.init();
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
@Test
public void testSupervisionWithDispatcher() {
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision-with-dispatcher");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
// TODO how to check dispatcher?
}
}

View file

@ -1,163 +1,226 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.akkasource.org/schema/akka"
targetNamespace="http://www.akkasource.org/schema/akka"
elementFormDefault="qualified" attributeFormDefault="unqualified"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<!-- base types -->
targetNamespace="http://www.akkasource.org/schema/akka"
elementFormDefault="qualified" attributeFormDefault="unqualified"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<!-- restart strategies enumeration -->
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"/>
<!-- base types -->
<!-- restart strategies enumeration -->
<xsd:simpleType name="failover-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="AllForOne"/>
<xsd:enumeration value="OneForOne"/>
</xsd:restriction>
<xsd:restriction base="xsd:token">
<xsd:enumeration value="AllForOne"/>
<xsd:enumeration value="OneForOne"/>
</xsd:restriction>
</xsd:simpleType>
<!-- restart strategies enumeration -->
<xsd:simpleType name="lifecycle-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="permanent"/>
<xsd:enumeration value="temporary"/>
</xsd:restriction>
</xsd:simpleType>
<!-- Remote -->
<xsd:complexType name="remote-type">
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Pre restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="post" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Post restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:sequence>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1" />
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID" />
<xsd:attribute name="target" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
default timeout for '!!' invocations
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set to true if messages should have REQUIRES_NEW semantics
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="interface" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Interface implemented by target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Lifecycle, permanent or temporary
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- trap exits -->
<xsd:complexType name="trap-exits-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="trap-exit" type="xsd:string"/>
</xsd:choice>
</xsd:complexType>
<!-- active objects -->
<xsd:complexType name="active-objects-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="active-object" type="active-object-type"/>
</xsd:choice>
</xsd:complexType>
<!-- Supervisor -->
<xsd:complexType name="strategy-type" >
<xsd:sequence>
<xsd:element name="trap-exits" type="trap-exits-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="failover" type="failover-type">
<xsd:annotation>
<xsd:documentation>
Failover scheme, AllForOne or OneForOne
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retries" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Maximal number of retries.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timerange" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Timerange for restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- ActiveObject -->
<xsd:element name="active-object" type="active-object-type"/>
<xsd:simpleType name="lifecycle-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="permanent"/>
<xsd:enumeration value="temporary"/>
</xsd:restriction>
</xsd:simpleType>
<!-- Supervision -->
<xsd:element name="supervision">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="restart-strategy" type="strategy-type" minOccurs="1" maxOccurs="1"/>
<xsd:element name="active-objects" type="active-objects-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID" use="required"/>
</xsd:complexType>
</xsd:element>
<!-- dispatchers enumeration -->
<xsd:simpleType name="dispatcher-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher queue types enumeration -->
<xsd:simpleType name="dispatcher-queue-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="bounded-linked-blocking-queue"/>
<xsd:enumeration value="unbounded-linked-blocking-queue"/>
<xsd:enumeration value="synchronous-queue"/>
<xsd:enumeration value="bounded-array-blocking-queue"/>
</xsd:restriction>
</xsd:simpleType>
<!-- thread pool rejection policies enumeration -->
<xsd:simpleType name="rejection-policy-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="abort-policy"/>
<xsd:enumeration value="caller-runs-policy"/>
<xsd:enumeration value="discard-oldest-policy"/>
<xsd:enumeration value="discard-policy"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher type -->
<xsd:complexType name="dispatcher-type">
<xsd:choice minOccurs="0" maxOccurs="1">
<xsd:element name="thread-pool" type="threadpool-type"/>
</xsd:choice>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="ref" type="xsd:string"/>
<xsd:attribute name="type" type="dispatcher-enum-type"/>
<xsd:attribute name="name" type="xsd:string"/>
</xsd:complexType>
<xsd:complexType name="threadpool-type">
<xsd:attribute name="queue" type="dispatcher-queue-type"/>
<xsd:attribute name="bound" type="xsd:integer"/>
<xsd:attribute name="capacity" type="xsd:integer"/>
<xsd:attribute name="fairness" type="xsd:boolean"/>
<xsd:attribute name="core-pool-size" type="xsd:integer"/>
<xsd:attribute name="max-pool-size" type="xsd:integer"/>
<xsd:attribute name="keep-alive" type="xsd:long"/>
<xsd:attribute name="rejection-policy" type="rejection-policy-type"/>
</xsd:complexType>
<!-- Remote -->
<xsd:complexType name="remote-type">
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Pre restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="post" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Post restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:all minOccurs="0">
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="target" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
default timeout for '!!' invocations
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set to true if messages should have REQUIRES_NEW semantics
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="interface" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Interface implemented by target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Lifecycle, permanent or temporary
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- trap exits -->
<xsd:complexType name="trap-exits-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="trap-exit" type="xsd:string"/>
</xsd:choice>
</xsd:complexType>
<!-- active objects -->
<xsd:complexType name="active-objects-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="active-object" type="active-object-type"/>
</xsd:choice>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="strategy-type">
<xsd:sequence>
<xsd:element name="trap-exits" type="trap-exits-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="failover" type="failover-type">
<xsd:annotation>
<xsd:documentation>
Failover scheme, AllForOne or OneForOne
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retries" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Maximal number of retries.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timerange" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Timerange for restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="supervision-type">
<xsd:all>
<xsd:element name="restart-strategy" type="strategy-type" minOccurs="1" maxOccurs="1"/>
<xsd:element name="active-objects" type="active-objects-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="supervision" type="supervision-type" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<!-- ActiveObject -->
<xsd:element name="active-object" type="active-object-type"/>
<!-- Dispatcher -->
<xsd:element name="dispatcher" type="dispatcher-type"/>
<!-- Supervision -->
<xsd:element name="supervision" type="supervision-type"/>
</xsd:schema>

View file

@ -3,86 +3,27 @@
*/
package se.scalablesolutions.akka.spring
import org.springframework.util.xml.DomUtils
import se.scalablesolutions.akka.util.Logging
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
import org.springframework.beans.factory.xml.ParserContext
import org.w3c.dom.Element
/**
* Parser for custom namespace configuration for active-object.
* Parser for custom namespace configuration.
* @author michaelkober
*/
trait ActiveObjectBeanDefinitionParser extends Logging {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a ActiveObjectProperties.
* @param element dom element to parse
* @return configuration for the active object
class ActiveObjectBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
def parseActiveObject(element: Element): ActiveObjectProperties = {
val objectProperties = new ActiveObjectProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
if (remoteElement != null) {
objectProperties.host = mandatory(remoteElement, HOST)
objectProperties.port = mandatory(remoteElement, PORT).toInt
}
if (callbacksElement != null) {
objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART)
if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) {
throw new IllegalStateException("At least one of pre or post must be defined.")
}
}
try {
objectProperties.timeout = mandatory(element, TIMEOUT).toLong
} catch {
case nfe: NumberFormatException =>
log.error(nfe, "could not parse timeout %s", element.getAttribute(TIMEOUT))
throw nfe
}
objectProperties.target = mandatory(element, TARGET)
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
if (!element.getAttribute(INTERFACE).isEmpty) {
objectProperties.interface = element.getAttribute(INTERFACE)
}
if (!element.getAttribute(LIFECYCLE).isEmpty) {
objectProperties.lifecyclye = element.getAttribute(LIFECYCLE)
}
objectProperties
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val activeObjectConf = parseActiveObject(element)
activeObjectConf.setAsProperties(builder)
}
/**
* Get a mandatory element attribute.
* @param element the element with the mandatory attribute
* @param attribute name of the mandatory attribute
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
def mandatory(element: Element, attribute: String): String = {
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
} else {
element.getAttribute(attribute)
}
}
/**
* Get a mandatory child element.
* @param element the parent element
* @param childName name of the mandatory child element
*/
def mandatoryElement(element: Element, childName: String): Element = {
val childElement = DomUtils.getChildElementByTagName(element, childName);
if (childElement == null) {
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
} else {
childElement
}
}
override def getBeanClass(element: Element) = classOf[ActiveObjectFactoryBean]
}

View file

@ -8,8 +8,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.actor.ActiveObject
import reflect.BeanProperty
import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks
import se.scalablesolutions.akka.dispatch.MessageDispatcher
/**
@ -28,7 +27,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
@ -40,42 +39,68 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] {
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: AnyRef = {
ActiveObject.newInstance(target.toClass, timeout, transactional, restartCallbacks)
if (isRemote) {
newRemoteInstance(target, timeout, interface, transactional, restartCallbacks, host, port)
var argumentList = ""
if (isRemote) argumentList += "r"
if (hasInterface) argumentList += "i"
if (hasDispatcher) argumentList += "d"
create(argumentList)
}
// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here)
//
// private[akka] def create(argList : String) : AnyRef = argList match {
// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
// }
private[akka] def create(argList : String) : AnyRef = {
if (argList == "r") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
} else if (argList == "ri" ) {
ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
} else if (argList == "rd") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
} else if (argList == "rid") {
ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
} else if (argList == "i") {
ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
} else if (argList == "id") {
ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
} else if (argList == "d") {
ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
} else {
newInstance(target, timeout, interface, transactional, restartCallbacks);
ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
}
}
/**
* create Option[RestartCallback]
*/
private def callbacks: Option[RestartCallbacks] = {
if (hasCallbacks) {
val callbacks = new RestartCallbacks(pre, post)
Some(callbacks)
} else {
None
}
}
private[akka] def isRemote = (host != null) && (!host.isEmpty)
/**
* create Option[RestartCallback]
*/
private def restartCallbacks: Option[RestartCallbacks] = {
if (((pre == null) || pre.isEmpty) && ((post == null) || post.isEmpty)) {
None
} else {
val callbacks = new RestartCallbacks(pre, post)
Some(callbacks)
}
}
private[akka] def hasInterface = (interface != null) && (!interface.isEmpty)
private def newInstance(target: String, timeout: Long, interface: String, transactional: Boolean, callbacks: Option[RestartCallbacks]): AnyRef = {
if ((interface == null) || interface.isEmpty) {
ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
} else {
ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
}
}
private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
private def newRemoteInstance(target: String, timeout: Long, interface: String, transactional: Boolean, callbacks: Option[RestartCallbacks], host: String, port: Int): AnyRef = {
if ((interface == null) || interface.isEmpty) {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
} else {
ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
}
}
private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty)
private[akka] def dispatcherInstance : MessageDispatcher = {
import DispatcherFactoryBean._
createNewInstance(dispatcher)
}
}

View file

@ -0,0 +1,66 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.util.xml.DomUtils
import org.w3c.dom.Element
/**
* Parser trait for custom namespace configuration for active-object.
* @author michaelkober
*/
trait ActiveObjectParser extends BeanParser with DispatcherParser {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a ActiveObjectProperties.
* @param element dom element to parse
* @return configuration for the active object
*/
def parseActiveObject(element: Element): ActiveObjectProperties = {
val objectProperties = new ActiveObjectProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
if (remoteElement != null) {
objectProperties.host = mandatory(remoteElement, HOST)
objectProperties.port = mandatory(remoteElement, PORT).toInt
}
if (callbacksElement != null) {
objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART)
if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) {
throw new IllegalStateException("At least one of pre or post must be defined.")
}
}
if (dispatcherElement != null) {
val dispatcherProperties = parseDispatcher(dispatcherElement)
objectProperties.dispatcher = dispatcherProperties
}
try {
objectProperties.timeout = mandatory(element, TIMEOUT).toLong
} catch {
case nfe: NumberFormatException =>
log.error(nfe, "could not parse timeout %s", element.getAttribute(TIMEOUT))
throw nfe
}
objectProperties.target = mandatory(element, TARGET)
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
if (!element.getAttribute(INTERFACE).isEmpty) {
objectProperties.interface = element.getAttribute(INTERFACE)
}
if (!element.getAttribute(LIFECYCLE).isEmpty) {
objectProperties.lifecyclye = element.getAttribute(LIFECYCLE)
}
objectProperties
}
}

View file

@ -21,6 +21,8 @@ class ActiveObjectProperties {
var host: String = ""
var port: Int = _
var lifecyclye: String = ""
var dispatcher: DispatcherProperties = _
/**
* Sets the properties to the given builder.
@ -36,6 +38,7 @@ class ActiveObjectProperties {
builder.addPropertyValue(INTERFACE, interface)
builder.addPropertyValue(TRANSACTIONAL, transactional)
builder.addPropertyValue(LIFECYCLE, lifecyclye)
builder.addPropertyValue(DISPATCHER_TAG, dispatcher)
}
}

View file

@ -12,7 +12,8 @@ import AkkaSpringConfigurationTags._
*/
class AkkaNamespaceHandler extends NamespaceHandlerSupport {
def init = {
registerBeanDefinitionParser(ACTIVE_OBJECT_TAG, new AkkaObjectBeanDefinitionParser());
registerBeanDefinitionParser(ACTIVE_OBJECT_TAG, new ActiveObjectBeanDefinitionParser());
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser());
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser());
}
}

View file

@ -8,17 +8,29 @@ package se.scalablesolutions.akka.spring
* @author michaelkober
*/
object AkkaSpringConfigurationTags {
// --- TAGS
//
// top level tags
val ACTIVE_OBJECT_TAG = "active-object"
val SUPERVISION_TAG = "supervision"
val DISPATCHER_TAG = "dispatcher"
// active-object sub tags
val RESTART_CALLBACKS_TAG = "restart-callbacks"
val REMOTE_TAG = "remote";
val REMOTE_TAG = "remote"
// superivision sub tags
val ACTIVE_OBJECTS_TAG = "active-objects"
val STRATEGY_TAG = "restart-strategy"
val TRAP_EXISTS_TAG = "trap-exits"
val TRAP_EXIT_TAG = "trap-exit"
// dispatcher sub tags
val THREAD_POOL_TAG = "thread-pool"
// --- ATTRIBUTES
//
// active object attributes
val TIMEOUT = "timeout"
val TARGET = "target"
@ -29,13 +41,53 @@ object AkkaSpringConfigurationTags {
val PRE_RESTART = "pre"
val POST_RESTART = "post"
val LIFECYCLE = "lifecycle"
// supervision attributes
val FAILOVER = "failover"
val RETRIES = "retries"
val TIME_RANGE = "timerange"
// Value types
// dispatcher attributes
val NAME = "name"
val REF = "ref"
val TYPE = "type"
// thread pool attributes
val QUEUE = "queue"
val CAPACITY = "capacity"
val FAIRNESS = "fairness"
val CORE_POOL_SIZE = "core-pool-size"
val MAX_POOL_SIZE = "max-pool-size"
val KEEP_ALIVE = "keep-alive"
val BOUND ="bound"
val REJECTION_POLICY ="rejection-policy"
// --- VALUES
//
// Lifecycle
val VAL_LIFECYCYLE_TEMPORARY = "temporary"
val VAL_LIFECYCYLE_PERMANENT = "permanent"
// Failover
val VAL_ALL_FOR_ONE = "AllForOne"
val VAL_ONE_FOR_ONE = "OneForOne"
// rejection policies
val VAL_ABORT_POLICY = "abort-policy"
val VAL_CALLER_RUNS_POLICY = "caller-runs-policy"
val VAL_DISCARD_OLDEST_POLICY = "discard-oldest-policy"
val VAL_DISCARD_POLICY = "discard-policy"
// dispatcher queue types
val VAL_BOUNDED_LINKED_BLOCKING_QUEUE = "bounded-linked-blocking-queue"
val VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE = "unbounded-linked-blocking-queue"
val VAL_SYNCHRONOUS_QUEUE = "synchronous-queue"
val VAL_BOUNDED_ARRAY_BLOCKING_QUEUE = "bounded-array-blocking-queue"
// dispatcher types
val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven"
val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven"
val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven"
val THREAD_BASED = "thread-based"
}

View file

@ -0,0 +1,42 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import se.scalablesolutions.akka.util.Logging
import org.w3c.dom.Element
import org.springframework.util.xml.DomUtils
/**
* Base trait with utility methods for bean parsing.
*/
trait BeanParser extends Logging {
/**
* Get a mandatory element attribute.
* @param element the element with the mandatory attribute
* @param attribute name of the mandatory attribute
*/
def mandatory(element: Element, attribute: String): String = {
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
} else {
element.getAttribute(attribute)
}
}
/**
* Get a mandatory child element.
* @param element the parent element
* @param childName name of the mandatory child element
*/
def mandatoryElement(element: Element, childName: String): Element = {
val childElement = DomUtils.getChildElementByTagName(element, childName);
if (childElement == null) {
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
} else {
childElement
}
}
}

View file

@ -3,28 +3,26 @@
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
import org.springframework.beans.factory.xml.ParserContext
import org.w3c.dom.Element
import se.scalablesolutions.akka.util.Logging
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.{ParserContext, AbstractSingleBeanDefinitionParser}
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class AkkaObjectBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectBeanDefinitionParser {
/*
class DispatcherBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser with DispatcherParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val activeObjectConf = parseActiveObject(element)
activeObjectConf.setAsProperties(builder)
val dispatcherProperties = parseDispatcher(element)
dispatcherProperties.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element) = classOf[ActiveObjectFactoryBean]
override def getBeanClass(element: Element) = classOf[DispatcherFactoryBean]
}

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.config.JavaConfig._
import AkkaSpringConfigurationTags._
import reflect.BeanProperty
import se.scalablesolutions.akka.dispatch.{ThreadPoolBuilder, Dispatchers, MessageDispatcher}
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy}
/**
* Reusable factory method for dispatchers.
*/
object DispatcherFactoryBean {
def createNewInstance(properties: DispatcherProperties): MessageDispatcher = {
var dispatcher = properties.dispatcherType match {
case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name)
case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name)
case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name)
case THREAD_BASED => throw new IllegalArgumentException("not implemented yet") //FIXME
case _ => throw new IllegalArgumentException("unknown dispatcher type")
}
if ((properties.threadPool != null) && (properties.threadPool.queue != null)) {
var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder]
threadPoolBuilder = properties.threadPool.queue match {
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness)
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE => if (properties.threadPool.capacity > -1)
threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity)
else
threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound)
case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness)
case _ => throw new IllegalArgumentException("unknown queue type")
}
if (properties.threadPool.corePoolSize > -1) {
threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize)
}
if (properties.threadPool.maxPoolSize > -1) {
threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize)
}
if (properties.threadPool.keepAlive > -1) {
threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive)
}
if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
case "abort-policy" => new AbortPolicy()
case "caller-runs-policy" => new CallerRunsPolicy()
case "discard-oldest-policy" => new DiscardOldestPolicy()
case "discard-policy" => new DiscardPolicy()
case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'")
}
threadPoolBuilder.setRejectionPolicy(policy)
}
threadPoolBuilder.asInstanceOf[MessageDispatcher]
} else {
dispatcher
}
}
}
/**
* Factory bean for supervisor configuration.
* @author michaelkober
*/
class DispatcherFactoryBean extends AbstractFactoryBean[MessageDispatcher] {
@BeanProperty var properties: DispatcherProperties = _
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
*/
def getObjectType: Class[MessageDispatcher] = classOf[MessageDispatcher]
/*
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: MessageDispatcher = {
import DispatcherFactoryBean._
createNewInstance(properties)
}
}

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.w3c.dom.Element
import org.springframework.util.xml.DomUtils
/**
* Parser trait for custom namespace for Akka dispatcher configuration.
* @author michaelkober
*/
trait DispatcherParser extends BeanParser {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a DispatcherProperties.
* @param element dom element to parse
* @return configuration for the dispatcher
*/
def parseDispatcher(element: Element): DispatcherProperties = {
val properties = new DispatcherProperties()
var dispatcherElement = element
if (hasRef(element)) {
val ref = element.getAttribute(REF)
dispatcherElement = element.getOwnerDocument.getElementById(ref)
if (dispatcherElement == null) {
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
}
}
properties.name = mandatory(dispatcherElement, NAME)
properties.dispatcherType = mandatory(dispatcherElement, TYPE)
if (properties.dispatcherType == THREAD_BASED) {
if (dispatcherElement.getParentNode.getNodeName != "active-object") {
throw new IllegalArgumentException("Thread based dispatcher must be nested in active-object element!")
}
}
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
if (threadPoolElement != null) {
if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
properties.dispatcherType == THREAD_BASED) {
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
}
val threadPoolProperties = parseThreadPool(threadPoolElement)
properties.threadPool = threadPoolProperties
}
properties
}
/**
* Parses the given element and returns a ThreadPoolProperties.
* @param element dom element to parse
* @return configuration for the thread pool
*/
def parseThreadPool(element: Element): ThreadPoolProperties = {
val properties = new ThreadPoolProperties()
properties.queue = element.getAttribute(QUEUE)
if (element.hasAttribute(CAPACITY)) {
properties.capacity = element.getAttribute(CAPACITY).toInt
}
if (element.hasAttribute(BOUND)) {
properties.bound = element.getAttribute(BOUND).toInt
}
if (element.hasAttribute(FAIRNESS)) {
properties.fairness = element.getAttribute(FAIRNESS).toBoolean
}
if (element.hasAttribute(CORE_POOL_SIZE)) {
properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
}
if (element.hasAttribute(MAX_POOL_SIZE)) {
properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
}
if (element.hasAttribute(KEEP_ALIVE)) {
properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
}
if (element.hasAttribute(REJECTION_POLICY)) {
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
}
properties
}
def hasRef(element: Element): Boolean = {
val ref = element.getAttribute(REF)
(ref != null) && !ref.isEmpty
}
}

View file

@ -0,0 +1,58 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
/**
* Data container for dispatcher configuration data.
* @author michaelkober
*/
class DispatcherProperties {
var ref: String = ""
var dispatcherType: String = ""
var name: String = ""
var threadPool: ThreadPoolProperties = _
/**
* Sets the properties to the given builder.
* @param builder bean definition builder
*/
def setAsProperties(builder: BeanDefinitionBuilder) {
builder.addPropertyValue("properties", this)
}
override def toString : String = {
"DispatcherProperties[ref=" + ref +
", dispatcher-type=" + dispatcherType +
", name=" + name +
", threadPool=" + threadPool + "]"
}
}
/**
* Data container for thread pool configuration data.
* @author michaelkober
*/
class ThreadPoolProperties {
var queue = ""
var bound = -1
var capacity = -1
var fairness = false
var corePoolSize = -1
var maxPoolSize = -1
var keepAlive = -1L
var rejectionPolicy = ""
override def toString : String = {
"ThreadPoolProperties[queue=" + queue +
", bound=" + bound +
", capacity=" + capacity +
", fairness=" + fairness +
", corePoolSize=" + corePoolSize +
", maxPoolSize=" + maxPoolSize +
", keepAlive=" + keepAlive +
", policy=" + rejectionPolicy + "]"
}
}

View file

@ -18,7 +18,7 @@ import org.springframework.util.xml.DomUtils
* Parser for custom namespace for Akka declarative supervisor configuration.
* @author michaelkober
*/
class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectBeanDefinitionParser {
class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser {
/* (non-Javadoc)
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/

View file

@ -12,17 +12,22 @@ import ScalaDom._
import org.w3c.dom.Element
/**
* Test for ActiveObjectBeanDefinitionParser
* Test for ActiveObjectParser
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
private class Parser extends ActiveObjectBeanDefinitionParser
private class Parser extends ActiveObjectParser
describe("An ActiveObjectBeanDefinitionParser") {
describe("An ActiveObjectParser") {
val parser = new Parser()
it("should parse the active object configuration") {
val props = parser.parseActiveObject(createTestElement);
val xml = <akka:active-object id="active-object1"
target="foo.bar.MyPojo"
timeout="1000"
transactional="true"/>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.timeout == 1000)
assert(props.target == "foo.bar.MyPojo")
@ -30,22 +35,32 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
evaluating { parser.parseActiveObject(createTestElement2) } should produce [IllegalArgumentException]
val xml = <akka:active-object id="active-object1"
timeout="1000"
transactional="true"/>
evaluating { parser.parseActiveObject(dom(xml).getDocumentElement) } should produce [IllegalArgumentException]
}
it("should parse ActiveObjects configuration with dispatcher") {
val xml = <akka:active-object id="active-object-with-dispatcher" target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000">
<akka:dispatcher type="thread-based" name="my-thread-based-dispatcher"/>
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcher.dispatcherType == "thread-based")
}
it("should parse remote ActiveObjects configuration") {
val xml = <akka:active-object id="remote active-object" target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000">
<akka:remote host="com.some.host" port="9999"/>
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.host == "com.some.host")
assert(props.port == 9999)
}
}
private def createTestElement : Element = {
val xml = <akka:active-object id="active-object1"
target="foo.bar.MyPojo"
timeout="1000"
transactional="true"/>
dom(xml).getDocumentElement
}
private def createTestElement2 : Element = {
val xml = <akka:active-object id="active-object1"
timeout="1000"
transactional="true"/>
dom(xml).getDocumentElement
}
}

View file

@ -29,13 +29,21 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
assert(bean.isRemote)
}
it("should create object that implements the given interface") {
bean.setInterface("com.biz.IPojo");
assert(bean.hasInterface)
}
it("should create an active object with dispatcher if dispatcher is set") {
val props = new DispatcherProperties()
props.dispatcherType = "executor-based-event-driven"
bean.setDispatcher(props);
assert(bean.hasDispatcher)
}
it("should return the object type") {
bean.setTarget("java.lang.String")
assert(bean.getObjectType == classOf[String])
}
it("should create an active object") {
// TODO:
}
}
}

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import ScalaDom._
/**
* Test for DispatcherBeanDefinitionParser
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
describe("A DispatcherBeanDefinitionParser") {
val parser = new DispatcherBeanDefinitionParser()
it("should be able to parse the dispatcher configuration") {
val xml = <akka:dispatcher id="dispatcher"
type="executor-based-event-driven"
name="myDispatcher" />
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcherType == "executor-based-event-driven")
assert(props.name == "myDispatcher")
}
it("should be able to parse the thread pool configuration") {
val xml = <akka:thread-pool queue="bounded-array-blocking-queue"
capacity="100"
fairness="true"
max-pool-size="40"
core-pool-size="6"
keep-alive="2000"
rejection-policy="caller-runs-policy" />
val props = parser.parseThreadPool(dom(xml).getDocumentElement);
assert(props != null)
assert(props.queue == "bounded-array-blocking-queue")
assert(props.capacity == 100)
assert(props.fairness)
assert(props.corePoolSize == 6)
assert(props.maxPoolSize == 40)
assert(props.keepAlive == 2000L)
assert(props.rejectionPolicy == "caller-runs-policy")
}
it("should be able to parse the dispatcher with a thread pool configuration") {
val xml = <akka:dispatcher id="dispatcher"
type="reactor-based-thread-pool-event-driven"
name="myDispatcher">
<akka:thread-pool queue="linked-blocking-queue"
capacity="50"
max-pool-size="10"
core-pool-size="2"
keep-alive="1000" />
</akka:dispatcher>
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcherType == "reactor-based-thread-pool-event-driven")
assert(props.name == "myDispatcher")
assert(props.threadPool.corePoolSize == 2)
assert(props.threadPool.maxPoolSize == 10)
assert(props.threadPool.keepAlive == 1000)
assert(props.threadPool.queue == "linked-blocking-queue")
}
it("should throw IllegalArgumentException on not existing reference") {
val xml = <akka:dispatcher ref="dispatcher" />
evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException]
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
val xml = <akka:dispatcher id="dispatcher"
name="myDispatcher" />
evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException]
}
it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") {
val xml = <akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher"
type="reactor-based-single-thread-event-driven"
name="myDispatcher">
<akka:thread-pool queue="synchronous-queue" fairness="true" />
</akka:dispatcher>
evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException]
}
it("should throw IllegalArgumentException when configuring a thread based dispatcher without ActiveObject") {
val xml = <akka:dispatcher id="dispatcher" type="thread-based" name="myDispatcher" />
evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException]
}
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.dispatch.MessageDispatcher
@RunWith(classOf[JUnitRunner])
class DispatcherFactoryBeanTest extends Spec with ShouldMatchers {
describe("A DispatcherFactoryBean") {
val bean = new DispatcherFactoryBean
it("should have java getters and setters for the dispatcher properties") {
val props = new DispatcherProperties()
bean.setProperties(props)
assert(bean.getProperties == props)
}
it("should return the object type MessageDispatcher") {
assert(bean.getObjectType == classOf[MessageDispatcher])
}
}
}