Added a new project akka-persistence-hbase

This commit is contained in:
David Greco 2010-09-15 09:31:53 +02:00
commit 848e0cbbfe
41 changed files with 1020 additions and 112 deletions

View file

@ -410,14 +410,14 @@ trait Actor extends Logging {
* <p/>
* Is called when an Actor is started by invoking 'actor.start'.
*/
def init {}
def preStart {}
/**
* User overridable callback.
* <p/>
* Is called when 'actor.stop' is invoked.
*/
def shutdown {}
def postStop {}
/**
* User overridable callback.
@ -433,13 +433,6 @@ trait Actor extends Logging {
*/
def postRestart(reason: Throwable) {}
/**
* User overridable callback.
* <p/>
* Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
/**
* Is the actor able to handle the message passed in as arguments?
*/

View file

@ -817,7 +817,7 @@ class LocalActorRef private[akka](
_transactionFactory = None
_isRunning = false
_isShutDown = true
actor.shutdown
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
if(remoteAddress.isDefined)
@ -1123,8 +1123,7 @@ class LocalActorRef private[akka](
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.init
freshActor.initTransactionalState
freshActor.preStart
actorInstance.set(freshActor)
if (failedActor.isInstanceOf[Proxyable])
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
@ -1292,8 +1291,7 @@ class LocalActorRef private[akka](
}
private def initializeActorInstance = {
actor.init // run actor init and initTransactionalState callbacks
actor.initTransactionalState
actor.preStart // run actor preStart
Actor.log.trace("[%s] has started", toString)
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)

View file

@ -187,7 +187,7 @@ final class SupervisorActor private[akka] (
trapExit = trapExceptions
faultHandler = Some(handler)
override def shutdown(): Unit = shutdownLinkedActors
override def postStop(): Unit = shutdownLinkedActors
def receive = {
// FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor

View file

@ -78,7 +78,7 @@ object Chameneos {
var sumMeetings = 0
var numFaded = 0
override def init = {
override def preStart = {
for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i))
}

View file

@ -42,7 +42,7 @@ class RestartStrategySpec extends JUnitSuite {
restartLatch.open
}
override def shutdown = {
override def postStop = {
if (restartLatch.isOpen) {
secondRestartLatch.open
}

View file

@ -53,7 +53,7 @@ object HawtDispatcherEchoServer {
var accept_source:DispatchSource = _
var sessions = ListBuffer[ActorRef]()
override def init = {
override def preStart = {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
@ -122,7 +122,7 @@ object HawtDispatcherEchoServer {
var writeCounter = 0L
var closed = false
override def init = {
override def preStart = {
if(useReactorPattern) {
// Then we will be using the reactor pattern for handling IO:
@ -154,7 +154,7 @@ object HawtDispatcherEchoServer {
println("Accepted connection from: "+remote_address);
}
override def shutdown = {
override def postStop = {
closed = true
read_source.release
write_source.release

View file

@ -85,7 +85,7 @@ class ThreadBasedDispatcherSpec extends JUnitSuite {
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
dispatcher.shutdown
dispatcher.postStop
}
}
*/

View file

@ -108,10 +108,10 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
super.preRestart(reason)
}
override def shutdown = {
override def postStop = {
listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag)))
self.shutdownLinkedActors
super.shutdown
super.postStop
}
override def toString =

View file

@ -46,7 +46,7 @@ object ExampleSession {
printTopic("Happy hAkking :-)")
// shutdown everything the amqp tree except the main AMQP supervisor
// postStop everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped
AMQP.shutdownAll

View file

@ -103,5 +103,5 @@ abstract private[amqp] class FaultTolerantChannelActor(
closeChannel
}
override def shutdown = closeChannel
override def postStop = closeChannel
}

View file

@ -104,9 +104,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
connectionCallback.foreach(cb => if (cb.isRunning) cb ! message)
}
override def shutdown = {
override def postStop = {
reconnectionTimer.cancel
// make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed
// make sure postStop is called on all linked actors so they can do channel cleanup before connection is killed
self.shutdownLinkedActors
disconnect
}

View file

@ -40,9 +40,9 @@ class RpcClientActor[I,O](
}
override def shutdown = {
override def postStop = {
rpcClient.foreach(rpc => rpc.close)
super.shutdown
super.postStop
}
override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]"

View file

@ -54,10 +54,10 @@ trait ProducerSupport { this: Actor =>
def headersToCopy: Set[String] = headersToCopyDefault
/**
* Default implementation of <code>Actor.shutdown</code> for freeing resources needed
* Default implementation of <code>Actor.postStop</code> for freeing resources needed
* to actually send messages to <code>endpointUri</code>.
*/
override def shutdown {
override def postStop {
processor.stop
}

View file

@ -24,6 +24,11 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter {
@BeanProperty var clusterName = ""
@BeanProperty var broadcaster : Broadcaster = null
def init() {
//Since this class is instantiated by Atmosphere, we need to make sure it's started
self.start
}
/**
* Stops the actor
*/
@ -48,7 +53,4 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter {
case b @ ClusterCometBroadcast(c, _) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
case _ =>
}
//Since this class is instantiated by Atmosphere, we need to make sure it's started
self.start
}

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.util.{Logging, Bootable}
import javax.servlet.{ServletContextListener, ServletContextEvent}
/**
* This class can be added to web.xml mappings as a listener to start and shutdown Akka.
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
*
*<web-app>
* ...

View file

@ -36,6 +36,6 @@ class AtomikosTransactionService extends TransactionService with TransactionProt
"Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
}
)))
// TODO: gracefully shutdown of the TM
//txService.shutdown(false)
// TODO: gracefully postStop of the TM
//txService.postStop(false)
}

View file

@ -15,7 +15,7 @@ object Main {
}
/**
* The Akka Kernel, is used to start And shutdown Akka in standalone/kernel mode.
* The Akka Kernel, is used to start And postStop Akka in standalone/kernel mode.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -302,13 +302,23 @@ private [akka] object RedisStorageBackend extends
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zadd(name, zscore, byteArrayToString(item))
.map { case 1 => true }.getOrElse(false)
.map { e =>
e match {
case 1 => true
case _ => false
}
}.getOrElse(false)
}
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zrem(name, byteArrayToString(item))
.map { case 1 => true }.getOrElse(false)
.map { e =>
e match {
case 1 => true
case _ => false
}
}.getOrElse(false)
}
// cardinality of the set identified by name
@ -349,6 +359,7 @@ private [akka] object RedisStorageBackend extends
case e: java.lang.NullPointerException =>
throw new StorageException("Could not connect to Redis server")
case e =>
e.printStackTrace
throw new StorageException("Error in Redis: " + e.getMessage)
}
}

View file

@ -81,6 +81,7 @@ message RemoteRequestProtocol {
required bool isOneWay = 4;
optional string supervisorUuid = 5;
optional RemoteActorRefProtocol sender = 6;
repeated MetadataEntryProtocol metadata = 7;
}
/**
@ -93,6 +94,23 @@ message RemoteReplyProtocol {
optional string supervisorUuid = 4;
required bool isActor = 5;
required bool isSuccessful = 6;
repeated MetadataEntryProtocol metadata = 7;
}
/**
* Defines a UUID.
*/
message UuidProtocol {
required uint64 high = 1;
required uint64 low = 2;
}
/**
* Defines a meta data entry.
*/
message MetadataEntryProtocol {
required string key = 1;
required bytes value = 2;
}
/**

View file

@ -86,11 +86,11 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
@volatile private var local: Node = Node(Nil)
@volatile private var remotes: Map[ADDR_T, Node] = Map()
override def init = {
override def preStart = {
remotes = new HashMap[ADDR_T, Node]
}
override def shutdown = {
override def postStop = {
remotes = Map()
}

View file

@ -54,8 +54,8 @@ class JGroupsClusterActor extends BasicClusterActor {
protected def toAllNodes(msg : Array[Byte]): Unit =
for (c <- channel) c.send(new JG_MSG(null, null, msg))
override def shutdown = {
super.shutdown
override def postStop = {
super.postStop
log info ("Shutting down %s", toString)
isActive = false
channel.foreach(Util shutdown _)

View file

@ -205,7 +205,7 @@ class RemoteClient private[akka] (
extends Logging with ListenerManagement {
val name = "RemoteClient@" + hostname + "::" + port
//FIXME Should these be clear:ed on shutdown?
//FIXME Should these be clear:ed on postStop?
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, ActorRef]

View file

@ -423,7 +423,7 @@ class RemoteServerHandler(
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
/**
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
* ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer.
* If a channel is closed before, it is automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)

View file

@ -3920,7 +3920,7 @@ public final class RemoteProtocol {
public boolean hasInit() { return hasInit; }
public java.lang.String getInit() { return init_; }
// optional string shutdown = 5;
// optional string postStop = 5;
public static final int SHUTDOWN_FIELD_NUMBER = 5;
private boolean hasShutdown;
private java.lang.String shutdown_ = "";
@ -4295,7 +4295,7 @@ public final class RemoteProtocol {
return this;
}
// optional string shutdown = 5;
// optional string postStop = 5;
public boolean hasShutdown() {
return result.hasShutdown();
}

View file

@ -67,7 +67,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
// make sure the servers postStop cleanly after the test has finished
@After
def finished {
try {

View file

@ -47,7 +47,7 @@ class ProtobufActorMessageSerializationSpec extends JUnitSuite {
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
// make sure the servers postStop cleanly after the test has finished
@After
def finished() {
server.shutdown

View file

@ -185,7 +185,7 @@ trait ChatServer extends Actor {
protected def sessionManagement: Receive
protected def shutdownSessions(): Unit
override def shutdown = {
override def postStop = {
log.info("Chat server is shutting down...")
shutdownSessions
self.unlink(storage)
@ -205,7 +205,7 @@ class ChatService extends
SessionManagement with
ChatManagement with
RedisChatStorageFactory {
override def init = {
override def preStart = {
RemoteNode.start("localhost", 9999)
RemoteNode.register("chat:service", self)
}

View file

@ -14,7 +14,7 @@ public class Pojo extends TypedActor implements PojoInf, ApplicationContextAware
private String stringFromRef;
private boolean gotApplicationContext = false;
private boolean initInvoked = false;
private boolean preStartInvoked = false;
public boolean gotApplicationContext() {
return gotApplicationContext;
@ -41,11 +41,11 @@ public class Pojo extends TypedActor implements PojoInf, ApplicationContextAware
}
@Override
public void init() {
initInvoked = true;
public void preStart() {
preStartInvoked = true;
}
public boolean isInitInvoked() {
return initInvoked;
public boolean isPreStartInvoked() {
return preStartInvoked;
}
}

View file

@ -8,6 +8,6 @@ public interface PojoInf {
public String getStringFromVal();
public String getStringFromRef();
public boolean gotApplicationContext();
public boolean isInitInvoked();
public boolean isPreStartInvoked();
}

View file

@ -19,7 +19,7 @@ public class SampleBean extends TypedActor implements SampleBeanIntf {
}
@Override
public void shutdown() {
public void postStop() {
down = true;
}
}

View file

@ -5,6 +5,7 @@ import se.scalablesolutions.akka.stm.TransactionalMap;
import se.scalablesolutions.akka.stm.TransactionalVector;
import se.scalablesolutions.akka.stm.Ref;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.stm.local.Atomic;
public class StatefulPojo extends TypedActor {
private TransactionalMap<String, String> mapState;
@ -13,12 +14,16 @@ public class StatefulPojo extends TypedActor {
private boolean isInitialized = false;
@Override
public void initTransactionalState() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
public void preStart() {
if(!isInitialized) {
isInitialized = new Atomic<Boolean>() {
public Boolean atomically() {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
return true;
}
}.execute();
}
}

View file

@ -68,7 +68,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
it("should create an application context and verify dependency injection for typed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val ta = ctx.getBean("typedActor").asInstanceOf[PojoInf];
assert(ta.isInitInvoked)
assert(ta.isPreStartInvoked)
assert(ta.getStringFromVal === "akka rocks")
assert(ta.getStringFromRef === "spring rocks")
assert(ta.gotApplicationContext)

View file

@ -40,12 +40,12 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
* }
*
* @Override
* public void init() {
* public void preStart() {
* ... // optional initialization on start
* }
*
* @Override
* public void shutdown() {
* public void postStop() {
* ... // optional cleanup on stop
* }
*
@ -78,11 +78,11 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
*
* def square(x: Int): Future[Integer] = future(x * x)
*
* override def init = {
* override def preStart = {
* ... // optional initialization on start
* }
*
* override def shutdown = {
* override def postStop = {
* ... // optional cleanup on stop
* }
*
@ -542,11 +542,7 @@ object TypedActor extends Logging {
val typedActor =
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
typedActor.init
import se.scalablesolutions.akka.stm.local.atomic
atomic {
typedActor.initTransactionalState
}
typedActor.preStart
typedActor
}
@ -764,4 +760,4 @@ private[akka] sealed case class AspectInit(
/**
* Marker interface for server manager typed actors.
*/
private[akka] sealed trait ServerManagedTypedActor extends TypedActor
private[akka] sealed trait ServerManagedTypedActor extends TypedActor

View file

@ -10,7 +10,7 @@ public class NestedTransactionalTypedActorImpl extends TypedTransactor implement
private boolean isInitialized = false;
@Override
public void init() {
public void preStart() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();

View file

@ -38,7 +38,7 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
}
@Override
public void shutdown() {
public void postStop() {
_down = true;
latch.countDown();
}

View file

@ -2,6 +2,8 @@ package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.*;
import se.scalablesolutions.akka.stm.local.Atomic;
public class TransactionalTypedActorImpl extends TypedTransactor implements TransactionalTypedActor {
private TransactionalMap<String, String> mapState;
@ -10,12 +12,16 @@ public class TransactionalTypedActorImpl extends TypedTransactor implements Tran
private boolean isInitialized = false;
@Override
public void initTransactionalState() {
public void preStart() {
if (!isInitialized) {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
isInitialized = new Atomic<Boolean>() {
public Boolean atomically() {
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
return true;
}
}.execute();
}
}

View file

@ -95,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
/*
it("should shutdown non-supervised, annotated typed actor on TypedActor.stop") {
it("should postStop non-supervised, annotated typed actor on TypedActor.stop") {
val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
@ -112,7 +112,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
it("should shutdown non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
it("should postStop non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
@ -147,7 +147,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
it("should shutdown supervised, annotated typed actor on failure") {
it("should postStop supervised, annotated typed actor on failure") {
val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
val cdl = obj.newCountdownLatch(1)
assert(AspectInitRegistry.initFor(obj) ne null)

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redis</groupId>
<artifactId>redisclient</artifactId>
<version>2.8.0-2.0</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.scala-tools</groupId>
<artifactId>time</artifactId>
<version>2.8.0-0.2-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View file

@ -49,9 +49,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val CasbahRepoSnapshots = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/snapshots/")
lazy val CasbahRepoReleases = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/releases/")
lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/")
lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases")
}
// -------------------------------------------------------------------------------------------------------------------
@ -78,7 +77,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
lazy val casbahSnapshot = ModuleConfiguration("com.novus",CasbahRepoSnapshots)
lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
@ -494,7 +492,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_codec = Dependencies.commons_codec
val redis = Dependencies.redis
// override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------
@ -505,7 +503,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val mongo = Dependencies.mongo
val casbah = Dependencies.casbah
// override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------