Added durable mailboxes: File-based, Redis-based, Zookeeper-based and Beanstalk-based

This commit is contained in:
Jonas Bonér 2011-05-17 14:48:06 +02:00
parent 5d88ffe9d4
commit 160ff867cb
27 changed files with 3185 additions and 70 deletions

View file

@ -333,7 +333,7 @@ object LocalDeployer {
private val deployments = new ConcurrentHashMap[String, Deploy]
private[akka] def init(deployments: List[Deploy]) {
EventHandler.info(this, "Initializing local deployer\nDeploying actors locally [\n%s\n]" format deployments.mkString("\n\t"))
EventHandler.info(this, "Deploying actors locally [\n\t%s\n]" format deployments.mkString("\n\t"))
deployments foreach (deploy(_)) // deploy
}

View file

@ -0,0 +1,107 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import MailboxProtocol._
import akka.actor.ActorRef
import akka.dispatch._
import akka.config.Config._
import akka.util.Duration
import akka.AkkaException
import akka.event.EventHandler
import com.surftools.BeanstalkClient._
import com.surftools.BeanstalkClientImpl._
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
val hostname = config.getString("akka.actor.mailbox.beanstalk.hostname", "0.0.0.0")
val port = config.getInt("akka.enterprise.beanstalk.port", 11300)
val reconnectWindow = Duration(config.getInt("akka.actor.mailbox.beanstalk.reconnect-window", 5), TIME_UNIT).toSeconds.toInt
val messageSubmitDelay = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-submit-delay", 0), TIME_UNIT).toSeconds.toInt
val messageSubmitTimeout = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout", 5), TIME_UNIT).toSeconds.toInt
val messageTimeToLive = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live", 120), TIME_UNIT).toSeconds.toInt
private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) }
// ===== For MessageQueue =====
def enqueue(durableMessage: MessageInvocation) = {
Some(queue.get.put(65536, messageSubmitDelay, messageTimeToLive, serialize(durableMessage)).toInt)
EventHandler.debug(this, "\nENQUEUING message in beanstalk-based mailbox [%s]".format(durableMessage))
}
def dequeue: MessageInvocation = try {
val job = queue.get.reserve(null)
if (job eq null) null: MessageInvocation
else {
val bytes = job.getData
if (bytes ne null) {
queue.get.delete(job.getJobId)
val messageInvocation = deserialize(bytes)
EventHandler.debug(this, "\nDEQUEUING message in beanstalk-based mailbox [%s]".format(messageInvocation))
messageInvocation
} else null: MessageInvocation
}
} catch {
case e: Exception =>
EventHandler.error(e, this, "Beanstalk connection error")
reconnect(name)
null: MessageInvocation
}
/**
* Completely delete the queue.
*/
def remove: Boolean = {
try {
queue.get.kick(100000)
true
} catch {
case e: Exception => false
}
}
def size: Int = {
val item = queue.get.reserve(0)
if (item eq null) 0 else 1
}
def isEmpty = size == 0
private def connect(name: String): Client = {
@volatile var connected = false
var attempts = 0
var client: Client = null
while (!connected) {
attempts += 1
try {
client = new ClientImpl(hostname, port)
client.useTube(name)
client.watch(name)
connected = true
} catch {
case e: Exception =>
EventHandler.error(e, this, "Unable to connect to Beanstalk. Retrying in [%s] seconds: %s".format(reconnectWindow, e))
try {
Thread.sleep(1000 * reconnectWindow)
} catch {
case e: InterruptedException => {}
}
}
}
EventHandler.info(this, "Beanstalk-based mailbox connected to Beanstalkd server")
client
}
private def reconnect(name: String): ThreadLocal[Client] = {
new ThreadLocal[Client] { override def initialValue: Client = connect(name) }
}
}

View file

@ -0,0 +1,3 @@
package akka.actor.mailbox
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxStorage)

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import akka.actor.ActorRef
import akka.dispatch._
import akka.config.Config._
import akka.event.EventHandler
import org.apache.commons.io.FileUtils
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object FileBasedMailboxUtil {
val queuePath = config.getString("akka.actor.mailbox.file-based.directory-path", "./_mb") // /var/spool/akka
}
class FileBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
import FileBasedMailboxUtil._
private val queue = try {
try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case e => {} }
val queue = new filequeue.PersistentQueue(queuePath, name, config)
queue.setup // replays journal
queue.discardExpired
queue
} catch {
case e: Exception =>
EventHandler.error(e, this, "Could not create a file-based mailbox")
throw e
}
def enqueue(message: MessageInvocation) = {
EventHandler.debug(this, "\nENQUEUING message in file-based mailbox [%s]".format( message))
queue.add(serialize(message))
}
def dequeue: MessageInvocation = try {
val item = queue.remove
if (item.isDefined) {
queue.confirmRemove(item.get.xid)
val messageInvocation = deserialize(item.get.data)
EventHandler.debug(this, "\nDEQUEUING message in file-based mailbox [%s]".format(messageInvocation))
messageInvocation
} else null
} catch {
case e: java.util.NoSuchElementException => null
case e: Exception =>
EventHandler.error(e, this, "Couldn't dequeue from file-based mailbox")
throw e
}
/**
* Completely delete the queue.
*/
def remove: Boolean = try {
queue.remove
true
} catch {
case e => false //review why catch Throwable? And swallow potential Errors?
}
def size: Int = queue.length.toInt
def isEmpty: Boolean = size == 0
}

View file

@ -0,0 +1,22 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.io.IOException
case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)

View file

@ -0,0 +1,32 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.util.concurrent.atomic.AtomicLong
class Counter {
private val value = new AtomicLong(0)
def apply() = value.get
def set(n: Long) = value.set(n)
def incr() = value.addAndGet(1)
def incr(n: Long) = value.addAndGet(n)
def decr() = value.addAndGet(-1)
def decr(n: Long) = value.addAndGet(-n)
override def toString = value.get.toString
}

View file

