fixed bug with init of tx datastructs + changed actor id management
This commit is contained in:
parent
924661368b
commit
b4a46016eb
11 changed files with 33 additions and 20 deletions
|
|
@ -203,8 +203,6 @@ trait Actor extends TransactionManagement {
|
|||
// Only mutable for RemoteServer in order to maintain identity across nodes
|
||||
private[akka] var _uuid = UUID.newUuid.toString
|
||||
|
||||
def uuid = _uuid
|
||||
|
||||
// ====================================
|
||||
// private fields
|
||||
// ====================================
|
||||
|
|
@ -259,7 +257,7 @@ trait Actor extends TransactionManagement {
|
|||
* use a custom name to be able to retrieve the "correct" persisted state
|
||||
* upon restart, remote restart etc.
|
||||
*/
|
||||
protected[akka] var id: String = this.getClass.getName
|
||||
protected var id: String = this.getClass.getName
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -269,8 +267,6 @@ trait Actor extends TransactionManagement {
|
|||
*/
|
||||
@volatile var timeout: Long = Actor.TIMEOUT
|
||||
|
||||
ActorRegistry.register(this)
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
|
|
@ -418,6 +414,7 @@ trait Actor extends TransactionManagement {
|
|||
init
|
||||
}
|
||||
Actor.log.debug("[%s] has started", toString)
|
||||
ActorRegistry.register(this)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
@ -760,6 +757,16 @@ trait Actor extends TransactionManagement {
|
|||
actor
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the id for the actor.
|
||||
*/
|
||||
def getId = id
|
||||
|
||||
/**
|
||||
* Returns the uuid for the actor.
|
||||
*/
|
||||
def uuid = _uuid
|
||||
|
||||
// =========================================
|
||||
// ==== INTERNAL IMPLEMENTATION DETAILS ====
|
||||
// =========================================
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ object ActorRegistry extends Logging {
|
|||
case Some(instances) => actorsByClassName + (className -> (actor :: instances))
|
||||
case None => actorsByClassName + (className -> (actor :: Nil))
|
||||
}
|
||||
val id = actor.id
|
||||
val id = actor.getId
|
||||
if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
|
||||
actorsById.get(id) match {
|
||||
case Some(instances) => actorsById + (id -> (actor :: instances))
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
|
|||
actors.put(actor.getClass.getName, actor)
|
||||
actor.lifeCycle = Some(lifeCycle)
|
||||
startLink(actor)
|
||||
remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor))
|
||||
remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor))
|
||||
|
||||
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
|
||||
val supervisor = factory.newInstanceFor(supervisorConfig).start
|
||||
|
|
|
|||
|
|
@ -196,8 +196,8 @@ class RemoteServer extends Logging {
|
|||
* Register Remote Actor by the Actor's 'id' field.
|
||||
*/
|
||||
def register(actor: Actor) = if (isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import org.junit.Assert._
|
|||
import org.apache.cassandra.service.CassandraDaemon
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Before
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
case class GetMapState(key: String)
|
||||
case object GetVectorState
|
||||
|
|
@ -73,7 +74,7 @@ class CassandraPersistentActor extends Actor {
|
|||
}
|
||||
}
|
||||
|
||||
class CassandraPersistentActorTest {
|
||||
class CassandraPersistentActorSpec extends JUnitSuite {
|
||||
|
||||
@Before
|
||||
def startCassandra = EmbeddedCassandraService.start
|
||||
|
|
|
|||
|
|
@ -10,9 +10,8 @@ import se.scalablesolutions.akka.remote.RemoteServer
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import se.scalablesolutions.akka.state.RedisStorage
|
||||
|
||||
import scala.collection.mutable.HashMap
|
||||
import se.scalablesolutions.akka.state.{PersistentVector, RedisStorage}
|
||||
|
||||
/******************************************************************************
|
||||
To run the sample:
|
||||
|
|
@ -77,7 +76,11 @@ trait ChatStorage extends Actor
|
|||
class RedisChatStorage extends ChatStorage {
|
||||
lifeCycle = Some(LifeCycle(Permanent))
|
||||
|
||||
private var chatLog = RedisStorage.getVector("akka.chat.log")
|
||||
private var chatLog: PersistentVector[Array[Byte]] = _
|
||||
|
||||
override def initTransactionalState = chatLog = RedisStorage.getVector("akka.chat.log")
|
||||
|
||||
chatLog = RedisStorage.getVector("akka.chat.log")
|
||||
|
||||
log.info("Redis-based chat storage is starting up...")
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class SimpleService extends Transactor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
@ -52,7 +52,7 @@ class PersistentSimpleService extends Transactor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = CassandraStorage.newMap
|
||||
private lazy val storage = CassandraStorage.newMap
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
|
|||
|
|
@ -29,11 +29,12 @@ public class PersistentSimpleService {
|
|||
private String KEY = "COUNTER";
|
||||
|
||||
private boolean hasStartedTicking = false;
|
||||
private PersistentMap<byte[], byte[]> storage = CassandraStorage.newMap();
|
||||
private PersistentMap<byte[], byte[]> storage;
|
||||
|
||||
@GET
|
||||
@Produces({"application/html"})
|
||||
public String count() {
|
||||
if (storage == null) storage = CassandraStorage.newMap();
|
||||
if (!hasStartedTicking) {
|
||||
storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array());
|
||||
hasStartedTicking = true;
|
||||
|
|
|
|||
|
|
@ -27,11 +27,12 @@ public class SimpleService {
|
|||
private String KEY = "COUNTER";
|
||||
|
||||
private boolean hasStartedTicking = false;
|
||||
private TransactionalMap storage = TransactionalState.newMap();
|
||||
private TransactionalMap<String, Integer> storage;
|
||||
|
||||
@GET
|
||||
@Produces({"application/json"})
|
||||
public String count() {
|
||||
if (storage == null) storage = TransactionalState.newMap();
|
||||
if (!hasStartedTicking) {
|
||||
storage.put(KEY, 0);
|
||||
hasStartedTicking = true;
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ class SimpleService extends Transactor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
@ -105,7 +105,7 @@ class PersistentSimpleService extends Transactor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = CassandraStorage.newMap
|
||||
private lazy val storage = CassandraStorage.newMap
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ class SecureTickActor extends Actor with Logging {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
|
||||
/**
|
||||
* allow access for any user to "/secureticker/public"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue