From 6a65c67ca7e2ea6c16a6f7197bdae159575911fb Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Tue, 7 Jul 2009 22:11:27 +0200 Subject: [PATCH] changed oneway to be defined by void in AO + added restart callback def in config --- akka.iws | 713 ++++++++++++------ .../se/scalablesolutions/akka/api/Foo.java | 9 +- .../akka/api/InMemoryStateTest.java | 10 +- .../akka/api/PersistentNestedStateTest.java | 14 +- .../akka/api/PersistentStateful.java | 3 +- .../akka/api/PersistentStatefulNested.java | 3 +- .../src/main/scala/actor/ActiveObject.scala | 156 +++- kernel/src/main/scala/actor/Actor.scala | 2 +- .../ActiveObjectGuiceConfigurator.scala | 4 +- kernel/src/main/scala/config/Config.scala | 27 +- .../main/scala/state/DataFlowVariable.scala | 4 +- 11 files changed, 675 insertions(+), 270 deletions(-) diff --git a/akka.iws b/akka.iws index 6af29de35c..373a17f66e 100644 --- a/akka.iws +++ b/akka.iws @@ -7,46 +7,17 @@ + + - - - - - - - - - - - - - - - - - - - - - - - - + + - - - - - - - - - - + @@ -80,7 +51,98 @@ - + - + - - - - - - + - + - - + + + + + + @@ -1956,6 +2239,7 @@ + @@ -1998,109 +2282,118 @@ - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java index d1a4dce5c7..1e86af8e74 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java @@ -12,17 +12,18 @@ public class Foo { public String foo(String msg) { return msg + "return_foo "; } - @oneway public void bar(String msg) { bar.bar(msg); } - public void longRunning() { + public String longRunning() { try { Thread.sleep(10000); } catch (InterruptedException e) { } + return "test"; } - public void throwsException() { - throw new RuntimeException("expected"); + public String throwsException() { + if (true) throw new RuntimeException("expected"); + return "test"; } } diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 4b4b8e975f..eede10b0b3 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -23,9 +23,13 @@ public class InMemoryStateTest extends TestCase { new RestartStrategy(new AllForOne(), 3, 5000), new Component[]{ // FIXME: remove string-name, add ctor to only accept target class - new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000), - new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000) - //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) + new Component(InMemStateful.class, + new LifeCycle(new Permanent(), 1000), + //new RestartCallbacks("preRestart", "postRestart")), + 10000), + new Component(InMemFailer.class, + new LifeCycle(new Permanent(), 1000), + 10000) }).inject().supervise(); } diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java index 9afaf8b625..8fc5a1194b 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java @@ -34,12 +34,12 @@ public class PersistentNestedStateTest extends TestCase { conf.stop(); } - public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class); stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } @@ -51,7 +51,7 @@ public class PersistentNestedStateTest extends TestCase { nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected @@ -64,7 +64,7 @@ public class PersistentNestedStateTest extends TestCase { stateful.setVectorState("init"); // set init state PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class); nested.setVectorState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test assertEquals(2, nested.getVectorLength()); } @@ -76,7 +76,7 @@ public class PersistentNestedStateTest extends TestCase { nested.setVectorState("init"); // set init state PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected @@ -89,7 +89,7 @@ public class PersistentNestedStateTest extends TestCase { PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class); stateful.setRefState("init"); // set init state nested.setRefState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional assertEquals("new state", stateful.getRefState()); assertEquals("new state", nested.getRefState()); } @@ -101,7 +101,7 @@ public class PersistentNestedStateTest extends TestCase { nested.setRefState("init"); // set init state PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 02bd17bd6f..7561099270 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -61,11 +61,12 @@ public class PersistentStateful { return msg; } - public void success(String key, String msg, PersistentStatefulNested nested) { + public String success(String key, String msg, PersistentStatefulNested nested) { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); nested.success(key, msg); + return msg; } diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index 4060940945..ac46efb051 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -46,10 +46,11 @@ public class PersistentStatefulNested { } - public void success(String key, String msg) { + public String success(String key, String msg) { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); + return msg; } diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index fbadeb6a38..1c4619fd30 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.kernel.actor -import java.lang.reflect.Method +import java.lang.reflect.{InvocationTargetException, Method} import java.net.InetSocketAddress import kernel.config.ScalaConfig._ import kernel.nio.{RemoteRequest, RemoteClient} @@ -37,37 +37,73 @@ class ActiveObjectFactory { // FIXME How to pass the MessageDispatcher on from active object to child??????? def newInstance[T](target: Class[T], timeout: Long): T = - ActiveObject.newInstance(target, new Dispatcher, None, timeout) + ActiveObject.newInstance(target, new Dispatcher(None), None, timeout) + + def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = + ActiveObject.newInstance(target, new Dispatcher(restartCallbacks), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - ActiveObject.newInstance(intf, target, new Dispatcher, None, timeout) + ActiveObject.newInstance(intf, target, new Dispatcher(None), None, timeout) + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = + ActiveObject.newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - ActiveObject.newInstance(target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout) + ActiveObject.newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + ActiveObject.newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = - ActiveObject.newInstance(intf, target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout) + ActiveObject.newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + ActiveObject.newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) actor.dispatcher = dispatcher ActiveObject.newInstance(target, actor, None, timeout) } - + + def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) + actor.dispatcher = dispatcher + ActiveObject.newInstance(target, actor, None, timeout) + } + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + ActiveObject.newInstance(intf, target, actor, None, timeout) + } + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, None, timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @@ -85,12 +121,12 @@ class ActiveObjectFactory { /* def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = { - val actor = new Dispatcher(target.getName) + val actor = new Dispatcher(None)(target.getName) ActiveObject.newInstance(target, actor) } def newInstanceAndLink[T](intf: Class[T], target: AnyRef, supervisor: AnyRef): T = { - val actor = new Dispatcher(target.getName) + val actor = new Dispatcher(None)(target.getName) ActiveObject.newInstance(intf, target, actor) } */ @@ -106,37 +142,73 @@ object ActiveObject { val AKKA_CAMEL_ROUTING_SCHEME = "akka" def newInstance[T](target: Class[T], timeout: Long): T = - newInstance(target, new Dispatcher, None, timeout) + newInstance(target, new Dispatcher(None), None, timeout) + + def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(target, new Dispatcher(restartCallbacks), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, new Dispatcher, None, timeout) + newInstance(intf, target, new Dispatcher(None), None, timeout) + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + newInstance(target, actor, None, timeout) + } + + def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher newInstance(target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + newInstance(intf, target, actor, None, timeout) + } + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher + val actor = new Dispatcher(None) + actor.dispatcher = dispatcher + newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(restartCallbacks) actor.dispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @@ -224,8 +296,8 @@ sealed class ActorAroundAdvice(val target: Class[_], } else future.result.asInstanceOf[Option[T]] private def isOneWay(rtti: MethodRtti) = - rtti.getMethod.isAnnotationPresent(Annotations.oneway) // FIXME investigate why @oneway causes TX to race - //rtti.getMethod.getReturnType == java.lang.Void.TYPE + rtti.getMethod.getReturnType == java.lang.Void.TYPE || + rtti.getMethod.isAnnotationPresent(Annotations.oneway) } /** @@ -259,8 +331,9 @@ sealed class ActorAroundAdvice(val target: Class[_], * * @author Jonas Bonér */ -private[kernel] class Dispatcher extends Actor { +private[kernel] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends Actor { private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() + private val ZERO_ITEM_OBJECT_ARRAY = Array[Object[_]]() private[actor] var target: Option[AnyRef] = None private var preRestart: Option[Method] = None @@ -268,20 +341,33 @@ private[kernel] class Dispatcher extends Actor { private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = { if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactional - id = targetClass.getName target = Some(targetInstance) val methods = targetInstance.getClass.getDeclaredMethods.toList - preRestart = methods.find( m => m.isAnnotationPresent(Annotations.prerestart) && m.getName.startsWith("aw$original")) - if (preRestart.isDefined) preRestart.get.setAccessible(true) - if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) - throw new IllegalStateException("Method annotated with @prerestart in [" + targetClass.getName + "] must have a zero argument definition") + // See if we have any config define restart callbacks + callbacks match { + case None => {} + case Some(RestartCallbacks(pre, post)) => + preRestart = Some(try { + targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*) + } catch { case e => throw new IllegalStateException("Could not find pre restart method [" + pre + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") }) + postRestart = Some(try { + targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*) + } catch { case e => throw new IllegalStateException("Could not find post restart method [" + post + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") }) + } - postRestart = methods.find( m => m.isAnnotationPresent(Annotations.postrestart) && m.getName.startsWith("aw$original")) - if (postRestart.isDefined) postRestart.get.setAccessible(true) + // See if we have any annotation defined restart callbacks + if (!preRestart.isDefined) preRestart = methods.find( m => m.isAnnotationPresent(Annotations.prerestart)) + if (!postRestart.isDefined) postRestart = methods.find( m => m.isAnnotationPresent(Annotations.postrestart)) + + if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) + throw new IllegalStateException("Method annotated with @prerestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition") if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0) - throw new IllegalStateException("Method annotated with @postrestart in [" + targetClass.getName + "] must have a zero argument definition") + throw new IllegalStateException("Method annotated with @postrestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition") + + if (preRestart.isDefined) preRestart.get.setAccessible(true) + if (postRestart.isDefined) postRestart.get.setAccessible(true) } override def receive: PartialFunction[Any, Unit] = { @@ -293,11 +379,15 @@ private[kernel] class Dispatcher extends Actor { } override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) { - if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*) + try { + if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + } catch { case e: InvocationTargetException => throw e.getCause } } override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { - if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*) + try { + if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + } catch { case e: InvocationTargetException => throw e.getCause } } } diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index c245d6af7d..e4dc5b0705 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -508,7 +508,7 @@ trait Actor extends Logging with TransactionManagement { case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.") // FIXME implement support for shutdown time - case Some(LifeCycle(scope, shutdownTime)) => { + case Some(LifeCycle(scope, shutdownTime, _)) => { scope match { case Permanent => { preRestart(reason, config) diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index fb5aa637f0..fbc061bb1b 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -98,7 +98,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val actor = new Dispatcher + val actor = new Dispatcher(component.lifeCycle.callbacks) if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get) val remoteAddress = if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) @@ -113,7 +113,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC val targetClass = component.intf.get val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) - val actor = new Dispatcher + val actor = new Dispatcher(component.lifeCycle.callbacks) if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get) val remoteAddress = if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala index 23e29b8e44..0df243a338 100644 --- a/kernel/src/main/scala/config/Config.scala +++ b/kernel/src/main/scala/config/Config.scala @@ -29,9 +29,18 @@ object ScalaConfig { case object AllForOne extends FailOverScheme case object OneForOne extends FailOverScheme - case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement { - def this(scope: Scope) = this(scope, 0) + case class LifeCycle(scope: Scope, + shutdownTime: Int, + callbacks: Option[RestartCallbacks] // optional + ) extends ConfigElement + object LifeCycle { + def apply(scope: Scope, shutdownTime: Int) = new LifeCycle(scope, shutdownTime, None) + def apply(scope: Scope) = new LifeCycle(scope, 0, None) } + case class RestartCallbacks(preRestart: String, postRestart: String) { + if (preRestart == null || postRestart == null) throw new IllegalArgumentException("Restart callback methods can't be null") + } + case object Permanent extends Scope case object Transient extends Scope case object Temporary extends Scope @@ -89,12 +98,18 @@ object JavaConfig { def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy( scheme.transform, maxNrOfRetries, withinTimeRange) } -// class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int, val callbacks: RestartCallbacks) extends ConfigElement { - class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement { - def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime) + + class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement { + def this(scope: Scope, shutdownTime: Int) = this(scope, shutdownTime, null) + def transform = { + val callbackOption = if (callbacks == null) None else Some(callbacks.transform) + se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime, callbackOption) + } } - class RestartCallbacks(val preRestart: String, val postRestart: String) + class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) { + def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartCallbacks(preRestart, postRestart) + } abstract class Scope extends ConfigElement { def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope diff --git a/kernel/src/main/scala/state/DataFlowVariable.scala b/kernel/src/main/scala/state/DataFlowVariable.scala index 5e8c8ac77c..17b851361e 100644 --- a/kernel/src/main/scala/state/DataFlowVariable.scala +++ b/kernel/src/main/scala/state/DataFlowVariable.scala @@ -59,8 +59,8 @@ object DataFlow { private class In[T](dataFlow: DataFlowVariable[T]) extends Actor { def act = loop { react { - case Set(v) => - if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { + case Set(v) => + if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { val iterator = dataFlow.blockedReaders.iterator while (iterator.hasNext) iterator.next ! Set(v) dataFlow.blockedReaders.clear