Remove initTransactionalState, renamed init and shutdown

This commit is contained in:
Viktor Klang 2010-09-13 11:08:43 +02:00
parent c7b555f81e
commit ceff8bd9e8
35 changed files with 83 additions and 83 deletions

View file

@ -410,14 +410,14 @@ trait Actor extends Logging {
* <p/>
* Is called when an Actor is started by invoking 'actor.start'.
*/
def init {}
def preStart {}
/**
* User overridable callback.
* <p/>
* Is called when 'actor.stop' is invoked.
*/
def shutdown {}
def postStop {}
/**
* User overridable callback.
@ -433,13 +433,6 @@ trait Actor extends Logging {
*/
def postRestart(reason: Throwable) {}
/**
* User overridable callback.
* <p/>
* Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
/**
* Is the actor able to handle the message passed in as arguments?
*/

View file

@ -826,7 +826,7 @@ class LocalActorRef private[akka](
_transactionFactory = None
_isRunning = false
_isShutDown = true
actor.shutdown
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
if(remoteAddress.isDefined)
@ -1132,8 +1132,7 @@ class LocalActorRef private[akka](
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.init
freshActor.initTransactionalState
freshActor.preStart
actorInstance.set(freshActor)
if (failedActor.isInstanceOf[Proxyable])
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
@ -1301,8 +1300,7 @@ class LocalActorRef private[akka](
}
private def initializeActorInstance = {
actor.init // run actor init and initTransactionalState callbacks
actor.initTransactionalState
actor.preStart // run actor preStart
Actor.log.trace("[%s] has started", toString)
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)

View file

@ -187,7 +187,7 @@ final class SupervisorActor private[akka] (
trapExit = trapExceptions
faultHandler = Some(handler)
override def shutdown(): Unit = shutdownLinkedActors
override def postStop(): Unit = shutdownLinkedActors
def receive = {
// FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor

View file

@ -78,7 +78,7 @@ object Chameneos {
var sumMeetings = 0
var numFaded = 0
override def init = {
override def preStart = {
for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i))
}

View file

@ -42,7 +42,7 @@ class RestartStrategySpec extends JUnitSuite {
restartLatch.open
}
override def shutdown = {
override def postStop = {
if (restartLatch.isOpen) {
secondRestartLatch.open
}

View file

@ -53,7 +53,7 @@ object HawtDispatcherEchoServer {
var accept_source:DispatchSource = _
var sessions = ListBuffer[ActorRef]()
override def init = {
override def preStart = {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
@ -122,7 +122,7 @@ object HawtDispatcherEchoServer {
var writeCounter = 0L
var closed = false
override def init = {
override def preStart = {
if(useReactorPattern) {
// Then we will be using the reactor pattern for handling IO:
@ -154,7 +154,7 @@ object HawtDispatcherEchoServer {
println("Accepted connection from: "+remote_address);
}
override def shutdown = {
override def postStop = {
closed = true
read_source.release
write_source.release

View file

@ -85,7 +85,7 @@ class ThreadBasedDispatcherSpec extends JUnitSuite {
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
dispatcher.shutdown
dispatcher.postStop
}
}
*/

View file

@ -108,10 +108,10 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
super.preRestart(reason)
}
override def shutdown = {
override def postStop = {
listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag)))
self.shutdownLinkedActors
super.shutdown
super.postStop
}
override def toString =

View file

@ -46,7 +46,7 @@ object ExampleSession {
printTopic("Happy hAkking :-)")
// shutdown everything the amqp tree except the main AMQP supervisor
// postStop everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped
AMQP.shutdownAll

View file

@ -103,5 +103,5 @@ abstract private[amqp] class FaultTolerantChannelActor(
closeChannel
}
override def shutdown = closeChannel
override def postStop = closeChannel
}

View file

@ -104,9 +104,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
connectionCallback.foreach(cb => if (cb.isRunning) cb ! message)
}
override def shutdown = {
override def postStop = {
reconnectionTimer.cancel
// make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed
// make sure postStop is called on all linked actors so they can do channel cleanup before connection is killed
self.shutdownLinkedActors
disconnect
}

View file

@ -40,9 +40,9 @@ class RpcClientActor[I,O](
}
override def shutdown = {
override def postStop = {
rpcClient.foreach(rpc => rpc.close)
super.shutdown
super.postStop
}
override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]"

View file

@ -54,10 +54,10 @@ trait ProducerSupport { this: Actor =>
def headersToCopy: Set[String] = headersToCopyDefault
/**
* Default implementation of <code>Actor.shutdown</code> for freeing resources needed
* Default implementation of <code>Actor.postStop</code> for freeing resources needed
* to actually send messages to <code>endpointUri</code>.
*/
override def shutdown {
override def postStop {
processor.stop
}

View file

@ -24,6 +24,11 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter {
@BeanProperty var clusterName = ""
@BeanProperty var broadcaster : Broadcaster = null
def init() {
//Since this class is instantiated by Atmosphere, we need to make sure it's started
self.start
}
/**
* Stops the actor
*/
@ -48,7 +53,4 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter {
case b @ ClusterCometBroadcast(c, _) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
case _ =>
}
//Since this class is instantiated by Atmosphere, we need to make sure it's started
self.start
}

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.util.{Logging, Bootable}
import javax.servlet.{ServletContextListener, ServletContextEvent}
/**
* This class can be added to web.xml mappings as a listener to start and shutdown Akka.
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
*
*<web-app>
* ...

View file

@ -36,6 +36,6 @@ class AtomikosTransactionService extends TransactionService with TransactionProt
"Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
}
)))
// TODO: gracefully shutdown of the TM
//txService.shutdown(false)
// TODO: gracefully postStop of the TM
//txService.postStop(false)
}

View file

@ -15,7 +15,7 @@ object Main {
}
/**
* The Akka Kernel, is used to start And shutdown Akka in standalone/kernel mode.
* The Akka Kernel, is used to start And postStop Akka in standalone/kernel mode.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -86,11 +86,11 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
@volatile private var local: Node = Node(Nil)
@volatile private var remotes: Map[ADDR_T, Node] = Map()
override def init = {
override def preStart = {
remotes = new HashMap[ADDR_T, Node]
}
override def shutdown = {
override def postStop = {
remotes = Map()
}

View file

@ -54,8 +54,8 @@ class JGroupsClusterActor extends BasicClusterActor {
protected def toAllNodes(msg : Array[Byte]): Unit =
for (c <- channel) c.send(new JG_MSG(null, null, msg))
override def shutdown = {
super.shutdown
override def postStop = {
super.postStop
log info ("Shutting down %s", toString)
isActive = false
channel.foreach(Util shutdown _)

View file

@ -185,7 +185,7 @@ class RemoteClient private[akka] (
extends Logging with ListenerManagement {
val name = "RemoteClient@" + hostname + "::" + port
//FIXME Should these be clear:ed on shutdown?
//FIXME Should these be clear:ed on postStop?
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, ActorRef]

View file

@ -398,7 +398,7 @@ class RemoteServerHandler(
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
/**
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
* ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer.
* If a channel is closed before, it is automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)

View file

@ -3920,7 +3920,7 @@ public final class RemoteProtocol {
public boolean hasInit() { return hasInit; }
public java.lang.String getInit() { return init_; }
// optional string shutdown = 5;
// optional string postStop = 5;
public static final int SHUTDOWN_FIELD_NUMBER = 5;
private boolean hasShutdown;
private java.lang.String shutdown_ = "";
@ -4295,7 +4295,7 @@ public final class RemoteProtocol {
return this;
}
// optional string shutdown = 5;
// optional string postStop = 5;
public boolean hasShutdown() {
return result.hasShutdown();
}

View file

@ -67,7 +67,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
// make sure the servers postStop cleanly after the test has finished
@After
def finished {
try {

View file

@ -47,7 +47,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite {
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
// make sure the servers postStop cleanly after the test has finished
@After
def finished() {
server.shutdown

View file

@ -185,7 +185,7 @@ trait ChatServer extends Actor {
protected def sessionManagement: Receive
protected def shutdownSessions(): Unit
override def shutdown = {
override def postStop = {
log.info("Chat server is shutting down...")
shutdownSessions
self.unlink(storage)
@ -205,7 +205,7 @@ class ChatService extends
SessionManagement with
ChatManagement with
RedisChatStorageFactory {
override def init = {
override def preStart = {
RemoteNode.start("localhost", 9999)
RemoteNode.register("chat:service", self)
}

View file

@ -14,7 +14,7 @@ public class Pojo extends TypedActor implements PojoInf, ApplicationContextAware
private String stringFromRef;
private boolean gotApplicationContext = false;
private boolean initInvoked = false;
private boolean preStartInvoked = false;
public boolean gotApplicationContext() {
return gotApplicationContext;
@ -41,11 +41,11 @@ public class Pojo extends TypedActor implements PojoInf, ApplicationContextAware
}
@Override
public void init() {
initInvoked = true;
public void preStart() {
preStartInvoked = true;
}
public boolean isInitInvoked() {
return initInvoked;
public boolean isPreStartInvoked() {
return preStartInvoked;
}
}

View file

@ -8,6 +8,6 @@ public interface PojoInf {
public String getStringFromVal();
public String getStringFromRef();
public boolean gotApplicationContext();
public boolean isInitInvoked();
public boolean isPreStartInvoked();
}

View file

@ -19,7 +19,7 @@ public class SampleBean extends TypedActor implements SampleBeanIntf {
}
@Override
public void shutdown() {
public void postStop() {
down = true;
}
}

View file

@ -5,6 +5,7 @@ import se.scalablesolutions.akka.stm.TransactionalMap;
import se.scalablesolutions.akka.stm.TransactionalVector;
import se.scalablesolutions.akka.stm.Ref;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.stm.local.Atomic;
public class StatefulPojo extends TypedActor {
private TransactionalMap<String, String> mapState;
@ -13,12 +14,16 @@ public class StatefulPojo extends TypedActor {
private boolean isInitialized = false;
@Override
public void initTransactionalState() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
public void preStart() {
if(!isInitialized) {
isInitialized = new Atomic<Boolean>() {
public Boolean atomically() {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
return true;
}
}.execute();
}
}

View file

@ -68,7 +68,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
it("should create an application context and verify dependency injection for typed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val ta = ctx.getBean("typedActor").asInstanceOf[PojoInf];
assert(ta.isInitInvoked)
assert(ta.isPreStartInvoked)
assert(ta.getStringFromVal === "akka rocks")
assert(ta.getStringFromRef === "spring rocks")
assert(ta.gotApplicationContext)

View file

@ -41,12 +41,12 @@ import scala.reflect.BeanProperty
* }
*
* @Override
* public void init() {
* public void preStart() {
* ... // optional initialization on start
* }
*
* @Override
* public void shutdown() {
* public void postStop() {
* ... // optional cleanup on stop
* }
*
@ -79,11 +79,11 @@ import scala.reflect.BeanProperty
*
* def square(x: Int): Future[Integer] = future(x * x)
*
* override def init = {
* override def preStart = {
* ... // optional initialization on start
* }
*
* override def shutdown = {
* override def postStop = {
* ... // optional cleanup on stop
* }
*
@ -519,11 +519,7 @@ object TypedActor extends Logging {
val typedActor =
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
typedActor.init
import se.scalablesolutions.akka.stm.local.atomic
atomic {
typedActor.initTransactionalState
}
typedActor.preStart
typedActor
}

View file

@ -10,7 +10,7 @@ public class NestedTransactionalTypedActorImpl extends TypedTransactor implement
private boolean isInitialized = false;
@Override
public void init() {
public void preStart() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();

View file

@ -38,7 +38,7 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
}
@Override
public void shutdown() {
public void postStop() {
_down = true;
latch.countDown();
}

View file

@ -2,6 +2,8 @@ package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.*;
import se.scalablesolutions.akka.stm.local.Atomic;
public class TransactionalTypedActorImpl extends TypedTransactor implements TransactionalTypedActor {
private TransactionalMap<String, String> mapState;
@ -10,12 +12,16 @@ public class TransactionalTypedActorImpl extends TypedTransactor implements Tran
private boolean isInitialized = false;
@Override
public void initTransactionalState() {
public void preStart() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
isInitialized = new Atomic<Boolean>() {
public Boolean atomically() {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
return true;
}
}.execute();
}
}

View file

@ -95,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
/*
it("should shutdown non-supervised, annotated typed actor on TypedActor.stop") {
it("should postStop non-supervised, annotated typed actor on TypedActor.stop") {
val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
@ -112,7 +112,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
it("should shutdown non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
it("should postStop non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
@ -147,7 +147,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
it("should shutdown supervised, annotated typed actor on failure") {
it("should postStop supervised, annotated typed actor on failure") {
val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
val cdl = obj.newCountdownLatch(1)
assert(AspectInitRegistry.initFor(obj) ne null)