- Made ClusterActorRef not extends RemoteActorRef anymore

- Refactored and cleaned up Transaction Log initialization

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-06-07 20:10:08 -07:00
parent 5a24ba5568
commit 85e742373a
11 changed files with 143 additions and 64 deletions

View file

@ -411,7 +411,7 @@ object Actor extends ListenerManagement {
.getOrElse(throw new ConfigurationException(
"Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
// FIXME later managed different 'storage' as well
// FIXME later manage different 'storage' (data grid) as well
storeActorAndGetClusterRef(strategy, serializer)
}
}

View file

@ -553,14 +553,14 @@ class LocalActorRef private[akka] (
case _ true
}
// FIXME how to get the matching serializerClassName? Now default is used
// FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot
private val serializer = Actor.serializerFor(address, Format.defaultSerializerName)
private lazy val txLog: TransactionLog = {
val log = replicationStrategy match {
case Transient throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]")
case WriteThrough transactionLog.newLogFor(_uuid.toString, false)
case WriteBehind transactionLog.newLogFor(_uuid.toString, true)
case WriteThrough transactionLog.newLogFor(_uuid.toString, false, replicationStrategy, serializer)
case WriteBehind transactionLog.newLogFor(_uuid.toString, true, replicationStrategy, serializer)
}
EventHandler.debug(this,
"Creating a transaction log for Actor [%s] with replication strategy [%s]"
@ -773,7 +773,7 @@ class LocalActorRef private[akka] (
} finally {
guard.lock.unlock()
if (isReplicated) {
txLog.recordEntry(messageHandle, this, serializer)
txLog.recordEntry(messageHandle, this)
}
}
}
@ -1038,8 +1038,6 @@ private[akka] case class RemoteActorRef private[akka] (
timeout = _timeout
// FIXME BAD, we should not have different ActorRefs
start()
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
@ -1073,8 +1071,6 @@ private[akka] case class RemoteActorRef private[akka] (
}
// ==== NOT SUPPORTED ====
@deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher) {
unsupported
}

View file

@ -116,7 +116,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
}
/**
* View over the local actor registry.
* Projection over the local actor registry.
*/
class LocalActorRegistry(
private val actorsByAddress: ConcurrentHashMap[String, ActorRef],

View file

@ -143,7 +143,7 @@ object DeploymentConfig {
def isHomeNode(home: Home): Boolean = home match {
case Host(hostname) hostname == Config.hostname
case IP(address) address == "0.0.0.0" // FIXME checking if IP address is on home node is missing
case IP(address) address == "0.0.0.0" || address == "127.0.0.1" // FIXME look up IP address from the system
case Node(nodename) nodename == Config.nodename
}

View file

@ -154,7 +154,7 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val DEFAULT_NAME = "MonitorableThread".intern
// FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
val created = new AtomicInteger

View file

@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Promise, MessageInvocation }
import akka.config.{ Config, ModuleNotAvailableException }
import akka.remoteinterface.RemoteSupport
import akka.actor._
import DeploymentConfig.Deploy
import DeploymentConfig.{ Deploy, ReplicationStrategy }
import akka.event.EventHandler
import akka.serialization.Format
import akka.cluster.ClusterNode
@ -108,13 +108,23 @@ object ReflectiveAccess {
}
type TransactionLogObject = {
def newLogFor(id: String, isAsync: Boolean): TransactionLog
def logFor(id: String, isAsync: Boolean): TransactionLog
def newLogFor(
id: String,
isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer): TransactionLog
def logFor(
id: String,
isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer): TransactionLog
def shutdown()
}
type TransactionLog = {
def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, serializer: Serializer)
def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef)
def recordEntry(entry: Array[Byte])
def recordSnapshot(snapshot: Array[Byte])
def entries: Vector[Array[Byte]]

View file

@ -618,7 +618,7 @@ class DefaultClusterNode private[akka] (
val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store in Data Grid not ZooKeeper
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
else {
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
def call: Either[String, Exception] = {
@ -789,14 +789,15 @@ class DefaultClusterNode private[akka] (
isConnected ifOn {
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
membershipNodes foreach { node
replicaConnections.get(node) foreach {
case (_, connection)
connection ! command
case (_, connection) connection ! command
}
}
}
@ -1304,7 +1305,7 @@ class DefaultClusterNode private[akka] (
homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress))
remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
remoteService.register(actorAddress, actor)
}
}
@ -1547,8 +1548,6 @@ object RemoteClusterDaemon {
val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
}
// FIXME supervise RemoteClusterDaemon
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -7,11 +7,15 @@ import Cluster._
import akka.actor._
import Actor._
import akka.dispatch._
import akka.util._
import ReflectiveAccess._
import ClusterModule._
import akka.event.EventHandler
import akka.dispatch.Promise
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map JMap }
import com.eaio.uuid.UUID
@ -20,16 +24,20 @@ import com.eaio.uuid.UUID
*/
class ClusterActorRef private[akka] (
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
actorAddress: String,
timeout: Long)
extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef
this: ClusterActorRef with Router.Router
val address: String,
_timeout: Long)
extends ActorRef with ScalaActorRef { this: Router.Router
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
(Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) {
case (map, (uuid, inetSocketAddress)) map + (inetSocketAddress -> createRemoteActorRef(actorAddress, inetSocketAddress))
case (map, (uuid, inetSocketAddress)) map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress))
})
ClusterModule.ensureEnabled()
start()
def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
@ -55,4 +63,53 @@ class ClusterActorRef private[akka] (
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
}
def start(): ActorRef = synchronized {
_status = ActorRefInternals.RUNNING
this
}
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}
}
// ==== NOT SUPPORTED ====
// FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}

View file

@ -177,11 +177,11 @@ object ClusterDeployer {
case e: NullPointerException
handleError(new DeploymentException(
"Could not store deployment data [" + deployment +
"] in ZooKeeper since client session is closed"))
"] in ZooKeeper since client session is closed"))
case e: Exception
handleError(new DeploymentException(
"Could not store deployment data [" +
deployment + "] in ZooKeeper due to: " + e))
deployment + "] in ZooKeeper due to: " + e))
}
}
}

View file

@ -51,7 +51,9 @@ class ReplicationException(message: String) extends AkkaException(message)
class TransactionLog private (
ledger: LedgerHandle,
val id: String,
val isAsync: Boolean) {
val isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer) {
import TransactionLog._
@ -65,12 +67,12 @@ class TransactionLog private (
/**
* TODO document method
*/
def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, format: Serializer) {
def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) {
if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) {
val snapshot =
// FIXME ReplicationStrategy Transient is always used
if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, Transient)(format))
else toBinary(actorRef, false, Transient)(format)
if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationStrategy)(format))
else toBinary(actorRef, false, replicationStrategy)(format)
recordSnapshot(snapshot)
}
recordEntry(MessageSerializer.serialize(messageHandle.message).toByteArray)
@ -365,8 +367,13 @@ object TransactionLog {
(bk, zk)
}
private[akka] def apply(ledger: LedgerHandle, id: String, isAsync: Boolean = false) =
new TransactionLog(ledger, id, isAsync)
private[akka] def apply(
ledger: LedgerHandle,
id: String,
isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer) =
new TransactionLog(ledger, id, isAsync, replicationStrategy, format)
/**
* Shuts down the transaction log.
@ -387,7 +394,12 @@ object TransactionLog {
/**
* TODO document method
*/
def newLogFor(id: String, isAsync: Boolean = false): TransactionLog = {
def newLogFor(
id: String,
isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
val ledger = try {
@ -431,13 +443,18 @@ object TransactionLog {
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
TransactionLog(ledger, id, isAsync)
TransactionLog(ledger, id, isAsync, replicationStrategy, format)
}
/**
* TODO document method
*/
def logFor(id: String, isAsync: Boolean = false): TransactionLog = {
def logFor(
id: String,
isAsync: Boolean,
replicationStrategy: ReplicationStrategy,
format: Serializer): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
val logId = try {
@ -476,7 +493,7 @@ object TransactionLog {
case e handleError(e)
}
TransactionLog(ledger, id, isAsync)
TransactionLog(ledger, id, isAsync, replicationStrategy, format)
}
private[akka] def await[T](future: Promise[T]): T = {

View file

@ -32,31 +32,31 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"A Transaction Log" should {
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid)
val txlog = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid))
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, Format.Default))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid)
val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
@ -66,7 +66,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
@ -74,7 +74,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid)
val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
@ -86,7 +86,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
@ -94,7 +94,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@ -105,7 +105,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid)
val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@ -120,7 +120,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid)
val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@ -134,7 +134,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid)
val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@ -149,7 +149,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"A Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, true)
val txlog = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
Thread.sleep(100)
@ -158,24 +158,24 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and delete entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
Thread.sleep(100)
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true))
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, Format.Default))
}
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
Thread.sleep(100)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, true)
val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
@ -186,7 +186,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries and read entries with 'entries' - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
@ -195,7 +195,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, true)
val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
@ -208,7 +208,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record a snapshot - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
Thread.sleep(100)
@ -217,7 +217,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and read a snapshot and following entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@ -229,7 +229,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, true)
val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@ -245,7 +245,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true)
val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@ -258,7 +258,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, true)
val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))