@ -0,0 +1,346 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.io._
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.channels.FileChannel
import akka.event.EventHandler
// returned from journal replay
sealed trait JournalItem
object JournalItem {
case class Add(item: QItem) extends JournalItem
case object Remove extends JournalItem
case object RemoveTentative extends JournalItem
case class SavedXid(xid: Int) extends JournalItem
case class Unremove(xid: Int) extends JournalItem
case class ConfirmRemove(xid: Int) extends JournalItem
case object EndOfFile extends JournalItem
}
/**
* Codes for working with the journal file for a PersistentQueue.
*/
class Journal(queuePath: String, syncJournal: => Boolean) {
private val queueFile = new File(queuePath)
private var writer: FileChannel = null
private var reader: Option[FileChannel] = None
private var replayer: Option[FileChannel] = None
var size: Long = 0
// small temporary buffer for formatting operations into the journal:
private val buffer = new Array[Byte](16)
private val byteBuffer = ByteBuffer.wrap(buffer)
byteBuffer.order(ByteOrder.LITTLE_ENDIAN)
private val CMD_ADD = 0
private val CMD_REMOVE = 1
private val CMD_ADDX = 2
private val CMD_REMOVE_TENTATIVE = 3
private val CMD_SAVE_XID = 4
private val CMD_UNREMOVE = 5
private val CMD_CONFIRM_REMOVE = 6
private val CMD_ADD_XID = 7
private def open(file: File): Unit = {
writer = new FileOutputStream(file, true).getChannel
}
def open(): Unit = {
open(queueFile)
}
def roll(xid: Int, openItems: List[QItem], queue: Iterable[QItem]): Unit = {
writer.close
val tmpFile = new File(queuePath + "~~" + System.currentTimeMillis)
open(tmpFile)
size = 0
for (item <- openItems) {
addWithXid(item)
removeTentative(false)
}
saveXid(xid)
for (item <- queue) {
add(false, item)
}
if (syncJournal) writer.force(false)
writer.close
tmpFile.renameTo(queueFile)
open
}
def close(): Unit = {
writer.close
for (r <- reader) r.close
reader = None
}
def erase(): Unit = {
try {
close()
queueFile.delete
} catch {
case _ =>
}
}
def inReadBehind(): Boolean = reader.isDefined
def isReplaying(): Boolean = replayer.isDefined
private def add(allowSync: Boolean, item: QItem): Unit = {
val blob = ByteBuffer.wrap(item.pack())
size += write(false, CMD_ADDX.toByte, blob.limit)
do {
writer.write(blob)
} while (blob.position < blob.limit)
if (allowSync && syncJournal) writer.force(false)
size += blob.limit
}
def add(item: QItem): Unit = add(true, item)
// used only to list pending transactions when recreating the journal.
private def addWithXid(item: QItem) = {
val blob = ByteBuffer.wrap(item.pack())
// only called from roll(), so the journal does not need to be synced after a write.
size += write(false, CMD_ADD_XID.toByte, item.xid, blob.limit)
do {
writer.write(blob)
} while (blob.position < blob.limit)
size += blob.limit
}
def remove() = {
size += write(true, CMD_REMOVE.toByte)
}
private def removeTentative(allowSync: Boolean): Unit = {
size += write(allowSync, CMD_REMOVE_TENTATIVE.toByte)
}
def removeTentative(): Unit = removeTentative(true)
private def saveXid(xid: Int) = {
// only called from roll(), so the journal does not need to be synced after a write.
size += write(false, CMD_SAVE_XID.toByte, xid)
}
def unremove(xid: Int) = {
size += write(true, CMD_UNREMOVE.toByte, xid)
}
def confirmRemove(xid: Int) = {
size += write(true, CMD_CONFIRM_REMOVE.toByte, xid)
}
def startReadBehind(): Unit = {
val pos = if (replayer.isDefined) replayer.get.position else writer.position
val rj = new FileInputStream(queueFile).getChannel
rj.position(pos)
reader = Some(rj)
}
def fillReadBehind(f: QItem => Unit): Unit = {
val pos = if (replayer.isDefined) replayer.get.position else writer.position
for (rj <- reader) {
if (rj.position == pos) {
// we've caught up.
rj.close
reader = None
} else {
readJournalEntry(rj) match {
case (JournalItem.Add(item), _) => f(item)
case (_, _) =>
}
}
}
}
def replay(name: String)(f: JournalItem => Unit): Unit = {
size = 0
var lastUpdate = 0L
val TEN_MB = 10L * 1024 * 1024
try {
val in = new FileInputStream(queueFile).getChannel
try {
replayer = Some(in)
var done = false
do {
readJournalEntry(in) match {
case (JournalItem.EndOfFile, _) => done = true
case (x, itemsize) =>
size += itemsize
f(x)
if (size / TEN_MB > lastUpdate) {
lastUpdate = size / TEN_MB
EventHandler.info(this,
"Continuing to read '%s' journal; %s MB so far...".format(name, lastUpdate * 10))
}
}
} while (!done)
} catch {
case e: BrokenItemException =>
EventHandler.error(e, this, "Exception replaying journal for '%s'".format(name))
truncateJournal(e.lastValidPosition)
}
} catch {
case e: FileNotFoundException =>
EventHandler.info(this, "No transaction journal for '%s'; starting with empty queue.".format(name))
case e: IOException =>
EventHandler.error(e, this, "Exception replaying journal for '%s'".format(name))
// this can happen if the server hardware died abruptly in the middle
// of writing a journal. not awesome but we should recover.
}
replayer = None
}
private def truncateJournal(position: Long) {
val trancateWriter = new FileOutputStream(queueFile, true).getChannel
try {
trancateWriter.truncate(position)
} finally {
trancateWriter.close()
}
}
def readJournalEntry(in: FileChannel): (JournalItem, Int) = {
byteBuffer.rewind
byteBuffer.limit(1)
val lastPosition = in.position
var x: Int = 0
do {
x = in.read(byteBuffer)
} while (byteBuffer.position < byteBuffer.limit && x >= 0)
if (x < 0) {
(JournalItem.EndOfFile, 0)
} else {
try {
buffer(0) match {
case CMD_ADD =>
val data = readBlock(in)
(JournalItem.Add(QItem.unpackOldAdd(data)), 5 + data.length)
case CMD_REMOVE =>
(JournalItem.Remove, 1)
case CMD_ADDX =>
val data = readBlock(in)
(JournalItem.Add(QItem.unpack(data)), 5 + data.length)
case CMD_REMOVE_TENTATIVE =>
(JournalItem.RemoveTentative, 1)
case CMD_SAVE_XID =>
val xid = readInt(in)
(JournalItem.SavedXid(xid), 5)
case CMD_UNREMOVE =>
val xid = readInt(in)
(JournalItem.Unremove(xid), 5)
case CMD_CONFIRM_REMOVE =>
val xid = readInt(in)
(JournalItem.ConfirmRemove(xid), 5)
case CMD_ADD_XID =>
val xid = readInt(in)
val data = readBlock(in)
val item = QItem.unpack(data)
item.xid = xid
(JournalItem.Add(item), 9 + data.length)
case n =>
throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + in.position))
}
} catch {
case ex: IOException =>
throw new BrokenItemException(lastPosition, ex)
}
}
}
def walk() = new Iterator[(JournalItem, Int)] {
val in = new FileInputStream(queuePath).getChannel
var done = false
var nextItem: Option[(JournalItem, Int)] = None
def hasNext = {
if (done) {
false
} else {
nextItem = readJournalEntry(in) match {
case (JournalItem.EndOfFile, _) =>
done = true
in.close()
None
case x =>
Some(x)
}
nextItem.isDefined
}
}
def next() = nextItem.get
}
private def readBlock(in: FileChannel): Array[Byte] = {
val size = readInt(in)
val data = new Array[Byte](size)
val dataBuffer = ByteBuffer.wrap(data)
var x: Int = 0
do {
x = in.read(dataBuffer)
} while (dataBuffer.position < dataBuffer.limit && x >= 0)
if (x < 0) {
// we never expect EOF when reading a block.
throw new IOException("Unexpected EOF")
}
data
}
private def readInt(in: FileChannel): Int = {
byteBuffer.rewind
byteBuffer.limit(4)
var x: Int = 0
do {
x = in.read(byteBuffer)
} while (byteBuffer.position < byteBuffer.limit && x >= 0)
if (x < 0) {
// we never expect EOF when reading an int.
throw new IOException("Unexpected EOF")
}
byteBuffer.rewind
byteBuffer.getInt()
}
private def write(allowSync: Boolean, items: Any*): Int = {
byteBuffer.clear
for (item <- items) item match {
case b: Byte => byteBuffer.put(b)
case i: Int => byteBuffer.putInt(i)
}
byteBuffer.flip
while (byteBuffer.position < byteBuffer.limit) {
writer.write(byteBuffer)
}
if (allowSync && syncJournal) writer.force(false)
byteBuffer.limit
}
}

View file

@ -0,0 +1,468 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.io._
import scala.collection.mutable
import akka.event.EventHandler
import akka.config.Configuration
// a config value that's backed by a global setting but may be locally overridden
class OverlaySetting[T](base: => T) {
@volatile private var local: Option[T] = None
def set(value: Option[T]) = local = value
def apply() = local.getOrElse(base)
}
class PersistentQueue(persistencePath: String, val name: String, val config: Configuration) {
private case object ItemArrived
// current size of all data in the queue:
private var queueSize: Long = 0
// # of items EVER added to the queue:
private var _totalItems: Long = 0
// # of items that were expired by the time they were read:
private var _totalExpired: Long = 0
// age (in milliseconds) of the last item read from the queue:
private var _currentAge: Long = 0
// # of items thot were discarded because the queue was full:
private var _totalDiscarded: Long = 0
// # of items in the queue (including those not in memory)
private var queueLength: Long = 0
private var queue = new mutable.Queue[QItem] {
// scala's Queue doesn't (yet?) have a way to put back.
def unget(item: QItem) = prependElem(item)
}
private var _memoryBytes: Long = 0
private var closed = false
private var paused = false
def overlay[T](base: => T) = new OverlaySetting(base)
// attempting to add an item after the queue reaches this size (in items) will fail.
val maxItems = overlay(PersistentQueue.maxItems)
// attempting to add an item after the queue reaches this size (in bytes) will fail.
val maxSize = overlay(PersistentQueue.maxSize)
// attempting to add an item larger than this size (in bytes) will fail.
val maxItemSize = overlay(PersistentQueue.maxItemSize)
// maximum expiration time for this queue (seconds).
val maxAge = overlay(PersistentQueue.maxAge)
// maximum journal size before the journal should be rotated.
val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
// maximum size of a queue before it drops into read-behind mode.
val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
// maximum overflow (multiplier) of a journal file before we re-create it.
val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
// absolute maximum size of a journal file until we rebuild it, no matter what.
val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
// whether to drop older items (instead of newer) when the queue is full
val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
// whether to keep a journal file at all
val keepJournal = overlay(PersistentQueue.keepJournal)
// whether to sync the journal after each transaction
val syncJournal = overlay(PersistentQueue.syncJournal)
// (optional) move expired items over to this queue
val expiredQueue = overlay(PersistentQueue.expiredQueue)
private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal())
// track tentative removals
private var xidCounter: Int = 0
private val openTransactions = new mutable.HashMap[Int, QItem]
def openTransactionCount = openTransactions.size
def openTransactionIds = openTransactions.keys.toList.sortWith(_ - _ > 0)
def length: Long = synchronized { queueLength }
def totalItems: Long = synchronized { _totalItems }
def bytes: Long = synchronized { queueSize }
def journalSize: Long = synchronized { journal.size }
def totalExpired: Long = synchronized { _totalExpired }
def currentAge: Long = synchronized { if (queueSize == 0) 0 else _currentAge }
def totalDiscarded: Long = synchronized { _totalDiscarded }
def isClosed: Boolean = synchronized { closed || paused }
// mostly for unit tests.
def memoryLength: Long = synchronized { queue.size }
def memoryBytes: Long = synchronized { _memoryBytes }
def inReadBehind = synchronized { journal.inReadBehind }
//FIXME, segment commented out, might have damaged semantics, investigate.
//config.subscribe { c => configure(c.getOrElse(new Config)) }
configure(config)
def configure(config: Configuration) = synchronized {
maxItems set config.getInt("akka.actor.mailbox.file-based.max-items")
maxSize set config.getLong("akka.actor.mailbox.file-based.max-size")
maxItemSize set config.getLong("akka.actor.mailbox.file-based.max-item-size")
maxAge set config.getInt("akka.actor.mailbox.file-based.max-age")
maxJournalSize set config.getLong("akka.actor.mailbox.file-based.max-journal-size")
maxMemorySize set config.getLong("akka.actor.mailbox.file-based.max-memory-size")
maxJournalOverflow set config.getInt("akka.actor.mailbox.file-based.max-journal-overflow")
maxJournalSizeAbsolute set config.getLong("akka.actor.mailbox.file-based.max-journal-size-absolute")
discardOldWhenFull set config.getBool("akka.actor.mailbox.file-based.discard-old-when-full")
keepJournal set config.getBool("akka.actor.mailbox.file-based.journal")
syncJournal set config.getBool("akka.actor.mailbox.file-based.sync-journal")
EventHandler.info(this,
"Configuring queue %s: journal=%s, max-items=%s, max-size=%s, max-age=%s, max-journal-size=%s, max-memory-size=%s, max-journal-overflow=%s, max-journal-size-absolute=%s, discard-old-when-full=%s, sync-journal=%s"
.format(
name, keepJournal(), maxItems(), maxSize(), maxAge(), maxJournalSize(), maxMemorySize(),
maxJournalOverflow(), maxJournalSizeAbsolute(), discardOldWhenFull(), syncJournal()))
if (!keepJournal()) journal.erase()
}
def dumpConfig(): Array[String] = synchronized {
Array(
"max-items=" + maxItems(),
"max-size=" + maxSize(),
"max-age=" + maxAge(),
"max-journal-size=" + maxJournalSize(),
"max-memory-size=" + maxMemorySize(),
"max-journal-overflow=" + maxJournalOverflow(),
"max-journal-size-absolute=" + maxJournalSizeAbsolute(),
"discard-old-when-full=" + discardOldWhenFull(),
"journal=" + keepJournal(),
"sync-journal=" + syncJournal(),
"move-expired-to" + expiredQueue().map { _.name }.getOrElse("(none)")
)
}
def dumpStats(): Array[(String, String)] = synchronized {
Array(
("items", length.toString),
("bytes", bytes.toString),
("total-items", totalItems.toString),
("logsize", journalSize.toString),
("expired-items", totalExpired.toString),
("mem-items", memoryLength.toString),
("mem-bytes", memoryBytes.toString),
("age", currentAge.toString),
("discarded", totalDiscarded.toString),
("open-transactions", openTransactionCount.toString)
)
}
private final def adjustExpiry(startingTime: Long, expiry: Long): Long = {
if (maxAge() > 0) {
val maxExpiry = startingTime + maxAge()
if (expiry > 0) (expiry min maxExpiry) else maxExpiry
} else {
expiry
}
}
/**
* Add a value to the end of the queue, transactionally.
*/
def add(value: Array[Byte], expiry: Long): Boolean = synchronized {
if (closed || value.size > maxItemSize()) return false
while (queueLength >= maxItems() || queueSize >= maxSize()) {
if (!discardOldWhenFull()) return false
_remove(false)
_totalDiscarded += 1
if (keepJournal()) journal.remove()
}
val now = System.currentTimeMillis
val item = QItem(now, adjustExpiry(now, expiry), value, 0)
if (keepJournal() && !journal.inReadBehind) {
if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) {
// force re-creation of the journal.
EventHandler.debug(this,
"Rolling journal file for '%s' (qsize=%s)".format(name, queueSize))
journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue)
}
if (queueSize >= maxMemorySize()) {
EventHandler.debug(this,
"Dropping to read-behind for queue '%s' (%s bytes)".format(name, queueSize))
journal.startReadBehind
}
}
_add(item)
if (keepJournal()) journal.add(item)
true
}
def add(value: Array[Byte]): Boolean = add(value, 0)
/**
* Peek at the head item in the queue, if there is one.
*/
def peek(): Option[QItem] = {
synchronized {
if (closed || paused || queueLength == 0) {
None
} else {
_peek()
}
}
}
/**
* Remove and return an item from the queue, if there is one.
*
* @param transaction true if this should be considered the first part
* of a transaction, to be committed or rolled back (put back at the
* head of the queue)
*/
def remove(transaction: Boolean): Option[QItem] = {
synchronized {
if (closed || paused || queueLength == 0) {
None
} else {
val item = _remove(transaction)
if (keepJournal()) {
if (transaction) journal.removeTentative() else journal.remove()
if ((queueLength == 0) && (journal.size >= maxJournalSize())) {
EventHandler.debug(this, "Rolling journal file for '%s'".format(name))
journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, Nil)
}
}
item
}
}
}
/**
* Remove and return an item from the queue, if there is one.
*/
def remove(): Option[QItem] = remove(false)
/**
* Return a transactionally-removed item to the queue. This is a rolled-
* back transaction.
*/
def unremove(xid: Int): Unit = {
synchronized {
if (!closed) {
if (keepJournal()) journal.unremove(xid)
_unremove(xid)
}
}
}
def confirmRemove(xid: Int): Unit = {
synchronized {
if (!closed) {
if (keepJournal()) journal.confirmRemove(xid)
openTransactions.remove(xid)
}
}
}
def flush(): Unit = {
while (remove(false).isDefined) { }
}
/**
* Close the queue's journal file. Not safe to call on an active queue.
*/
def close(): Unit = synchronized {
closed = true
if (keepJournal()) journal.close()
}
def pauseReads(): Unit = synchronized {
paused = true
}
def resumeReads(): Unit = synchronized {
paused = false
}
def setup(): Unit = synchronized {
queueSize = 0
replayJournal
}
def destroyJournal(): Unit = synchronized {
if (keepJournal()) journal.erase()
}
private final def nextXid(): Int = {
do {
xidCounter += 1
} while (openTransactions contains xidCounter)
xidCounter
}
private final def fillReadBehind(): Unit = {
// if we're in read-behind mode, scan forward in the journal to keep memory as full as
// possible. this amortizes the disk overhead across all reads.
while (keepJournal() && journal.inReadBehind && _memoryBytes < maxMemorySize()) {
journal.fillReadBehind { item =>
queue += item
_memoryBytes += item.data.length
}
if (!journal.inReadBehind) {
EventHandler.debug(this, "Coming out of read-behind for queue '%s'".format(name))
}
}
}
def replayJournal(): Unit = {
if (!keepJournal()) return
EventHandler.debug(this, "Replaying transaction journal for '%s'".format(name))
xidCounter = 0
journal.replay(name) {
case JournalItem.Add(item) =>
_add(item)
// when processing the journal, this has to happen after:
if (!journal.inReadBehind && queueSize >= maxMemorySize()) {
EventHandler.debug(this,
"Dropping to read-behind for queue '%s' (%s bytes)".format(name, queueSize))
journal.startReadBehind
}
case JournalItem.Remove => _remove(false)
case JournalItem.RemoveTentative => _remove(true)
case JournalItem.SavedXid(xid) => xidCounter = xid
case JournalItem.Unremove(xid) => _unremove(xid)
case JournalItem.ConfirmRemove(xid) => openTransactions.remove(xid)
case x => EventHandler.warning(this, "Unexpected item in journal: %s".format(x))
}
EventHandler.debug(this,
"Finished transaction journal for '%s' (%s items, %s bytes)"
.format(name, queueLength, journal.size))
journal.open
// now, any unfinished transactions must be backed out.
for (xid <- openTransactionIds) {
journal.unremove(xid)
_unremove(xid)
}
}
def toList(): List[QItem] = {
discardExpired
queue.toList
}
// ----- internal implementations
private def _add(item: QItem): Unit = {
discardExpired
if (!journal.inReadBehind) {
queue += item
_memoryBytes += item.data.length
}
_totalItems += 1
queueSize += item.data.length
queueLength += 1
}
private def _peek(): Option[QItem] = {
discardExpired
if (queue.isEmpty) None else Some(queue.front)
}
private def _remove(transaction: Boolean): Option[QItem] = {
discardExpired()
if (queue.isEmpty) return None
val now = System.currentTimeMillis
val item = queue.dequeue
val len = item.data.length
queueSize -= len
_memoryBytes -= len
queueLength -= 1
val xid = if (transaction) nextXid else 0
fillReadBehind
_currentAge = now - item.addTime
if (transaction) {
item.xid = xid
openTransactions(xid) = item
}
Some(item)
}
final def discardExpired(): Int = {
if (queue.isEmpty || journal.isReplaying) {
0
} else {
val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
if ((realExpiry != 0) && (realExpiry < System.currentTimeMillis)) {
_totalExpired += 1
val item = queue.dequeue
val len = item.data.length
queueSize -= len
_memoryBytes -= len
queueLength -= 1
fillReadBehind
if (keepJournal()) journal.remove()
expiredQueue().map { _.add(item.data, 0) }
1 + discardExpired()
} else {
0
}
}
}
private def _unremove(xid: Int) = {
openTransactions.remove(xid) map { item =>
queueLength += 1
queueSize += item.data.length
queue unget item
_memoryBytes += item.data.length
}
}
}
object PersistentQueue {
@volatile var maxItems: Int = Int.MaxValue
@volatile var maxSize: Long = Long.MaxValue
@volatile var maxItemSize: Long = Long.MaxValue
@volatile var maxAge: Int = 0
@volatile var maxJournalSize: Long = 16 * 1024 * 1024
@volatile var maxMemorySize: Long = 128 * 1024 * 1024
@volatile var maxJournalOverflow: Int = 10
@volatile var maxJournalSizeAbsolute: Long = Long.MaxValue
@volatile var discardOldWhenFull: Boolean = false
@volatile var keepJournal: Boolean = true
@volatile var syncJournal: Boolean = false
@volatile var expiredQueue: Option[PersistentQueue] = None
}

View file

@ -0,0 +1,53 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.nio.{ByteBuffer, ByteOrder}
case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
def pack(): Array[Byte] = {
val bytes = new Array[Byte](data.length + 16)
val buffer = ByteBuffer.wrap(bytes)
buffer.order(ByteOrder.LITTLE_ENDIAN)
buffer.putLong(addTime)
buffer.putLong(expiry)
buffer.put(data)
bytes
}
}
object QItem {
def unpack(data: Array[Byte]): QItem = {
val buffer = ByteBuffer.wrap(data)
val bytes = new Array[Byte](data.length - 16)
buffer.order(ByteOrder.LITTLE_ENDIAN)
val addTime = buffer.getLong
val expiry = buffer.getLong
buffer.get(bytes)
QItem(addTime, expiry, bytes, 0)
}
def unpackOldAdd(data: Array[Byte]): QItem = {
val buffer = ByteBuffer.wrap(data)
val bytes = new Array[Byte](data.length - 4)
buffer.order(ByteOrder.LITTLE_ENDIAN)
val expiry = buffer.getInt
buffer.get(bytes)
QItem(System.currentTimeMillis, if (expiry == 0) 0 else expiry * 1000, bytes, 0)
}
}

View file

@ -0,0 +1,237 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue
import java.io.File
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
import akka.config.{Config, Configuration}
import akka.event.EventHandler
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
class QueueCollection(queueFolder: String, private var queueConfigs: Configuration) {
private val path = new File(queueFolder)
if (! path.isDirectory) {
path.mkdirs()
}
if (! path.isDirectory || ! path.canWrite) {
throw new InaccessibleQueuePath
}
private val queues = new mutable.HashMap[String, PersistentQueue]
private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]]
private var shuttingDown = false
// total items added since the server started up.
val totalAdded = new Counter()
// hits/misses on removing items from the queue
val queueHits = new Counter()
val queueMisses = new Counter()
/* FIXME, segment commented out, might have damaged semantics, investigate.
queueConfigs.subscribe { c =>
synchronized {
queueConfigs = c.getOrElse(new Config)
}
}*/
// preload any queues
def loadQueues() {
path.list() filter { name => !(name contains "~~") } map { queue(_) }
}
def queueNames: List[String] = synchronized {
queues.keys.toList
}
def currentItems = queues.values.foldLeft(0L) { _ + _.length }
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
/**
* Get a named queue, creating it if necessary.
* Exposed only to unit tests.
*/
private[akka] def queue(name: String): Option[PersistentQueue] = synchronized {
if (shuttingDown) {
None
} else {
Some(queues.get(name) getOrElse {
// only happens when creating a queue for the first time.
val q = if (name contains '+') {
val master = name.split('+')(0)
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
EventHandler.debug(this, "Fanout queue %s added to %s".format(name, master))
new PersistentQueue(path.getPath, name, queueConfigs)
} else {
new PersistentQueue(path.getPath, name, queueConfigs)
}
q.setup
queues(name) = q
q
})
}
}
/**
* Add an item to a named queue. Will not return until the item has been
* synchronously added and written to the queue journal file.
*
* @return true if the item was added; false if the server is shutting
* down
*/
def add(key: String, item: Array[Byte], expiry: Int): Boolean = {
for (fanouts <- fanout_queues.get(key); name <- fanouts) {
add(name, item, expiry)
}
queue(key) match {
case None => false
case Some(q) =>
val now = System.currentTimeMillis
val normalizedExpiry: Long = if (expiry == 0) {
0
} else if (expiry < 1000000) {
now + expiry
} else {
expiry
}
val result = q.add(item, normalizedExpiry)
if (result) totalAdded.incr()
result
}
}
def add(key: String, item: Array[Byte]): Boolean = add(key, item, 0)
/**
* Retrieve an item from a queue and pass it to a continuation. If no item is available within
* the requested time, or the server is shutting down, None is passed.
*/
def remove(key: String, timeout: Int, transaction: Boolean, peek: Boolean)(f: Option[QItem] => Unit): Unit = {
queue(key) match {
case None =>
queueMisses.incr
f(None)
case Some(q) =>
if (peek) {
f(q.peek())
} else {
q.remove
/* q.removeReact(if (timeout == 0) timeout else System.currentTimeMillis + timeout, transaction) {
case None =>
queueMisses.incr
f(None)
case Some(item) =>
queueHits.incr
f(Some(item))
}
*/ }
}
}
// for testing.
def receive(key: String): Option[Array[Byte]] = {
var rv: Option[Array[Byte]] = None
val latch = new CountDownLatch(1)
remove(key, 0, false, false) {
case None =>
rv = None
latch.countDown
case Some(v) =>
rv = Some(v.data)
latch.countDown
}
latch.await
rv
}
def unremove(key: String, xid: Int) {
queue(key) map { q => q.unremove(xid) }
}
def confirmRemove(key: String, xid: Int) {
queue(key) map { q => q.confirmRemove(xid) }
}
def flush(key: String) {
queue(key) map { q => q.flush() }
}
def delete(name: String): Unit = synchronized {
if (!shuttingDown) {
queues.get(name) map { q =>
q.close()
q.destroyJournal()
queues.remove(name)
}
if (name contains '+') {
val master = name.split('+')(0)
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) -= name
EventHandler.debug(this, "Fanout queue %s dropped from %s".format(name, master))
}
}
}
def flushExpired(name: String): Int = synchronized {
if (shuttingDown) {
0
} else {
queue(name) map { q => q.discardExpired() } getOrElse(0)
}
}
def flushAllExpired(): Int = synchronized {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName) }
}
def stats(key: String): Array[(String, String)] = queue(key) match {
case None => Array[(String, String)]()
case Some(q) =>
q.dumpStats() ++
fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
}
def dumpConfig(key: String): Array[String] = {
queue(key) match {
case None => Array()
case Some(q) => q.dumpConfig()
}
}
/**
* Shutdown this queue collection. All actors are asked to exit, and
* any future queue requests will fail.
*/
def shutdown: Unit = synchronized {
if (shuttingDown) {
return
}
shuttingDown = true
for ((name, q) <- queues) {
// synchronous, so the journals are all officially closed before we return.
q.close
}
queues.clear
}
}

View file

@ -0,0 +1,149 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue.tools
import java.io.{FileNotFoundException, IOException}
import scala.collection.mutable
import akka.actor.mailbox.filequeue._
class QueueDumper(filename: String) {
var offset = 0L
var operations = 0L
var currentXid = 0
val queue = new mutable.Queue[Int] {
def unget(item: Int) = prependElem(item)
}
val openTransactions = new mutable.HashMap[Int, Int]
def apply() {
val journal = new Journal(filename, false)
var lastDisplay = 0L
try {
for ((item, itemsize) <- journal.walk()) {
operations += 1
dumpItem(item)
offset += itemsize
if (QDumper.quiet && offset - lastDisplay > 1024 * 1024) {
print("\rReading journal: %-6s".format(Util.bytesToHuman(offset, 0)))
Console.flush()
lastDisplay = offset
}
}
print("\r" + (" " * 30))
println()
val totalItems = queue.size + openTransactions.size
val totalBytes = queue.foldLeft(0L) { _ + _ } + openTransactions.values.foldLeft(0L) { _ + _ }
println("Journal size: %d bytes, with %d operations.".format(offset, operations))
println("%d items totalling %d bytes.".format(totalItems, totalBytes))
} catch {
case e: FileNotFoundException =>
println("Can't open journal file: " + filename)
case e: IOException =>
println("Exception reading journal file: " + filename)
e.printStackTrace()
}
}
def dumpItem(item: JournalItem) {
val now = System.currentTimeMillis
if (!QDumper.quiet) print("%08x ".format(offset & 0xffffffffL))
item match {
case JournalItem.Add(qitem) =>
if (!QDumper.quiet) {
print("ADD %-6d".format(qitem.data.size))
if (qitem.xid > 0) {
print(" xid=%d".format(qitem.xid))
}
if (qitem.expiry > 0) {
if (qitem.expiry - now < 0) {
print(" expired")
} else {
print(" exp=%d".format(qitem.expiry - now))
}
}
println()
}
queue += qitem.data.size
case JournalItem.Remove =>
if (!QDumper.quiet) println("REM")
queue.dequeue
case JournalItem.RemoveTentative =>
do {
currentXid += 1
} while (openTransactions contains currentXid)
openTransactions(currentXid) = queue.dequeue
if (!QDumper.quiet) println("RSV %d".format(currentXid))
case JournalItem.SavedXid(xid) =>
if (!QDumper.quiet) println("XID %d".format(xid))
currentXid = xid
case JournalItem.Unremove(xid) =>
queue.unget(openTransactions.remove(xid).get)
if (!QDumper.quiet) println("CAN %d".format(xid))
case JournalItem.ConfirmRemove(xid) =>
if (!QDumper.quiet) println("ACK %d".format(xid))
openTransactions.remove(xid)
case x =>
if (!QDumper.quiet) println(x)
}
}
}
object QDumper {
val filenames = new mutable.ListBuffer[String]
var quiet = false
def usage() {
println()
println("usage: qdump.sh <journal-files...>")
println(" describe the contents of a kestrel journal file")
println()
println("options:")
println(" -q quiet: don't describe every line, just the summary")
println()
}
def parseArgs(args: List[String]): Unit = args match {
case Nil =>
case "--help" :: xs =>
usage()
System.exit(0)
case "-q" :: xs =>
quiet = true
parseArgs(xs)
case x :: xs =>
filenames += x
parseArgs(xs)
}
def main(args: Array[String]) {
parseArgs(args.toList)
if (filenames.size == 0) {
usage()
System.exit(0)
}
for (filename <- filenames) {
println("Queue: " + filename)
new QueueDumper(filename)()
}
}
}

View file

@ -0,0 +1,53 @@
/*
* Copyright 2009 Twitter, Inc.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.actor.mailbox.filequeue.tools
object Util {
val KILOBYTE = 1024L
val MEGABYTE = 1024 * KILOBYTE
val GIGABYTE = 1024 * MEGABYTE
def bytesToHuman(bytes: Long, minDivisor: Long) = {
if ((bytes == 0) && (minDivisor == 0)) {
"0"
} else {
val divisor = if ((bytes >= GIGABYTE * 95 / 100) || (minDivisor == GIGABYTE)) {
GIGABYTE
} else if ((bytes >= MEGABYTE * 95 / 100) || (minDivisor == MEGABYTE)) {
MEGABYTE
} else {
KILOBYTE
}
// add 1/2 when computing the dot, to force it to round up.
var dot = ((bytes % divisor) * 20 + divisor) / (2 * divisor)
var base = (bytes - (bytes % divisor)) / divisor
if (dot >= 10) {
base += 1
dot -= 10
}
base.toString + (if (base < 100) ("." + dot) else "") + (divisor match {
case KILOBYTE => "K"
case MEGABYTE => "M"
case GIGABYTE => "G"
case _ => ""
})
}
}
}

View file

@ -0,0 +1,21 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxStorage) {
def clean {
import FileBasedMailboxUtil._
FileUtils.deleteDirectory(new java.io.File(queuePath))
}
override def beforeAll() {
clean
super.beforeAll
}
override def afterEach() {
clean
super.afterEach
}
}

View file

@ -0,0 +1,835 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: MailboxProtocol.proto
package akka.actor.mailbox;
public final class MailboxProtocol {
private MailboxProtocol() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static final class DurableMailboxMessageProtocol extends
com.google.protobuf.GeneratedMessage {
// Use DurableMailboxMessageProtocol.newBuilder() to construct.
private DurableMailboxMessageProtocol() {
initFields();
}
private DurableMailboxMessageProtocol(boolean noInit) {}
private static final DurableMailboxMessageProtocol defaultInstance;
public static DurableMailboxMessageProtocol getDefaultInstance() {
return defaultInstance;
}
public DurableMailboxMessageProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.actor.mailbox.MailboxProtocol.internal_static_DurableMailboxMessageProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.actor.mailbox.MailboxProtocol.internal_static_DurableMailboxMessageProtocol_fieldAccessorTable;
}
// required string ownerAddress = 1;
public static final int OWNERADDRESS_FIELD_NUMBER = 1;
private boolean hasOwnerAddress;
private java.lang.String ownerAddress_ = "";
public boolean hasOwnerAddress() { return hasOwnerAddress; }
public java.lang.String getOwnerAddress() { return ownerAddress_; }
// optional string senderAddress = 2;
public static final int SENDERADDRESS_FIELD_NUMBER = 2;
private boolean hasSenderAddress;
private java.lang.String senderAddress_ = "";
public boolean hasSenderAddress() { return hasSenderAddress; }
public java.lang.String getSenderAddress() { return senderAddress_; }
// optional .UuidProtocol futureUuid = 3;
public static final int FUTUREUUID_FIELD_NUMBER = 3;
private boolean hasFutureUuid;
private akka.actor.mailbox.MailboxProtocol.UuidProtocol futureUuid_;
public boolean hasFutureUuid() { return hasFutureUuid; }
public akka.actor.mailbox.MailboxProtocol.UuidProtocol getFutureUuid() { return futureUuid_; }
// required bytes message = 4;
public static final int MESSAGE_FIELD_NUMBER = 4;
private boolean hasMessage;
private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasMessage() { return hasMessage; }
public com.google.protobuf.ByteString getMessage() { return message_; }
private void initFields() {
futureUuid_ = akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance();
}
public final boolean isInitialized() {
if (!hasOwnerAddress) return false;
if (!hasMessage) return false;
if (hasFutureUuid()) {
if (!getFutureUuid().isInitialized()) return false;
}
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (hasOwnerAddress()) {
output.writeString(1, getOwnerAddress());
}
if (hasSenderAddress()) {
output.writeString(2, getSenderAddress());
}
if (hasFutureUuid()) {
output.writeMessage(3, getFutureUuid());
}
if (hasMessage()) {
output.writeBytes(4, getMessage());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasOwnerAddress()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(1, getOwnerAddress());
}
if (hasSenderAddress()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getSenderAddress());
}
if (hasFutureUuid()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, getFutureUuid());
}
if (hasMessage()) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(4, getMessage());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol result;
// Construct using akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol();
return builder;
}
protected akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDescriptor();
}
public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol getDefaultInstanceForType() {
return akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol) {
return mergeFrom((akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol other) {
if (other == akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDefaultInstance()) return this;
if (other.hasOwnerAddress()) {
setOwnerAddress(other.getOwnerAddress());
}
if (other.hasSenderAddress()) {
setSenderAddress(other.getSenderAddress());
}
if (other.hasFutureUuid()) {
mergeFutureUuid(other.getFutureUuid());
}
if (other.hasMessage()) {
setMessage(other.getMessage());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 10: {
setOwnerAddress(input.readString());
break;
}
case 18: {
setSenderAddress(input.readString());
break;
}
case 26: {
akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder subBuilder = akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder();
if (hasFutureUuid()) {
subBuilder.mergeFrom(getFutureUuid());
}
input.readMessage(subBuilder, extensionRegistry);
setFutureUuid(subBuilder.buildPartial());
break;
}
case 34: {
setMessage(input.readBytes());
break;
}
}
}
}
// required string ownerAddress = 1;
public boolean hasOwnerAddress() {
return result.hasOwnerAddress();
}
public java.lang.String getOwnerAddress() {
return result.getOwnerAddress();
}
public Builder setOwnerAddress(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasOwnerAddress = true;
result.ownerAddress_ = value;
return this;
}
public Builder clearOwnerAddress() {
result.hasOwnerAddress = false;
result.ownerAddress_ = getDefaultInstance().getOwnerAddress();
return this;
}
// optional string senderAddress = 2;
public boolean hasSenderAddress() {
return result.hasSenderAddress();
}
public java.lang.String getSenderAddress() {
return result.getSenderAddress();
}
public Builder setSenderAddress(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasSenderAddress = true;
result.senderAddress_ = value;
return this;
}
public Builder clearSenderAddress() {
result.hasSenderAddress = false;
result.senderAddress_ = getDefaultInstance().getSenderAddress();
return this;
}
// optional .UuidProtocol futureUuid = 3;
public boolean hasFutureUuid() {
return result.hasFutureUuid();
}
public akka.actor.mailbox.MailboxProtocol.UuidProtocol getFutureUuid() {
return result.getFutureUuid();
}
public Builder setFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol value) {
if (value == null) {
throw new NullPointerException();
}
result.hasFutureUuid = true;
result.futureUuid_ = value;
return this;
}
public Builder setFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder builderForValue) {
result.hasFutureUuid = true;
result.futureUuid_ = builderForValue.build();
return this;
}
public Builder mergeFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol value) {
if (result.hasFutureUuid() &&
result.futureUuid_ != akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance()) {
result.futureUuid_ =
akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder(result.futureUuid_).mergeFrom(value).buildPartial();
} else {
result.futureUuid_ = value;
}
result.hasFutureUuid = true;
return this;
}
public Builder clearFutureUuid() {
result.hasFutureUuid = false;
result.futureUuid_ = akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance();
return this;
}
// required bytes message = 4;
public boolean hasMessage() {
return result.hasMessage();
}
public com.google.protobuf.ByteString getMessage() {
return result.getMessage();
}
public Builder setMessage(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
result.hasMessage = true;
result.message_ = value;
return this;
}
public Builder clearMessage() {
result.hasMessage = false;
result.message_ = getDefaultInstance().getMessage();
return this;
}
// @@protoc_insertion_point(builder_scope:DurableMailboxMessageProtocol)
}
static {
defaultInstance = new DurableMailboxMessageProtocol(true);
akka.actor.mailbox.MailboxProtocol.internalForceInit();
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:DurableMailboxMessageProtocol)
}
public static final class UuidProtocol extends
com.google.protobuf.GeneratedMessage {
// Use UuidProtocol.newBuilder() to construct.
private UuidProtocol() {
initFields();
}
private UuidProtocol(boolean noInit) {}
private static final UuidProtocol defaultInstance;
public static UuidProtocol getDefaultInstance() {
return defaultInstance;
}
public UuidProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.actor.mailbox.MailboxProtocol.internal_static_UuidProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.actor.mailbox.MailboxProtocol.internal_static_UuidProtocol_fieldAccessorTable;
}
// required uint64 high = 1;
public static final int HIGH_FIELD_NUMBER = 1;
private boolean hasHigh;
private long high_ = 0L;
public boolean hasHigh() { return hasHigh; }
public long getHigh() { return high_; }
// required uint64 low = 2;
public static final int LOW_FIELD_NUMBER = 2;
private boolean hasLow;
private long low_ = 0L;
public boolean hasLow() { return hasLow; }
public long getLow() { return low_; }
private void initFields() {
}
public final boolean isInitialized() {
if (!hasHigh) return false;
if (!hasLow) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (hasHigh()) {
output.writeUInt64(1, getHigh());
}
if (hasLow()) {
output.writeUInt64(2, getLow());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasHigh()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, getHigh());
}
if (hasLow()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, getLow());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.actor.mailbox.MailboxProtocol.UuidProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private akka.actor.mailbox.MailboxProtocol.UuidProtocol result;
// Construct using akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new akka.actor.mailbox.MailboxProtocol.UuidProtocol();
return builder;
}
protected akka.actor.mailbox.MailboxProtocol.UuidProtocol internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new akka.actor.mailbox.MailboxProtocol.UuidProtocol();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDescriptor();
}
public akka.actor.mailbox.MailboxProtocol.UuidProtocol getDefaultInstanceForType() {
return akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public akka.actor.mailbox.MailboxProtocol.UuidProtocol build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private akka.actor.mailbox.MailboxProtocol.UuidProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public akka.actor.mailbox.MailboxProtocol.UuidProtocol buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
akka.actor.mailbox.MailboxProtocol.UuidProtocol returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.actor.mailbox.MailboxProtocol.UuidProtocol) {
return mergeFrom((akka.actor.mailbox.MailboxProtocol.UuidProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.actor.mailbox.MailboxProtocol.UuidProtocol other) {
if (other == akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance()) return this;
if (other.hasHigh()) {
setHigh(other.getHigh());
}
if (other.hasLow()) {
setLow(other.getLow());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setHigh(input.readUInt64());
break;
}
case 16: {
setLow(input.readUInt64());
break;
}
}
}
}
// required uint64 high = 1;
public boolean hasHigh() {
return result.hasHigh();
}
public long getHigh() {
return result.getHigh();
}
public Builder setHigh(long value) {
result.hasHigh = true;
result.high_ = value;
return this;
}
public Builder clearHigh() {
result.hasHigh = false;
result.high_ = 0L;
return this;
}
// required uint64 low = 2;
public boolean hasLow() {
return result.hasLow();
}
public long getLow() {
return result.getLow();
}
public Builder setLow(long value) {
result.hasLow = true;
result.low_ = value;
return this;
}
public Builder clearLow() {
result.hasLow = false;
result.low_ = 0L;
return this;
}
// @@protoc_insertion_point(builder_scope:UuidProtocol)
}
static {
defaultInstance = new UuidProtocol(true);
akka.actor.mailbox.MailboxProtocol.internalForceInit();
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:UuidProtocol)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_DurableMailboxMessageProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_DurableMailboxMessageProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_UuidProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_UuidProtocol_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\025MailboxProtocol.proto\"\200\001\n\035DurableMailb" +
"oxMessageProtocol\022\024\n\014ownerAddress\030\001 \002(\t\022" +
"\025\n\rsenderAddress\030\002 \001(\t\022!\n\nfutureUuid\030\003 \001" +
"(\0132\r.UuidProtocol\022\017\n\007message\030\004 \002(\014\")\n\014Uu" +
"idProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004B\026\n" +
"\022akka.actor.mailboxH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_DurableMailboxMessageProtocol_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_DurableMailboxMessageProtocol_descriptor,
new java.lang.String[] { "OwnerAddress", "SenderAddress", "FutureUuid", "Message", },
akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.class,
akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.Builder.class);
internal_static_UuidProtocol_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_UuidProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UuidProtocol_descriptor,
new java.lang.String[] { "High", "Low", },
akka.actor.mailbox.MailboxProtocol.UuidProtocol.class,
akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
public static void internalForceInit() {}
// @@protoc_insertion_point(outer_class_scope)
}

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
option java_package = "akka.actor.mailbox";
option optimize_for = SPEED;
/******************************************
Compile with:
cd ./akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol
protoc MailboxProtocol.proto --java_out ../java
*******************************************/
/**
* Defines the durable mailbox message.
*/
message DurableMailboxMessageProtocol {
required string ownerAddress = 1;
optional string senderAddress = 2;
optional UuidProtocol futureUuid = 3;
required bytes message = 4;
}
/**
* Defines a UUID.
*/
message UuidProtocol {
required uint64 high = 1;
required uint64 low = 2;
}

View file

@ -0,0 +1,166 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import akka.actor.{newUuid, ActorRef}
import akka.util.ReflectiveAccess
import akka.dispatch._
import akka.config._
import akka.event.EventHandler
import java.lang.reflect.InvocationTargetException
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class DurableMailboxStorage(mailboxFQN: String) {
val constructorSignature = Array[Class[_]](classOf[ActorRef])
val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorRef].getClassLoader) match {
case Right(clazz) => clazz
case Left(exception) =>
val cause = exception match {
case i: InvocationTargetException => i.getTargetException
case _ => exception
}
throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString))
}
//TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc
def createFor(actor: ActorRef): AnyRef = {
EventHandler.debug(this, "Creating durable mailbox [%s] for [%s]".format(mailboxClass.getName, actor))
val ctor = mailboxClass.getDeclaredConstructor(constructorSignature: _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](actor): _*).asInstanceOf[AnyRef])
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](actor)) match {
case Right(instance) => instance
case Left(exception) =>
val cause = exception match {
case i: InvocationTargetException => i.getTargetException
case _ => exception
}
throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString))
}
}
}
case object RedisDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.RedisBasedMailbox")
case object BeanstalkDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.BeanstalkBasedMailbox")
case object FileDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.FileBasedMailbox")
case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.ZooKeeperBasedMailbox")
/**
* The durable equivalent of ExecutorBasedEventDrivenDispatcher
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
case class DurableEventBasedDispatcher(
_name: String,
_storage: DurableMailboxStorage,
_throughput: Int = Dispatchers.THROUGHPUT,
_throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
_config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(
_name,
_throughput,
_throughputDeadlineTime,
_mailboxType,
_config) {
def this(_name: String, _storage: DurableMailboxStorage, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, _storage, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, _storage: DurableMailboxStorage, throughput: Int, mailboxType: MailboxType) =
this(_name, _storage, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(_name: String, _storage: DurableMailboxStorage, throughput: Int) =
this(_name, _storage, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _storage: DurableMailboxStorage, _config: ThreadPoolConfig) =
this(_name, _storage, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String, _storage: DurableMailboxStorage) =
this(_name, _storage, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
override def register(actorRef: ActorRef) {
super.register(actorRef)
val mbox = actorRef.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
if (mbox ne null) //Schedule the ActorRef for initial execution, because we might be resuming operations after a failure
super.registerForExecution(mbox)
}
override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.senderFuture.isDefined)
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!")
super.dispatch(invocation)
}
}
/**
* The durable equivalent of ThreadBasedDispatcher
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
case class DurableThreadBasedDispatcher(
_actor: ActorRef,
_storage: DurableMailboxStorage,
_mailboxType: MailboxType) extends ThreadBasedDispatcher(_actor,_mailboxType) {
def this(actor: ActorRef, _storage: DurableMailboxStorage) =
this(actor, _storage, UnboundedMailbox()) // For Java API
def this(actor: ActorRef, _storage: DurableMailboxStorage, capacity: Int) =
this(actor, _storage, BoundedMailbox(capacity)) //For Java API
def this(actor: ActorRef, _storage: DurableMailboxStorage, capacity: Int, pushTimeOut: akka.util.Duration) = //For Java API
this(actor, _storage, BoundedMailbox(capacity, pushTimeOut))
override def register(actorRef: ActorRef) {
super.register(actorRef)
val mbox = actorRef.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
if (mbox ne null) //Schedule the ActorRef for initial execution, because we might be resuming operations after a failure
super.registerForExecution(mbox)
}
override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.senderFuture.isDefined)
throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or !!!")
super.dispatch(invocation)
}
}
/**
* Configurator for the DurableEventBasedDispatcher
* Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper" and "file"
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new DurableEventBasedDispatcher(
config.getString("name", newUuid.toString),
getStorage(config),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType(config),
threadPoolConfig)).build
}
def getStorage(config: Configuration): DurableMailboxStorage = {
val storage = config.getString("storage") map {
case "redis" => RedisDurableMailboxStorage
case "beanstalk" => BeanstalkDurableMailboxStorage
case "zookeeper" => ZooKeeperDurableMailboxStorage
case "file" => FileDurableMailboxStorage
case unknown => throw new IllegalArgumentException("[%s] is not a valid storage, valid options are [redis, beanstalk, zookeeper, file]" format unknown)
}
storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableEventBasedDispatcherConfigurator"))
}
}

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import MailboxProtocol._
import akka.actor.{Actor, ActorRef}
import akka.dispatch._
import akka.event.EventHandler
import akka.remote.MessageSerializer
import akka.remote.protocol.RemoteProtocol.MessageProtocol
import akka.AkkaException
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait DurableMailboxBase {
def serialize(message: MessageInvocation): Array[Byte]
def deserialize(bytes: Array[Byte]): MessageInvocation
}
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
}
class DurableMailboxException private[akka](message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue with ExecutableMailbox with DurableMailboxBase {
import DurableExecutableMailboxConfig._
val ownerAddress = owner.address
val name = "mailbox_" + Name.replaceAllIn(ownerAddress, "_")
EventHandler.debug(this, "Creating %s mailbox [%s]".format(getClass.getName, name))
val dispatcher: ExecutorBasedEventDrivenDispatcher = owner.dispatcher match {
case e: ExecutorBasedEventDrivenDispatcher => e
case _ => null
}
//TODO: switch to RemoteProtocol
def serialize(durableMessage: MessageInvocation) = {
val message = MessageSerializer.serialize(durableMessage.message)
val builder = DurableMailboxMessageProtocol.newBuilder
.setOwnerAddress(ownerAddress)
.setMessage(message.toByteString)
if (durableMessage.sender.isDefined) builder.setSenderAddress(durableMessage.sender.get.address)
builder.build.toByteArray
}
//TODO: switch to RemoteProtocol
def deserialize(bytes: Array[Byte]) = {
val durableMessage = DurableMailboxMessageProtocol.parseFrom(bytes)
val messageProtocol = MessageProtocol.parseFrom(durableMessage.getMessage)
val message = MessageSerializer.deserialize(messageProtocol)
val ownerAddress = durableMessage.getOwnerAddress
val owner = Actor.registry.actorFor(ownerAddress).getOrElse(
throw new DurableMailboxException("No actor could be found for address [" + ownerAddress + "], could not deserialize message."))
val sender = if (durableMessage.hasSenderAddress) {
Actor.registry.actorFor(durableMessage.getSenderAddress)
} else None
new MessageInvocation(owner, message, sender, None)
}
}

View file

@ -0,0 +1,58 @@
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log
#
# ZooKeeper Logging Configuration
#
# Format is "<default threshold> (, <appender>)+
# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add TRACEFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n

View file

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<configuration scan="false" debug="false">
<!-- Errors were reported during translation. -->
<!-- Could not find transformer for org.apache.log4j.SimpleLayout -->
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<logger name="akka" level="DEBUG"/>
<logger name="org.mortbay.log" level="ERROR"/>
<logger name="org.apache.jasper" level="ERROR"/>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
</configuration>

View file

@ -0,0 +1,12 @@
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/export/crawlspace/mahadev/zookeeper/server1/data
# the port at which the clients will connect
clientPort=2181

View file

@ -0,0 +1,65 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.config.Supervision.Temporary
import akka.dispatch.MessageDispatcher
object DurableMailboxSpecActorFactory {
class MailboxTestActor extends Actor {
self.lifeCycle = Temporary
def receive = {
case "sum" => self.reply("sum")
}
}
def createMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = {
val queueActor = actorOf[MailboxTestActor]
queueActor.dispatcher = dispatcher
queueActor.start
}
}
abstract class DurableMailboxSpec(val backendName: String, val storage: DurableMailboxStorage) extends
WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll {
import DurableMailboxSpecActorFactory._
implicit val dispatcher = DurableEventBasedDispatcher(backendName, storage, 1)
"A " + backendName + " based mailbox backed actor" should {
"should handle reply to ! for 1 message" in {
val latch = new CountDownLatch(1)
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } ).start
queueActor.!("sum")(Some(sender))
latch.await(10, TimeUnit.SECONDS) must be (true)
}
"should handle reply to ! for multiple messages" in {
val latch = new CountDownLatch(5)
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } ).start
queueActor.!("sum")(Some(sender))
queueActor.!("sum")(Some(sender))
queueActor.!("sum")(Some(sender))
queueActor.!("sum")(Some(sender))
queueActor.!("sum")(Some(sender))
latch.await(10, TimeUnit.SECONDS) must be (true)
}
}
override def beforeEach() {
registry.local.shutdownAll
}
}

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import akka.actor.ActorRef
import akka.config.Config.config
import akka.dispatch._
import akka.event.EventHandler
import akka.AkkaException
import MailboxProtocol._
import com.redis._
class RedisBasedMailboxException(message: String) extends AkkaException(message)
trait Base64StringEncoder {
def byteArrayToString(bytes: Array[Byte]): String
def stringToByteArray(str: String): Array[Byte]
}
object CommonsCodec {
import org.apache.commons.codec.binary.Base64
import org.apache.commons.codec.binary.Base64._
val b64 = new Base64(true)
trait CommonsCodecBase64StringEncoder {
def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes)
def stringToByteArray(str: String) = b64.decode(str)
}
object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder
}
import CommonsCodec._
import CommonsCodec.Base64StringEncoder._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
val nodes = config.getList("akka.persistence.redis.cluster")// need an explicit definition in akka-conf
@volatile private var db = connect() //review Is the Redis connection thread safe?
def enqueue(message: MessageInvocation) = {
EventHandler.debug(this,
"\nENQUEUING message in redis-based mailbox [%s]".format(message))
withErrorHandling {
db.rpush(name, byteArrayToString(serialize(message)))
}
}
def dequeue: MessageInvocation = withErrorHandling {
try {
val item = db.lpop(name).map(stringToByteArray(_)).getOrElse(throw new NoSuchElementException(name + " not present"))
val messageInvocation = deserialize(item)
EventHandler.debug(this,
"\nDEQUEUING message in redis-based mailbox [%s]".format(messageInvocation))
messageInvocation
} catch {
case e: java.util.NoSuchElementException => null
case e =>
EventHandler.error(e, this, "Couldn't dequeue from Redis-based mailbox")
throw e
}
}
def size: Int = withErrorHandling {
db.llen(name).getOrElse(throw new NoSuchElementException(name + " not present"))
}
def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive
private[akka] def connect() =
nodes match {
case Seq() =>
// no cluster defined
new RedisClient(
config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"),
config.getInt("akka.actor.mailbox.redis.port", 6379))
case s =>
// with cluster
import com.redis.cluster._
EventHandler.info(this, "Running on Redis cluster")
new RedisCluster(nodes: _*) {
val keyTag = Some(NoOpKeyTag)
}
}
private def withErrorHandling[T](body: => T): T = {
try {
body
} catch {
case e: RedisConnectionException => {
db = connect()
body
}
case e =>
val error = new RedisBasedMailboxException("Could not connect to Redis server")
EventHandler.error(error, this, "Could not connect to Redis server")
throw error
}
}
}

View file

@ -0,0 +1,3 @@
package akka.actor.mailbox
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxStorage)

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.mailbox
import MailboxProtocol._
import akka.actor.ActorRef
import akka.dispatch._
import akka.config.Config._
import akka.event.EventHandler
import akka.util.Duration
import akka.cluster.zookeeper._
import akka.AkkaException
import org.I0Itec.zkclient.serialize._
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object ZooKeeperMailboxConfig {
val zkServerAddresses = config.getString("akka.actor.mailbox.zookeeper.server-addresses", "localhost:2181")
val sessionTimeout = Duration(config.getInt("akka.actor.mailbox.zookeeper.session-timeout", 60), TIME_UNIT).toMillis.toInt
val connectionTimeout = Duration(config.getInt("akka.actor.mailbox.zookeeper.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val blockingQueue = config.getBool("akka.actor.mailbox.zookeeper.blocking-queue", true)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"
object serializer extends ZkSerializer {
def serialize(data: AnyRef): Array[Byte] = data match {
case d: DurableMailboxMessageProtocol => d.toByteArray
case null => throw new ZooKeeperBasedMailboxException("Expected a DurableMailboxMessageProtocol message, was null")
case _ => throw new ZooKeeperBasedMailboxException("Expected a DurableMailboxMessageProtocol message, was [" + data.getClass + "]")
}
def deserialize(bytes: Array[Byte]): AnyRef = DurableMailboxMessageProtocol.parseFrom(bytes)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ZooKeeperBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
import ZooKeeperMailboxConfig._
private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue)
def enqueue(durableMessage: MessageInvocation) = {
EventHandler.debug(this,
"\nENQUEUING message in zookeeper-based mailbox [%s]".format(durableMessage))
queue.enqueue(serialize(durableMessage))
}
def dequeue: MessageInvocation = try {
val messageInvocation = deserialize(queue.dequeue.asInstanceOf[Array[Byte]])
EventHandler.debug(this,
"\nDEQUEUING message in zookeeper-based mailbox [%s]".format(messageInvocation))
messageInvocation
} catch {
case e: java.util.NoSuchElementException => null
case e: InterruptedException => null
case e =>
EventHandler.error(e, this, "Couldn't dequeue from ZooKeeper-based mailbox")
throw e
}
def size: Int = queue.size
def isEmpty: Boolean = queue.isEmpty
def clear(): Boolean = try {
queue.clear
true
} catch {
case e => false
}
def close = zkClient.close
}

View file

@ -0,0 +1,31 @@
package akka.actor.mailbox
import akka.actor.Actor
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxStorage) {
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
var zkServer: ZkServer = _
override def beforeAll() {
zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath)
super.beforeAll
}
override def afterEach() {
Actor.registry.local.actors.foreach(_.mailbox match {
case zkm: ZooKeeperBasedMailbox => zkm.close
case _ => ()
})
super.afterEach
}
override def afterAll() {
zkServer.shutdown
super.afterAll
}
}

View file

@ -114,6 +114,45 @@ akka {
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
# (in unit defined by the time-unit property)
}
mailbox {
file-based {
directory-path = "./_mb"
max-items = 2147483647
max-size = 2147483647
max-items = 2147483647
max-age = 0
max-journal-size = 16777216 # 16 * 1024 * 1024
max-memory-size = 134217728 # 128 * 1024 * 1024
max-journal-overflow = 10
max-journal-size-absolute = 9223372036854775807
discard-old-when-full = on
keep-journal = on
sync-journal = off
}
redis {
hostname = "127.0.0.1"
port = 6379
}
zookeeper {
server-addresses = "localhost:2181"
session-timeout = 60
connection-timeout = 60
blocking-queue = on
}
beanstalk {
hostname = "127.0.0.1"
port = 11300
reconnect-window = 5
message-submit-delay = 0
message-submit-timeout = 5
message-time-to-live = 120
}
}
}
cluster {

View file

@ -41,7 +41,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases")
lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo")
lazy val ScalaToolsSnapshotRepo = MavenRepository("Scala-Tools Snapshot Repo", "http://scala-tools.org/repo-snapshots")
lazy val SunJDMKRepo = MavenRepository("WP5 Repository", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val SunJDMKRepo = MavenRepository("WP5 Repository", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
}
// -------------------------------------------------------------------------------------------------------------------
@ -63,6 +63,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo)
lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo)
lazy val sjsonModuleConfig = ModuleConfiguration("net.debasishg", ScalaToolsRelRepo)
lazy val redisModuleConfig = ModuleConfiguration("net.debasishg", ScalaToolsRelRepo)
lazy val beanstalkModuleConfig = ModuleConfiguration("beanstalk", AkkaRepo)
lazy val lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", "h2-lzf", AkkaRepo)
lazy val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo)
lazy val aspectWerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", "aspectwerkz", "2.2.3", AkkaRepo)
@ -93,82 +95,63 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
object Dependencies {
// Compile
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "provided" //Eclipse license
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "provided" //CDDL v1
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2
lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2
lazy val netty = "org.jboss.netty" % "netty" % "3.2.4.Final" % "compile" //ApacheV2
lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2
lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD
lazy val sjson = "net.debasishg" %% "sjson" % "0.11" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" %% "sjson" % "0.11" % "test" //ApacheV2
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION
lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime"
lazy val log4j = "log4j" % "log4j" % "1.2.15"
lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % ZOOKEEPER_VERSION //ApacheV2
lazy val zkClient = "zkclient" % "zkclient" % "0.2" //ApacheV2
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "provided" //Eclipse license
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "provided" //CDDL v1
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2
lazy val netty = "org.jboss.netty" % "netty" % "3.2.4.Final" % "compile" //ApacheV2
lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2
lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD
lazy val redis = "net.debasishg" % "redisclient_2.9.0" % "2.3.1" //ApacheV2
lazy val sjson = "net.debasishg" %% "sjson" % "0.11" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" %% "sjson" % "0.11" % "test" //ApacheV2
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION // MIT
lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" //MIT
lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2
lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % ZOOKEEPER_VERSION //ApacheV2
lazy val zookeeper_lock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % ZOOKEEPER_VERSION //ApacheV2
lazy val zkClient = "zkclient" % "zkclient" % "0.2" //ApacheV2
// Test
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2
lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license
lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license
lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT
lazy val scalatest = "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test" //ApacheV2
lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2
lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license
lazy val testJettyWebApp = "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license
lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT
lazy val scalatest = "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test" //ApacheV2
}
// -------------------------------------------------------------------------------------------------------------------
// Subprojects
// -------------------------------------------------------------------------------------------------------------------
lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_))
lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor)
lazy val akka_actor_tests = project("akka-actor-tests", "akka-actor-tests", new AkkaActorTestsProject(_), akka_testkit)
lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm, akka_actor_tests)
lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_))
lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor)
lazy val akka_actor_tests = project("akka-actor-tests", "akka-actor-tests", new AkkaActorTestsProject(_), akka_testkit)
lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm, akka_actor_tests)
lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor)
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_remote)
lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor)
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_remote)
lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_remote)
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor)
lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor)
lazy val akka_tutorials = project("akka-tutorials", "akka-tutorials", new AkkaTutorialsParentProject(_), akka_actor)
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor)
lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor)
lazy val akka_tutorials = project("akka-tutorials", "akka-tutorials", new AkkaTutorialsParentProject(_), akka_actor)
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
// -------------------------------------------------------------------------------------------------------------------
// Miscellaneous
@ -191,7 +174,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
override def managedStyle = ManagedStyle.Maven
lazy val akkaPublishRepository = systemOptional[String]("akka.publish.repository", "default")
lazy val akkaPublishRepository = systemOptional[String]("akka.publish.repository", "default")
lazy val akkaPublishCredentials = systemOptional[String]("akka.publish.credentials", "none")
if (akkaPublishCredentials.value != "none") Credentials(akkaPublishCredentials.value, log)
@ -348,7 +331,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
class AkkaClusterProject(info: ProjectInfo) extends AkkaDefaultProject(info) with MultiJvmTests {
val bookkeeper = Dependencies.bookkeeper
val zookeeper = Dependencies.zookeeper
val zookeeperLock = Dependencies.zookeeperLock
val zookeeper_lock = Dependencies.zookeeper_lock
val zkClient = Dependencies.zkClient
val commons_io = Dependencies.commons_io
val log4j = Dependencies.log4j
@ -384,6 +367,50 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
val scalatest = Dependencies.scalatest
}
// -------------------------------------------------------------------------------------------------------------------
// The akka-durable-mailboxes subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaDurableMailboxesParentProject(info: ProjectInfo) extends ParentProject(info) {
lazy val akka_mailboxes_common =
project("akka-mailboxes-common", "akka-mailboxes-common", new AkkaMailboxesCommonProject(_), akka_remote)
lazy val akka_redis_mailbox =
project("akka-redis-mailbox", "akka-redis-mailbox", new AkkaRedisMailboxProject(_), akka_mailboxes_common)
lazy val akka_file_mailbox =
project("akka-file-mailbox", "akka-file-mailbox", new AkkaFileMailboxProject(_), akka_mailboxes_common)
lazy val akka_beanstalk_mailbox =
project("akka-beanstalk-mailbox", "akka-beanstalk-mailbox", new AkkaBeanstalkMailboxProject(_), akka_mailboxes_common)
lazy val akka_zookeeper_mailbox =
project("akka-zookeeper-mailbox", "akka-zookeeper-mailbox", new AkkaZooKeeperMailboxProject(_), akka_mailboxes_common, akka_cluster)
}
class AkkaMailboxesCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info) {
// test dependencies
val scalatest = Dependencies.scalatest
}
class AkkaRedisMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) {
val redis = Dependencies.redis
lazy val redisTestsEnabled = systemOptional[Boolean]("mailbox.test.redis", false)
override def testOptions =
super.testOptions ++ (if (!redisTestsEnabled.value) Seq(testFilter("Redis")) else Seq.empty)
}
class AkkaFileMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info)
class AkkaBeanstalkMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) {
val beanstalk = Dependencies.beanstalk
lazy val beanstalkTestsEnabled = systemOptional[Boolean]("mailbox.test.beanstalk", false)
override def testOptions =
super.testOptions ++ (if (!beanstalkTestsEnabled.value) Seq(testFilter("Beanstalk")) else Seq.empty)
}
class AkkaZooKeeperMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info)
// -------------------------------------------------------------------------------------------------------------------
// Samples
// -------------------------------------------------------------------------------------------------------------------
@ -525,6 +552,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq
}
def testFilter(containing: String) = TestFilter(test => !test.name.contains(containing))
override def testOptions = super.testOptions ++ excludeTests.map(exclude => TestFilter(test => !test.contains(exclude)))
lazy val publishRelease = {