From 160ff867cbe67fbb321b8df8f63206b0c7aae964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 17 May 2011 14:48:06 +0200 Subject: [PATCH] Added durable mailboxes: File-based, Redis-based, Zookeeper-based and Beanstalk-based --- .../src/main/scala/akka/actor/Deployer.scala | 2 +- .../actor/mailbox/BeanstalkBasedMailbox.scala | 107 +++ .../mailbox/BeanstalkBasedMailboxSpec.scala | 3 + .../actor/mailbox/FiledBasedMailbox.scala | 68 ++ .../filequeue/BrokenItemException.scala | 22 + .../actor/mailbox/filequeue/Counter.scala | 32 + .../actor/mailbox/filequeue/Journal.scala | 346 ++++++++ .../mailbox/filequeue/PersistentQueue.scala | 468 ++++++++++ .../akka/actor/mailbox/filequeue/QItem.scala | 53 ++ .../mailbox/filequeue/QueueCollection.scala | 237 +++++ .../mailbox/filequeue/tools/QDumper.scala | 149 ++++ .../actor/mailbox/filequeue/tools/Util.scala | 53 ++ .../actor/mailbox/FileBasedMailboxSpec.scala | 21 + .../akka/actor/mailbox/MailboxProtocol.java | 835 ++++++++++++++++++ .../src/main/protocol/MailboxProtocol.proto | 30 + .../actor/mailbox/DurableDispatcher.scala | 166 ++++ .../akka/actor/mailbox/DurableMailbox.scala | 71 ++ .../src/test/resources/log4j.properties | 58 ++ .../src/test/resources/logback-test.xml | 26 + .../src/test/resources/zoo.cfg | 12 + .../actor/mailbox/DurableMailboxSpec.scala | 65 ++ .../actor/mailbox/RedisBasedMailbox.scala | 108 +++ .../actor/mailbox/RedisBasedMailboxSpec.scala | 3 + .../actor/mailbox/ZooKeeperBasedMailbox.scala | 83 ++ .../mailbox/ZooKeeperBasedMailboxSpec.scala | 31 + config/akka-reference.conf | 39 + project/build/AkkaProject.scala | 167 ++-- 27 files changed, 3185 insertions(+), 70 deletions(-) create mode 100644 akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala create mode 100644 akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/BrokenItemException.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Counter.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QItem.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/Util.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/main/java/akka/actor/mailbox/MailboxProtocol.java create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol/MailboxProtocol.proto create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/zoo.cfg create mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala create mode 100644 akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala create mode 100644 akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala create mode 100644 akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala create mode 100644 akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 435f1f1601..e6ca32e5d6 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -333,7 +333,7 @@ object LocalDeployer { private val deployments = new ConcurrentHashMap[String, Deploy] private[akka] def init(deployments: List[Deploy]) { - EventHandler.info(this, "Initializing local deployer\nDeploying actors locally [\n%s\n]" format deployments.mkString("\n\t")) + EventHandler.info(this, "Deploying actors locally [\n\t%s\n]" format deployments.mkString("\n\t")) deployments foreach (deploy(_)) // deploy } diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala new file mode 100644 index 0000000000..fd7d50f20f --- /dev/null +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import MailboxProtocol._ + +import akka.actor.ActorRef +import akka.dispatch._ +import akka.config.Config._ +import akka.util.Duration +import akka.AkkaException +import akka.event.EventHandler + +import com.surftools.BeanstalkClient._ +import com.surftools.BeanstalkClientImpl._ + +class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) + +/** + * @author Jonas Bonér + */ +class BeanstalkBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) { + + val hostname = config.getString("akka.actor.mailbox.beanstalk.hostname", "0.0.0.0") + val port = config.getInt("akka.enterprise.beanstalk.port", 11300) + val reconnectWindow = Duration(config.getInt("akka.actor.mailbox.beanstalk.reconnect-window", 5), TIME_UNIT).toSeconds.toInt + val messageSubmitDelay = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-submit-delay", 0), TIME_UNIT).toSeconds.toInt + val messageSubmitTimeout = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout", 5), TIME_UNIT).toSeconds.toInt + val messageTimeToLive = Duration(config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live", 120), TIME_UNIT).toSeconds.toInt + + private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) } + + // ===== For MessageQueue ===== + + def enqueue(durableMessage: MessageInvocation) = { + Some(queue.get.put(65536, messageSubmitDelay, messageTimeToLive, serialize(durableMessage)).toInt) + EventHandler.debug(this, "\nENQUEUING message in beanstalk-based mailbox [%s]".format(durableMessage)) + } + + def dequeue: MessageInvocation = try { + val job = queue.get.reserve(null) + if (job eq null) null: MessageInvocation + else { + val bytes = job.getData + if (bytes ne null) { + queue.get.delete(job.getJobId) + val messageInvocation = deserialize(bytes) + EventHandler.debug(this, "\nDEQUEUING message in beanstalk-based mailbox [%s]".format(messageInvocation)) + messageInvocation + } else null: MessageInvocation + } + } catch { + case e: Exception => + EventHandler.error(e, this, "Beanstalk connection error") + reconnect(name) + null: MessageInvocation + } + + /** + * Completely delete the queue. + */ + def remove: Boolean = { + try { + queue.get.kick(100000) + true + } catch { + case e: Exception => false + } + } + + def size: Int = { + val item = queue.get.reserve(0) + if (item eq null) 0 else 1 + } + + def isEmpty = size == 0 + + private def connect(name: String): Client = { + @volatile var connected = false + var attempts = 0 + var client: Client = null + while (!connected) { + attempts += 1 + try { + client = new ClientImpl(hostname, port) + client.useTube(name) + client.watch(name) + connected = true + } catch { + case e: Exception => + EventHandler.error(e, this, "Unable to connect to Beanstalk. Retrying in [%s] seconds: %s".format(reconnectWindow, e)) + try { + Thread.sleep(1000 * reconnectWindow) + } catch { + case e: InterruptedException => {} + } + } + } + EventHandler.info(this, "Beanstalk-based mailbox connected to Beanstalkd server") + client + } + + private def reconnect(name: String): ThreadLocal[Client] = { + new ThreadLocal[Client] { override def initialValue: Client = connect(name) } + } +} diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala new file mode 100644 index 0000000000..abebca8229 --- /dev/null +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -0,0 +1,3 @@ +package akka.actor.mailbox + +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxStorage) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala new file mode 100644 index 0000000000..f866154f04 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import akka.actor.ActorRef +import akka.dispatch._ +import akka.config.Config._ +import akka.event.EventHandler + +import org.apache.commons.io.FileUtils + +/** + * @author Jonas Bonér + */ +private[akka] object FileBasedMailboxUtil { + val queuePath = config.getString("akka.actor.mailbox.file-based.directory-path", "./_mb") // /var/spool/akka +} + +class FileBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) { + import FileBasedMailboxUtil._ + + private val queue = try { + try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case e => {} } + val queue = new filequeue.PersistentQueue(queuePath, name, config) + queue.setup // replays journal + queue.discardExpired + queue + } catch { + case e: Exception => + EventHandler.error(e, this, "Could not create a file-based mailbox") + throw e + } + + def enqueue(message: MessageInvocation) = { + EventHandler.debug(this, "\nENQUEUING message in file-based mailbox [%s]".format( message)) + queue.add(serialize(message)) + } + + def dequeue: MessageInvocation = try { + val item = queue.remove + if (item.isDefined) { + queue.confirmRemove(item.get.xid) + val messageInvocation = deserialize(item.get.data) + EventHandler.debug(this, "\nDEQUEUING message in file-based mailbox [%s]".format(messageInvocation)) + messageInvocation + } else null + } catch { + case e: java.util.NoSuchElementException => null + case e: Exception => + EventHandler.error(e, this, "Couldn't dequeue from file-based mailbox") + throw e + } + + /** + * Completely delete the queue. + */ + def remove: Boolean = try { + queue.remove + true + } catch { + case e => false //review why catch Throwable? And swallow potential Errors? + } + + def size: Int = queue.length.toInt + + def isEmpty: Boolean = size == 0 +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/BrokenItemException.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/BrokenItemException.scala new file mode 100644 index 0000000000..bc96778ec6 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/BrokenItemException.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.io.IOException + +case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Counter.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Counter.scala new file mode 100644 index 0000000000..707515d345 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Counter.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.util.concurrent.atomic.AtomicLong + +class Counter { + private val value = new AtomicLong(0) + + def apply() = value.get + def set(n: Long) = value.set(n) + def incr() = value.addAndGet(1) + def incr(n: Long) = value.addAndGet(n) + def decr() = value.addAndGet(-1) + def decr(n: Long) = value.addAndGet(-n) + override def toString = value.get.toString +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala new file mode 100644 index 0000000000..ecfce7ecc6 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala @@ -0,0 +1,346 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.io._ +import java.nio.{ByteBuffer, ByteOrder} +import java.nio.channels.FileChannel + +import akka.event.EventHandler + +// returned from journal replay +sealed trait JournalItem +object JournalItem { + case class Add(item: QItem) extends JournalItem + case object Remove extends JournalItem + case object RemoveTentative extends JournalItem + case class SavedXid(xid: Int) extends JournalItem + case class Unremove(xid: Int) extends JournalItem + case class ConfirmRemove(xid: Int) extends JournalItem + case object EndOfFile extends JournalItem +} + + +/** + * Codes for working with the journal file for a PersistentQueue. + */ +class Journal(queuePath: String, syncJournal: => Boolean) { + + private val queueFile = new File(queuePath) + + private var writer: FileChannel = null + private var reader: Option[FileChannel] = None + private var replayer: Option[FileChannel] = None + + var size: Long = 0 + + // small temporary buffer for formatting operations into the journal: + private val buffer = new Array[Byte](16) + private val byteBuffer = ByteBuffer.wrap(buffer) + byteBuffer.order(ByteOrder.LITTLE_ENDIAN) + + private val CMD_ADD = 0 + private val CMD_REMOVE = 1 + private val CMD_ADDX = 2 + private val CMD_REMOVE_TENTATIVE = 3 + private val CMD_SAVE_XID = 4 + private val CMD_UNREMOVE = 5 + private val CMD_CONFIRM_REMOVE = 6 + private val CMD_ADD_XID = 7 + + + private def open(file: File): Unit = { + writer = new FileOutputStream(file, true).getChannel + } + + def open(): Unit = { + open(queueFile) + } + + def roll(xid: Int, openItems: List[QItem], queue: Iterable[QItem]): Unit = { + writer.close + val tmpFile = new File(queuePath + "~~" + System.currentTimeMillis) + open(tmpFile) + size = 0 + for (item <- openItems) { + addWithXid(item) + removeTentative(false) + } + saveXid(xid) + for (item <- queue) { + add(false, item) + } + if (syncJournal) writer.force(false) + writer.close + tmpFile.renameTo(queueFile) + open + } + + def close(): Unit = { + writer.close + for (r <- reader) r.close + reader = None + } + + def erase(): Unit = { + try { + close() + queueFile.delete + } catch { + case _ => + } + } + + def inReadBehind(): Boolean = reader.isDefined + + def isReplaying(): Boolean = replayer.isDefined + + private def add(allowSync: Boolean, item: QItem): Unit = { + val blob = ByteBuffer.wrap(item.pack()) + size += write(false, CMD_ADDX.toByte, blob.limit) + do { + writer.write(blob) + } while (blob.position < blob.limit) + if (allowSync && syncJournal) writer.force(false) + size += blob.limit + } + + def add(item: QItem): Unit = add(true, item) + + // used only to list pending transactions when recreating the journal. + private def addWithXid(item: QItem) = { + val blob = ByteBuffer.wrap(item.pack()) + + // only called from roll(), so the journal does not need to be synced after a write. + size += write(false, CMD_ADD_XID.toByte, item.xid, blob.limit) + do { + writer.write(blob) + } while (blob.position < blob.limit) + size += blob.limit + } + + def remove() = { + size += write(true, CMD_REMOVE.toByte) + } + + private def removeTentative(allowSync: Boolean): Unit = { + size += write(allowSync, CMD_REMOVE_TENTATIVE.toByte) + } + + def removeTentative(): Unit = removeTentative(true) + + private def saveXid(xid: Int) = { + // only called from roll(), so the journal does not need to be synced after a write. + size += write(false, CMD_SAVE_XID.toByte, xid) + } + + def unremove(xid: Int) = { + size += write(true, CMD_UNREMOVE.toByte, xid) + } + + def confirmRemove(xid: Int) = { + size += write(true, CMD_CONFIRM_REMOVE.toByte, xid) + } + + def startReadBehind(): Unit = { + val pos = if (replayer.isDefined) replayer.get.position else writer.position + val rj = new FileInputStream(queueFile).getChannel + rj.position(pos) + reader = Some(rj) + } + + def fillReadBehind(f: QItem => Unit): Unit = { + val pos = if (replayer.isDefined) replayer.get.position else writer.position + for (rj <- reader) { + if (rj.position == pos) { + // we've caught up. + rj.close + reader = None + } else { + readJournalEntry(rj) match { + case (JournalItem.Add(item), _) => f(item) + case (_, _) => + } + } + } + } + + def replay(name: String)(f: JournalItem => Unit): Unit = { + size = 0 + var lastUpdate = 0L + val TEN_MB = 10L * 1024 * 1024 + try { + val in = new FileInputStream(queueFile).getChannel + try { + replayer = Some(in) + var done = false + do { + readJournalEntry(in) match { + case (JournalItem.EndOfFile, _) => done = true + case (x, itemsize) => + size += itemsize + f(x) + if (size / TEN_MB > lastUpdate) { + lastUpdate = size / TEN_MB + EventHandler.info(this, + "Continuing to read '%s' journal; %s MB so far...".format(name, lastUpdate * 10)) + } + } + } while (!done) + } catch { + case e: BrokenItemException => + EventHandler.error(e, this, "Exception replaying journal for '%s'".format(name)) + truncateJournal(e.lastValidPosition) + } + } catch { + case e: FileNotFoundException => + EventHandler.info(this, "No transaction journal for '%s'; starting with empty queue.".format(name)) + case e: IOException => + EventHandler.error(e, this, "Exception replaying journal for '%s'".format(name)) + // this can happen if the server hardware died abruptly in the middle + // of writing a journal. not awesome but we should recover. + } + replayer = None + } + + private def truncateJournal(position: Long) { + val trancateWriter = new FileOutputStream(queueFile, true).getChannel + try { + trancateWriter.truncate(position) + } finally { + trancateWriter.close() + } + } + + def readJournalEntry(in: FileChannel): (JournalItem, Int) = { + byteBuffer.rewind + byteBuffer.limit(1) + val lastPosition = in.position + var x: Int = 0 + do { + x = in.read(byteBuffer) + } while (byteBuffer.position < byteBuffer.limit && x >= 0) + + if (x < 0) { + (JournalItem.EndOfFile, 0) + } else { + try { + buffer(0) match { + case CMD_ADD => + val data = readBlock(in) + (JournalItem.Add(QItem.unpackOldAdd(data)), 5 + data.length) + case CMD_REMOVE => + (JournalItem.Remove, 1) + case CMD_ADDX => + val data = readBlock(in) + (JournalItem.Add(QItem.unpack(data)), 5 + data.length) + case CMD_REMOVE_TENTATIVE => + (JournalItem.RemoveTentative, 1) + case CMD_SAVE_XID => + val xid = readInt(in) + (JournalItem.SavedXid(xid), 5) + case CMD_UNREMOVE => + val xid = readInt(in) + (JournalItem.Unremove(xid), 5) + case CMD_CONFIRM_REMOVE => + val xid = readInt(in) + (JournalItem.ConfirmRemove(xid), 5) + case CMD_ADD_XID => + val xid = readInt(in) + val data = readBlock(in) + val item = QItem.unpack(data) + item.xid = xid + (JournalItem.Add(item), 9 + data.length) + case n => + throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + in.position)) + } + } catch { + case ex: IOException => + throw new BrokenItemException(lastPosition, ex) + } + } + } + + def walk() = new Iterator[(JournalItem, Int)] { + val in = new FileInputStream(queuePath).getChannel + var done = false + var nextItem: Option[(JournalItem, Int)] = None + + def hasNext = { + if (done) { + false + } else { + nextItem = readJournalEntry(in) match { + case (JournalItem.EndOfFile, _) => + done = true + in.close() + None + case x => + Some(x) + } + nextItem.isDefined + } + } + + def next() = nextItem.get + } + + private def readBlock(in: FileChannel): Array[Byte] = { + val size = readInt(in) + val data = new Array[Byte](size) + val dataBuffer = ByteBuffer.wrap(data) + var x: Int = 0 + do { + x = in.read(dataBuffer) + } while (dataBuffer.position < dataBuffer.limit && x >= 0) + if (x < 0) { + // we never expect EOF when reading a block. + throw new IOException("Unexpected EOF") + } + data + } + + private def readInt(in: FileChannel): Int = { + byteBuffer.rewind + byteBuffer.limit(4) + var x: Int = 0 + do { + x = in.read(byteBuffer) + } while (byteBuffer.position < byteBuffer.limit && x >= 0) + if (x < 0) { + // we never expect EOF when reading an int. + throw new IOException("Unexpected EOF") + } + byteBuffer.rewind + byteBuffer.getInt() + } + + private def write(allowSync: Boolean, items: Any*): Int = { + byteBuffer.clear + for (item <- items) item match { + case b: Byte => byteBuffer.put(b) + case i: Int => byteBuffer.putInt(i) + } + byteBuffer.flip + while (byteBuffer.position < byteBuffer.limit) { + writer.write(byteBuffer) + } + if (allowSync && syncJournal) writer.force(false) + byteBuffer.limit + } +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala new file mode 100644 index 0000000000..9414222276 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -0,0 +1,468 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.io._ + +import scala.collection.mutable + +import akka.event.EventHandler + +import akka.config.Configuration + +// a config value that's backed by a global setting but may be locally overridden +class OverlaySetting[T](base: => T) { + @volatile private var local: Option[T] = None + + def set(value: Option[T]) = local = value + + def apply() = local.getOrElse(base) +} + + +class PersistentQueue(persistencePath: String, val name: String, val config: Configuration) { + + private case object ItemArrived + + // current size of all data in the queue: + private var queueSize: Long = 0 + + // # of items EVER added to the queue: + private var _totalItems: Long = 0 + + // # of items that were expired by the time they were read: + private var _totalExpired: Long = 0 + + // age (in milliseconds) of the last item read from the queue: + private var _currentAge: Long = 0 + + // # of items thot were discarded because the queue was full: + private var _totalDiscarded: Long = 0 + + // # of items in the queue (including those not in memory) + private var queueLength: Long = 0 + + private var queue = new mutable.Queue[QItem] { + // scala's Queue doesn't (yet?) have a way to put back. + def unget(item: QItem) = prependElem(item) + } + private var _memoryBytes: Long = 0 + + private var closed = false + private var paused = false + + def overlay[T](base: => T) = new OverlaySetting(base) + + // attempting to add an item after the queue reaches this size (in items) will fail. + val maxItems = overlay(PersistentQueue.maxItems) + + // attempting to add an item after the queue reaches this size (in bytes) will fail. + val maxSize = overlay(PersistentQueue.maxSize) + + // attempting to add an item larger than this size (in bytes) will fail. + val maxItemSize = overlay(PersistentQueue.maxItemSize) + + // maximum expiration time for this queue (seconds). + val maxAge = overlay(PersistentQueue.maxAge) + + // maximum journal size before the journal should be rotated. + val maxJournalSize = overlay(PersistentQueue.maxJournalSize) + + // maximum size of a queue before it drops into read-behind mode. + val maxMemorySize = overlay(PersistentQueue.maxMemorySize) + + // maximum overflow (multiplier) of a journal file before we re-create it. + val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) + + // absolute maximum size of a journal file until we rebuild it, no matter what. + val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) + + // whether to drop older items (instead of newer) when the queue is full + val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) + + // whether to keep a journal file at all + val keepJournal = overlay(PersistentQueue.keepJournal) + + // whether to sync the journal after each transaction + val syncJournal = overlay(PersistentQueue.syncJournal) + + // (optional) move expired items over to this queue + val expiredQueue = overlay(PersistentQueue.expiredQueue) + + private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal()) + + // track tentative removals + private var xidCounter: Int = 0 + private val openTransactions = new mutable.HashMap[Int, QItem] + def openTransactionCount = openTransactions.size + def openTransactionIds = openTransactions.keys.toList.sortWith(_ - _ > 0) + + def length: Long = synchronized { queueLength } + def totalItems: Long = synchronized { _totalItems } + def bytes: Long = synchronized { queueSize } + def journalSize: Long = synchronized { journal.size } + def totalExpired: Long = synchronized { _totalExpired } + def currentAge: Long = synchronized { if (queueSize == 0) 0 else _currentAge } + def totalDiscarded: Long = synchronized { _totalDiscarded } + def isClosed: Boolean = synchronized { closed || paused } + + // mostly for unit tests. + def memoryLength: Long = synchronized { queue.size } + def memoryBytes: Long = synchronized { _memoryBytes } + def inReadBehind = synchronized { journal.inReadBehind } + + //FIXME, segment commented out, might have damaged semantics, investigate. + //config.subscribe { c => configure(c.getOrElse(new Config)) } + configure(config) + + def configure(config: Configuration) = synchronized { + maxItems set config.getInt("akka.actor.mailbox.file-based.max-items") + maxSize set config.getLong("akka.actor.mailbox.file-based.max-size") + maxItemSize set config.getLong("akka.actor.mailbox.file-based.max-item-size") + maxAge set config.getInt("akka.actor.mailbox.file-based.max-age") + maxJournalSize set config.getLong("akka.actor.mailbox.file-based.max-journal-size") + maxMemorySize set config.getLong("akka.actor.mailbox.file-based.max-memory-size") + maxJournalOverflow set config.getInt("akka.actor.mailbox.file-based.max-journal-overflow") + maxJournalSizeAbsolute set config.getLong("akka.actor.mailbox.file-based.max-journal-size-absolute") + discardOldWhenFull set config.getBool("akka.actor.mailbox.file-based.discard-old-when-full") + keepJournal set config.getBool("akka.actor.mailbox.file-based.journal") + syncJournal set config.getBool("akka.actor.mailbox.file-based.sync-journal") + EventHandler.info(this, + "Configuring queue %s: journal=%s, max-items=%s, max-size=%s, max-age=%s, max-journal-size=%s, max-memory-size=%s, max-journal-overflow=%s, max-journal-size-absolute=%s, discard-old-when-full=%s, sync-journal=%s" + .format( + name, keepJournal(), maxItems(), maxSize(), maxAge(), maxJournalSize(), maxMemorySize(), + maxJournalOverflow(), maxJournalSizeAbsolute(), discardOldWhenFull(), syncJournal())) + if (!keepJournal()) journal.erase() + } + + def dumpConfig(): Array[String] = synchronized { + Array( + "max-items=" + maxItems(), + "max-size=" + maxSize(), + "max-age=" + maxAge(), + "max-journal-size=" + maxJournalSize(), + "max-memory-size=" + maxMemorySize(), + "max-journal-overflow=" + maxJournalOverflow(), + "max-journal-size-absolute=" + maxJournalSizeAbsolute(), + "discard-old-when-full=" + discardOldWhenFull(), + "journal=" + keepJournal(), + "sync-journal=" + syncJournal(), + "move-expired-to" + expiredQueue().map { _.name }.getOrElse("(none)") + ) + } + + def dumpStats(): Array[(String, String)] = synchronized { + Array( + ("items", length.toString), + ("bytes", bytes.toString), + ("total-items", totalItems.toString), + ("logsize", journalSize.toString), + ("expired-items", totalExpired.toString), + ("mem-items", memoryLength.toString), + ("mem-bytes", memoryBytes.toString), + ("age", currentAge.toString), + ("discarded", totalDiscarded.toString), + ("open-transactions", openTransactionCount.toString) + ) + } + + private final def adjustExpiry(startingTime: Long, expiry: Long): Long = { + if (maxAge() > 0) { + val maxExpiry = startingTime + maxAge() + if (expiry > 0) (expiry min maxExpiry) else maxExpiry + } else { + expiry + } + } + + /** + * Add a value to the end of the queue, transactionally. + */ + def add(value: Array[Byte], expiry: Long): Boolean = synchronized { + if (closed || value.size > maxItemSize()) return false + while (queueLength >= maxItems() || queueSize >= maxSize()) { + if (!discardOldWhenFull()) return false + _remove(false) + _totalDiscarded += 1 + if (keepJournal()) journal.remove() + } + + val now = System.currentTimeMillis + val item = QItem(now, adjustExpiry(now, expiry), value, 0) + if (keepJournal() && !journal.inReadBehind) { + if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) { + // force re-creation of the journal. + EventHandler.debug(this, + "Rolling journal file for '%s' (qsize=%s)".format(name, queueSize)) + journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue) + } + if (queueSize >= maxMemorySize()) { + EventHandler.debug(this, + "Dropping to read-behind for queue '%s' (%s bytes)".format(name, queueSize)) + journal.startReadBehind + } + } + _add(item) + if (keepJournal()) journal.add(item) + true + } + + def add(value: Array[Byte]): Boolean = add(value, 0) + + /** + * Peek at the head item in the queue, if there is one. + */ + def peek(): Option[QItem] = { + synchronized { + if (closed || paused || queueLength == 0) { + None + } else { + _peek() + } + } + } + + /** + * Remove and return an item from the queue, if there is one. + * + * @param transaction true if this should be considered the first part + * of a transaction, to be committed or rolled back (put back at the + * head of the queue) + */ + def remove(transaction: Boolean): Option[QItem] = { + synchronized { + if (closed || paused || queueLength == 0) { + None + } else { + val item = _remove(transaction) + if (keepJournal()) { + if (transaction) journal.removeTentative() else journal.remove() + + if ((queueLength == 0) && (journal.size >= maxJournalSize())) { + EventHandler.debug(this, "Rolling journal file for '%s'".format(name)) + journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, Nil) + } + } + item + } + } + } + + /** + * Remove and return an item from the queue, if there is one. + */ + def remove(): Option[QItem] = remove(false) + + /** + * Return a transactionally-removed item to the queue. This is a rolled- + * back transaction. + */ + def unremove(xid: Int): Unit = { + synchronized { + if (!closed) { + if (keepJournal()) journal.unremove(xid) + _unremove(xid) + } + } + } + + def confirmRemove(xid: Int): Unit = { + synchronized { + if (!closed) { + if (keepJournal()) journal.confirmRemove(xid) + openTransactions.remove(xid) + } + } + } + + def flush(): Unit = { + while (remove(false).isDefined) { } + } + + /** + * Close the queue's journal file. Not safe to call on an active queue. + */ + def close(): Unit = synchronized { + closed = true + if (keepJournal()) journal.close() + } + + def pauseReads(): Unit = synchronized { + paused = true + } + + def resumeReads(): Unit = synchronized { + paused = false + } + + def setup(): Unit = synchronized { + queueSize = 0 + replayJournal + } + + def destroyJournal(): Unit = synchronized { + if (keepJournal()) journal.erase() + } + + private final def nextXid(): Int = { + do { + xidCounter += 1 + } while (openTransactions contains xidCounter) + xidCounter + } + + private final def fillReadBehind(): Unit = { + // if we're in read-behind mode, scan forward in the journal to keep memory as full as + // possible. this amortizes the disk overhead across all reads. + while (keepJournal() && journal.inReadBehind && _memoryBytes < maxMemorySize()) { + journal.fillReadBehind { item => + queue += item + _memoryBytes += item.data.length + } + if (!journal.inReadBehind) { + EventHandler.debug(this, "Coming out of read-behind for queue '%s'".format(name)) + } + } + } + + def replayJournal(): Unit = { + if (!keepJournal()) return + + EventHandler.debug(this, "Replaying transaction journal for '%s'".format(name)) + xidCounter = 0 + + journal.replay(name) { + case JournalItem.Add(item) => + _add(item) + // when processing the journal, this has to happen after: + if (!journal.inReadBehind && queueSize >= maxMemorySize()) { + EventHandler.debug(this, + "Dropping to read-behind for queue '%s' (%s bytes)".format(name, queueSize)) + journal.startReadBehind + } + case JournalItem.Remove => _remove(false) + case JournalItem.RemoveTentative => _remove(true) + case JournalItem.SavedXid(xid) => xidCounter = xid + case JournalItem.Unremove(xid) => _unremove(xid) + case JournalItem.ConfirmRemove(xid) => openTransactions.remove(xid) + case x => EventHandler.warning(this, "Unexpected item in journal: %s".format(x)) + } + + EventHandler.debug(this, + "Finished transaction journal for '%s' (%s items, %s bytes)" + .format(name, queueLength, journal.size)) + journal.open + + // now, any unfinished transactions must be backed out. + for (xid <- openTransactionIds) { + journal.unremove(xid) + _unremove(xid) + } + } + + def toList(): List[QItem] = { + discardExpired + queue.toList + } + + // ----- internal implementations + + private def _add(item: QItem): Unit = { + discardExpired + if (!journal.inReadBehind) { + queue += item + _memoryBytes += item.data.length + } + _totalItems += 1 + queueSize += item.data.length + queueLength += 1 + } + + private def _peek(): Option[QItem] = { + discardExpired + if (queue.isEmpty) None else Some(queue.front) + } + + private def _remove(transaction: Boolean): Option[QItem] = { + discardExpired() + if (queue.isEmpty) return None + + val now = System.currentTimeMillis + val item = queue.dequeue + val len = item.data.length + queueSize -= len + _memoryBytes -= len + queueLength -= 1 + val xid = if (transaction) nextXid else 0 + + fillReadBehind + _currentAge = now - item.addTime + if (transaction) { + item.xid = xid + openTransactions(xid) = item + } + Some(item) + } + + final def discardExpired(): Int = { + if (queue.isEmpty || journal.isReplaying) { + 0 + } else { + val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry) + if ((realExpiry != 0) && (realExpiry < System.currentTimeMillis)) { + _totalExpired += 1 + val item = queue.dequeue + val len = item.data.length + queueSize -= len + _memoryBytes -= len + queueLength -= 1 + fillReadBehind + if (keepJournal()) journal.remove() + expiredQueue().map { _.add(item.data, 0) } + 1 + discardExpired() + } else { + 0 + } + } + } + + private def _unremove(xid: Int) = { + openTransactions.remove(xid) map { item => + queueLength += 1 + queueSize += item.data.length + queue unget item + _memoryBytes += item.data.length + } + } +} + + +object PersistentQueue { + @volatile var maxItems: Int = Int.MaxValue + @volatile var maxSize: Long = Long.MaxValue + @volatile var maxItemSize: Long = Long.MaxValue + @volatile var maxAge: Int = 0 + @volatile var maxJournalSize: Long = 16 * 1024 * 1024 + @volatile var maxMemorySize: Long = 128 * 1024 * 1024 + @volatile var maxJournalOverflow: Int = 10 + @volatile var maxJournalSizeAbsolute: Long = Long.MaxValue + @volatile var discardOldWhenFull: Boolean = false + @volatile var keepJournal: Boolean = true + @volatile var syncJournal: Boolean = false + @volatile var expiredQueue: Option[PersistentQueue] = None +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QItem.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QItem.scala new file mode 100644 index 0000000000..449a3aa878 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QItem.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.nio.{ByteBuffer, ByteOrder} + +case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) { + def pack(): Array[Byte] = { + val bytes = new Array[Byte](data.length + 16) + val buffer = ByteBuffer.wrap(bytes) + buffer.order(ByteOrder.LITTLE_ENDIAN) + buffer.putLong(addTime) + buffer.putLong(expiry) + buffer.put(data) + bytes + } +} + +object QItem { + def unpack(data: Array[Byte]): QItem = { + val buffer = ByteBuffer.wrap(data) + val bytes = new Array[Byte](data.length - 16) + buffer.order(ByteOrder.LITTLE_ENDIAN) + val addTime = buffer.getLong + val expiry = buffer.getLong + buffer.get(bytes) + QItem(addTime, expiry, bytes, 0) + } + + def unpackOldAdd(data: Array[Byte]): QItem = { + val buffer = ByteBuffer.wrap(data) + val bytes = new Array[Byte](data.length - 4) + buffer.order(ByteOrder.LITTLE_ENDIAN) + val expiry = buffer.getInt + buffer.get(bytes) + QItem(System.currentTimeMillis, if (expiry == 0) 0 else expiry * 1000, bytes, 0) + } +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala new file mode 100644 index 0000000000..9a2f6c879b --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala @@ -0,0 +1,237 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue + +import java.io.File +import java.util.concurrent.CountDownLatch + +import scala.collection.mutable + +import akka.config.{Config, Configuration} +import akka.event.EventHandler + +class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable") + + +class QueueCollection(queueFolder: String, private var queueConfigs: Configuration) { + private val path = new File(queueFolder) + + if (! path.isDirectory) { + path.mkdirs() + } + if (! path.isDirectory || ! path.canWrite) { + throw new InaccessibleQueuePath + } + + private val queues = new mutable.HashMap[String, PersistentQueue] + private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]] + private var shuttingDown = false + + // total items added since the server started up. + val totalAdded = new Counter() + + // hits/misses on removing items from the queue + val queueHits = new Counter() + val queueMisses = new Counter() + + /* FIXME, segment commented out, might have damaged semantics, investigate. + queueConfigs.subscribe { c => + synchronized { + queueConfigs = c.getOrElse(new Config) + } + }*/ + + // preload any queues + def loadQueues() { + path.list() filter { name => !(name contains "~~") } map { queue(_) } + } + + def queueNames: List[String] = synchronized { + queues.keys.toList + } + + def currentItems = queues.values.foldLeft(0L) { _ + _.length } + def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes } + + /** + * Get a named queue, creating it if necessary. + * Exposed only to unit tests. + */ + private[akka] def queue(name: String): Option[PersistentQueue] = synchronized { + if (shuttingDown) { + None + } else { + Some(queues.get(name) getOrElse { + // only happens when creating a queue for the first time. + val q = if (name contains '+') { + val master = name.split('+')(0) + fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name + EventHandler.debug(this, "Fanout queue %s added to %s".format(name, master)) + new PersistentQueue(path.getPath, name, queueConfigs) + } else { + new PersistentQueue(path.getPath, name, queueConfigs) + } + q.setup + queues(name) = q + q + }) + } + } + + /** + * Add an item to a named queue. Will not return until the item has been + * synchronously added and written to the queue journal file. + * + * @return true if the item was added; false if the server is shutting + * down + */ + def add(key: String, item: Array[Byte], expiry: Int): Boolean = { + for (fanouts <- fanout_queues.get(key); name <- fanouts) { + add(name, item, expiry) + } + + queue(key) match { + case None => false + case Some(q) => + val now = System.currentTimeMillis + val normalizedExpiry: Long = if (expiry == 0) { + 0 + } else if (expiry < 1000000) { + now + expiry + } else { + expiry + } + val result = q.add(item, normalizedExpiry) + if (result) totalAdded.incr() + result + } + } + + def add(key: String, item: Array[Byte]): Boolean = add(key, item, 0) + + /** + * Retrieve an item from a queue and pass it to a continuation. If no item is available within + * the requested time, or the server is shutting down, None is passed. + */ + def remove(key: String, timeout: Int, transaction: Boolean, peek: Boolean)(f: Option[QItem] => Unit): Unit = { + queue(key) match { + case None => + queueMisses.incr + f(None) + case Some(q) => + if (peek) { + f(q.peek()) + } else { + q.remove +/* q.removeReact(if (timeout == 0) timeout else System.currentTimeMillis + timeout, transaction) { + case None => + queueMisses.incr + f(None) + case Some(item) => + queueHits.incr + f(Some(item)) + } +*/ } + } + } + + // for testing. + def receive(key: String): Option[Array[Byte]] = { + var rv: Option[Array[Byte]] = None + val latch = new CountDownLatch(1) + remove(key, 0, false, false) { + case None => + rv = None + latch.countDown + case Some(v) => + rv = Some(v.data) + latch.countDown + } + latch.await + rv + } + + def unremove(key: String, xid: Int) { + queue(key) map { q => q.unremove(xid) } + } + + def confirmRemove(key: String, xid: Int) { + queue(key) map { q => q.confirmRemove(xid) } + } + + def flush(key: String) { + queue(key) map { q => q.flush() } + } + + def delete(name: String): Unit = synchronized { + if (!shuttingDown) { + queues.get(name) map { q => + q.close() + q.destroyJournal() + queues.remove(name) + } + if (name contains '+') { + val master = name.split('+')(0) + fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) -= name + EventHandler.debug(this, "Fanout queue %s dropped from %s".format(name, master)) + } + } + } + + def flushExpired(name: String): Int = synchronized { + if (shuttingDown) { + 0 + } else { + queue(name) map { q => q.discardExpired() } getOrElse(0) + } + } + + def flushAllExpired(): Int = synchronized { + queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName) } + } + + def stats(key: String): Array[(String, String)] = queue(key) match { + case None => Array[(String, String)]() + case Some(q) => + q.dumpStats() ++ + fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList + } + + def dumpConfig(key: String): Array[String] = { + queue(key) match { + case None => Array() + case Some(q) => q.dumpConfig() + } + } + + /** + * Shutdown this queue collection. All actors are asked to exit, and + * any future queue requests will fail. + */ + def shutdown: Unit = synchronized { + if (shuttingDown) { + return + } + shuttingDown = true + for ((name, q) <- queues) { + // synchronous, so the journals are all officially closed before we return. + q.close + } + queues.clear + } +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala new file mode 100644 index 0000000000..9e99c9cd50 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala @@ -0,0 +1,149 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue.tools + +import java.io.{FileNotFoundException, IOException} +import scala.collection.mutable +import akka.actor.mailbox.filequeue._ + +class QueueDumper(filename: String) { + var offset = 0L + var operations = 0L + var currentXid = 0 + + val queue = new mutable.Queue[Int] { + def unget(item: Int) = prependElem(item) + } + val openTransactions = new mutable.HashMap[Int, Int] + + def apply() { + val journal = new Journal(filename, false) + var lastDisplay = 0L + + try { + for ((item, itemsize) <- journal.walk()) { + operations += 1 + dumpItem(item) + offset += itemsize + if (QDumper.quiet && offset - lastDisplay > 1024 * 1024) { + print("\rReading journal: %-6s".format(Util.bytesToHuman(offset, 0))) + Console.flush() + lastDisplay = offset + } + } + print("\r" + (" " * 30)) + + println() + val totalItems = queue.size + openTransactions.size + val totalBytes = queue.foldLeft(0L) { _ + _ } + openTransactions.values.foldLeft(0L) { _ + _ } + println("Journal size: %d bytes, with %d operations.".format(offset, operations)) + println("%d items totalling %d bytes.".format(totalItems, totalBytes)) + } catch { + case e: FileNotFoundException => + println("Can't open journal file: " + filename) + case e: IOException => + println("Exception reading journal file: " + filename) + e.printStackTrace() + } + } + + def dumpItem(item: JournalItem) { + val now = System.currentTimeMillis + if (!QDumper.quiet) print("%08x ".format(offset & 0xffffffffL)) + item match { + case JournalItem.Add(qitem) => + if (!QDumper.quiet) { + print("ADD %-6d".format(qitem.data.size)) + if (qitem.xid > 0) { + print(" xid=%d".format(qitem.xid)) + } + if (qitem.expiry > 0) { + if (qitem.expiry - now < 0) { + print(" expired") + } else { + print(" exp=%d".format(qitem.expiry - now)) + } + } + println() + } + queue += qitem.data.size + case JournalItem.Remove => + if (!QDumper.quiet) println("REM") + queue.dequeue + case JournalItem.RemoveTentative => + do { + currentXid += 1 + } while (openTransactions contains currentXid) + openTransactions(currentXid) = queue.dequeue + if (!QDumper.quiet) println("RSV %d".format(currentXid)) + case JournalItem.SavedXid(xid) => + if (!QDumper.quiet) println("XID %d".format(xid)) + currentXid = xid + case JournalItem.Unremove(xid) => + queue.unget(openTransactions.remove(xid).get) + if (!QDumper.quiet) println("CAN %d".format(xid)) + case JournalItem.ConfirmRemove(xid) => + if (!QDumper.quiet) println("ACK %d".format(xid)) + openTransactions.remove(xid) + case x => + if (!QDumper.quiet) println(x) + } + } +} + + +object QDumper { + val filenames = new mutable.ListBuffer[String] + var quiet = false + + def usage() { + println() + println("usage: qdump.sh ") + println(" describe the contents of a kestrel journal file") + println() + println("options:") + println(" -q quiet: don't describe every line, just the summary") + println() + } + + def parseArgs(args: List[String]): Unit = args match { + case Nil => + case "--help" :: xs => + usage() + System.exit(0) + case "-q" :: xs => + quiet = true + parseArgs(xs) + case x :: xs => + filenames += x + parseArgs(xs) + } + + def main(args: Array[String]) { + parseArgs(args.toList) + if (filenames.size == 0) { + usage() + System.exit(0) + } + + for (filename <- filenames) { + println("Queue: " + filename) + new QueueDumper(filename)() + } + } +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/Util.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/Util.scala new file mode 100644 index 0000000000..de46bbede9 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/Util.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2009 Twitter, Inc. + * Copyright 2009 Robey Pointer + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.actor.mailbox.filequeue.tools + +object Util { + val KILOBYTE = 1024L + val MEGABYTE = 1024 * KILOBYTE + val GIGABYTE = 1024 * MEGABYTE + + def bytesToHuman(bytes: Long, minDivisor: Long) = { + if ((bytes == 0) && (minDivisor == 0)) { + "0" + } else { + val divisor = if ((bytes >= GIGABYTE * 95 / 100) || (minDivisor == GIGABYTE)) { + GIGABYTE + } else if ((bytes >= MEGABYTE * 95 / 100) || (minDivisor == MEGABYTE)) { + MEGABYTE + } else { + KILOBYTE + } + + // add 1/2 when computing the dot, to force it to round up. + var dot = ((bytes % divisor) * 20 + divisor) / (2 * divisor) + var base = (bytes - (bytes % divisor)) / divisor + if (dot >= 10) { + base += 1 + dot -= 10 + } + + base.toString + (if (base < 100) ("." + dot) else "") + (divisor match { + case KILOBYTE => "K" + case MEGABYTE => "M" + case GIGABYTE => "G" + case _ => "" + }) + } + } +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala new file mode 100644 index 0000000000..35d6472a2c --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -0,0 +1,21 @@ +package akka.actor.mailbox + +import org.apache.commons.io.FileUtils + +class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxStorage) { + + def clean { + import FileBasedMailboxUtil._ + FileUtils.deleteDirectory(new java.io.File(queuePath)) + } + + override def beforeAll() { + clean + super.beforeAll + } + + override def afterEach() { + clean + super.afterEach + } +} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/java/akka/actor/mailbox/MailboxProtocol.java b/akka-durable-mailboxes/akka-mailboxes-common/src/main/java/akka/actor/mailbox/MailboxProtocol.java new file mode 100644 index 0000000000..1fcb87781c --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/java/akka/actor/mailbox/MailboxProtocol.java @@ -0,0 +1,835 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: MailboxProtocol.proto + +package akka.actor.mailbox; + +public final class MailboxProtocol { + private MailboxProtocol() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public static final class DurableMailboxMessageProtocol extends + com.google.protobuf.GeneratedMessage { + // Use DurableMailboxMessageProtocol.newBuilder() to construct. + private DurableMailboxMessageProtocol() { + initFields(); + } + private DurableMailboxMessageProtocol(boolean noInit) {} + + private static final DurableMailboxMessageProtocol defaultInstance; + public static DurableMailboxMessageProtocol getDefaultInstance() { + return defaultInstance; + } + + public DurableMailboxMessageProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.actor.mailbox.MailboxProtocol.internal_static_DurableMailboxMessageProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.actor.mailbox.MailboxProtocol.internal_static_DurableMailboxMessageProtocol_fieldAccessorTable; + } + + // required string ownerAddress = 1; + public static final int OWNERADDRESS_FIELD_NUMBER = 1; + private boolean hasOwnerAddress; + private java.lang.String ownerAddress_ = ""; + public boolean hasOwnerAddress() { return hasOwnerAddress; } + public java.lang.String getOwnerAddress() { return ownerAddress_; } + + // optional string senderAddress = 2; + public static final int SENDERADDRESS_FIELD_NUMBER = 2; + private boolean hasSenderAddress; + private java.lang.String senderAddress_ = ""; + public boolean hasSenderAddress() { return hasSenderAddress; } + public java.lang.String getSenderAddress() { return senderAddress_; } + + // optional .UuidProtocol futureUuid = 3; + public static final int FUTUREUUID_FIELD_NUMBER = 3; + private boolean hasFutureUuid; + private akka.actor.mailbox.MailboxProtocol.UuidProtocol futureUuid_; + public boolean hasFutureUuid() { return hasFutureUuid; } + public akka.actor.mailbox.MailboxProtocol.UuidProtocol getFutureUuid() { return futureUuid_; } + + // required bytes message = 4; + public static final int MESSAGE_FIELD_NUMBER = 4; + private boolean hasMessage; + private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasMessage() { return hasMessage; } + public com.google.protobuf.ByteString getMessage() { return message_; } + + private void initFields() { + futureUuid_ = akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance(); + } + public final boolean isInitialized() { + if (!hasOwnerAddress) return false; + if (!hasMessage) return false; + if (hasFutureUuid()) { + if (!getFutureUuid().isInitialized()) return false; + } + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasOwnerAddress()) { + output.writeString(1, getOwnerAddress()); + } + if (hasSenderAddress()) { + output.writeString(2, getSenderAddress()); + } + if (hasFutureUuid()) { + output.writeMessage(3, getFutureUuid()); + } + if (hasMessage()) { + output.writeBytes(4, getMessage()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasOwnerAddress()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(1, getOwnerAddress()); + } + if (hasSenderAddress()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getSenderAddress()); + } + if (hasFutureUuid()) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(3, getFutureUuid()); + } + if (hasMessage()) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, getMessage()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol result; + + // Construct using akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol(); + return builder; + } + + protected akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDescriptor(); + } + + public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol getDefaultInstanceForType() { + return akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol) { + return mergeFrom((akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol other) { + if (other == akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.getDefaultInstance()) return this; + if (other.hasOwnerAddress()) { + setOwnerAddress(other.getOwnerAddress()); + } + if (other.hasSenderAddress()) { + setSenderAddress(other.getSenderAddress()); + } + if (other.hasFutureUuid()) { + mergeFutureUuid(other.getFutureUuid()); + } + if (other.hasMessage()) { + setMessage(other.getMessage()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + setOwnerAddress(input.readString()); + break; + } + case 18: { + setSenderAddress(input.readString()); + break; + } + case 26: { + akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder subBuilder = akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder(); + if (hasFutureUuid()) { + subBuilder.mergeFrom(getFutureUuid()); + } + input.readMessage(subBuilder, extensionRegistry); + setFutureUuid(subBuilder.buildPartial()); + break; + } + case 34: { + setMessage(input.readBytes()); + break; + } + } + } + } + + + // required string ownerAddress = 1; + public boolean hasOwnerAddress() { + return result.hasOwnerAddress(); + } + public java.lang.String getOwnerAddress() { + return result.getOwnerAddress(); + } + public Builder setOwnerAddress(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasOwnerAddress = true; + result.ownerAddress_ = value; + return this; + } + public Builder clearOwnerAddress() { + result.hasOwnerAddress = false; + result.ownerAddress_ = getDefaultInstance().getOwnerAddress(); + return this; + } + + // optional string senderAddress = 2; + public boolean hasSenderAddress() { + return result.hasSenderAddress(); + } + public java.lang.String getSenderAddress() { + return result.getSenderAddress(); + } + public Builder setSenderAddress(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSenderAddress = true; + result.senderAddress_ = value; + return this; + } + public Builder clearSenderAddress() { + result.hasSenderAddress = false; + result.senderAddress_ = getDefaultInstance().getSenderAddress(); + return this; + } + + // optional .UuidProtocol futureUuid = 3; + public boolean hasFutureUuid() { + return result.hasFutureUuid(); + } + public akka.actor.mailbox.MailboxProtocol.UuidProtocol getFutureUuid() { + return result.getFutureUuid(); + } + public Builder setFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasFutureUuid = true; + result.futureUuid_ = value; + return this; + } + public Builder setFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder builderForValue) { + result.hasFutureUuid = true; + result.futureUuid_ = builderForValue.build(); + return this; + } + public Builder mergeFutureUuid(akka.actor.mailbox.MailboxProtocol.UuidProtocol value) { + if (result.hasFutureUuid() && + result.futureUuid_ != akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance()) { + result.futureUuid_ = + akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder(result.futureUuid_).mergeFrom(value).buildPartial(); + } else { + result.futureUuid_ = value; + } + result.hasFutureUuid = true; + return this; + } + public Builder clearFutureUuid() { + result.hasFutureUuid = false; + result.futureUuid_ = akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance(); + return this; + } + + // required bytes message = 4; + public boolean hasMessage() { + return result.hasMessage(); + } + public com.google.protobuf.ByteString getMessage() { + return result.getMessage(); + } + public Builder setMessage(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasMessage = true; + result.message_ = value; + return this; + } + public Builder clearMessage() { + result.hasMessage = false; + result.message_ = getDefaultInstance().getMessage(); + return this; + } + + // @@protoc_insertion_point(builder_scope:DurableMailboxMessageProtocol) + } + + static { + defaultInstance = new DurableMailboxMessageProtocol(true); + akka.actor.mailbox.MailboxProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:DurableMailboxMessageProtocol) + } + + public static final class UuidProtocol extends + com.google.protobuf.GeneratedMessage { + // Use UuidProtocol.newBuilder() to construct. + private UuidProtocol() { + initFields(); + } + private UuidProtocol(boolean noInit) {} + + private static final UuidProtocol defaultInstance; + public static UuidProtocol getDefaultInstance() { + return defaultInstance; + } + + public UuidProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.actor.mailbox.MailboxProtocol.internal_static_UuidProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.actor.mailbox.MailboxProtocol.internal_static_UuidProtocol_fieldAccessorTable; + } + + // required uint64 high = 1; + public static final int HIGH_FIELD_NUMBER = 1; + private boolean hasHigh; + private long high_ = 0L; + public boolean hasHigh() { return hasHigh; } + public long getHigh() { return high_; } + + // required uint64 low = 2; + public static final int LOW_FIELD_NUMBER = 2; + private boolean hasLow; + private long low_ = 0L; + public boolean hasLow() { return hasLow; } + public long getLow() { return low_; } + + private void initFields() { + } + public final boolean isInitialized() { + if (!hasHigh) return false; + if (!hasLow) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasHigh()) { + output.writeUInt64(1, getHigh()); + } + if (hasLow()) { + output.writeUInt64(2, getLow()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasHigh()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, getHigh()); + } + if (hasLow()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(2, getLow()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.mailbox.MailboxProtocol.UuidProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.actor.mailbox.MailboxProtocol.UuidProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private akka.actor.mailbox.MailboxProtocol.UuidProtocol result; + + // Construct using akka.actor.mailbox.MailboxProtocol.UuidProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new akka.actor.mailbox.MailboxProtocol.UuidProtocol(); + return builder; + } + + protected akka.actor.mailbox.MailboxProtocol.UuidProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new akka.actor.mailbox.MailboxProtocol.UuidProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDescriptor(); + } + + public akka.actor.mailbox.MailboxProtocol.UuidProtocol getDefaultInstanceForType() { + return akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public akka.actor.mailbox.MailboxProtocol.UuidProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private akka.actor.mailbox.MailboxProtocol.UuidProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public akka.actor.mailbox.MailboxProtocol.UuidProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + akka.actor.mailbox.MailboxProtocol.UuidProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.actor.mailbox.MailboxProtocol.UuidProtocol) { + return mergeFrom((akka.actor.mailbox.MailboxProtocol.UuidProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.actor.mailbox.MailboxProtocol.UuidProtocol other) { + if (other == akka.actor.mailbox.MailboxProtocol.UuidProtocol.getDefaultInstance()) return this; + if (other.hasHigh()) { + setHigh(other.getHigh()); + } + if (other.hasLow()) { + setLow(other.getLow()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setHigh(input.readUInt64()); + break; + } + case 16: { + setLow(input.readUInt64()); + break; + } + } + } + } + + + // required uint64 high = 1; + public boolean hasHigh() { + return result.hasHigh(); + } + public long getHigh() { + return result.getHigh(); + } + public Builder setHigh(long value) { + result.hasHigh = true; + result.high_ = value; + return this; + } + public Builder clearHigh() { + result.hasHigh = false; + result.high_ = 0L; + return this; + } + + // required uint64 low = 2; + public boolean hasLow() { + return result.hasLow(); + } + public long getLow() { + return result.getLow(); + } + public Builder setLow(long value) { + result.hasLow = true; + result.low_ = value; + return this; + } + public Builder clearLow() { + result.hasLow = false; + result.low_ = 0L; + return this; + } + + // @@protoc_insertion_point(builder_scope:UuidProtocol) + } + + static { + defaultInstance = new UuidProtocol(true); + akka.actor.mailbox.MailboxProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:UuidProtocol) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_DurableMailboxMessageProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_DurableMailboxMessageProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_UuidProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_UuidProtocol_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025MailboxProtocol.proto\"\200\001\n\035DurableMailb" + + "oxMessageProtocol\022\024\n\014ownerAddress\030\001 \002(\t\022" + + "\025\n\rsenderAddress\030\002 \001(\t\022!\n\nfutureUuid\030\003 \001" + + "(\0132\r.UuidProtocol\022\017\n\007message\030\004 \002(\014\")\n\014Uu" + + "idProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004B\026\n" + + "\022akka.actor.mailboxH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_DurableMailboxMessageProtocol_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_DurableMailboxMessageProtocol_descriptor, + new java.lang.String[] { "OwnerAddress", "SenderAddress", "FutureUuid", "Message", }, + akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.class, + akka.actor.mailbox.MailboxProtocol.DurableMailboxMessageProtocol.Builder.class); + internal_static_UuidProtocol_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_UuidProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_UuidProtocol_descriptor, + new java.lang.String[] { "High", "Low", }, + akka.actor.mailbox.MailboxProtocol.UuidProtocol.class, + akka.actor.mailbox.MailboxProtocol.UuidProtocol.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + public static void internalForceInit() {} + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol/MailboxProtocol.proto b/akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol/MailboxProtocol.proto new file mode 100644 index 0000000000..47052886ab --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol/MailboxProtocol.proto @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +option java_package = "akka.actor.mailbox"; +option optimize_for = SPEED; + +/****************************************** + Compile with: + cd ./akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol + protoc MailboxProtocol.proto --java_out ../java +*******************************************/ + +/** + * Defines the durable mailbox message. + */ +message DurableMailboxMessageProtocol { + required string ownerAddress = 1; + optional string senderAddress = 2; + optional UuidProtocol futureUuid = 3; + required bytes message = 4; +} + +/** + * Defines a UUID. + */ +message UuidProtocol { + required uint64 high = 1; + required uint64 low = 2; +} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala new file mode 100644 index 0000000000..247956ff7c --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import akka.actor.{newUuid, ActorRef} +import akka.util.ReflectiveAccess +import akka.dispatch._ +import akka.config._ +import akka.event.EventHandler + +import java.lang.reflect.InvocationTargetException + +/** + * @author Jonas Bonér + */ +sealed abstract class DurableMailboxStorage(mailboxFQN: String) { + val constructorSignature = Array[Class[_]](classOf[ActorRef]) + + val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorRef].getClassLoader) match { + case Right(clazz) => clazz + case Left(exception) => + val cause = exception match { + case i: InvocationTargetException => i.getTargetException + case _ => exception + } + throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString)) + } + + //TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc + def createFor(actor: ActorRef): AnyRef = { + EventHandler.debug(this, "Creating durable mailbox [%s] for [%s]".format(mailboxClass.getName, actor)) + val ctor = mailboxClass.getDeclaredConstructor(constructorSignature: _*) + ctor.setAccessible(true) + Some(ctor.newInstance(Array[AnyRef](actor): _*).asInstanceOf[AnyRef]) + + ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](actor)) match { + case Right(instance) => instance + case Left(exception) => + val cause = exception match { + case i: InvocationTargetException => i.getTargetException + case _ => exception + } + throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString)) + } + } +} + +case object RedisDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.RedisBasedMailbox") +case object BeanstalkDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.BeanstalkBasedMailbox") +case object FileDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.FileBasedMailbox") +case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.actor.mailbox.ZooKeeperBasedMailbox") + +/** + * The durable equivalent of ExecutorBasedEventDrivenDispatcher + * + * @author Jonas Bonér + */ +case class DurableEventBasedDispatcher( + _name: String, + _storage: DurableMailboxStorage, + _throughput: Int = Dispatchers.THROUGHPUT, + _throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + _mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + _config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher( + _name, + _throughput, + _throughputDeadlineTime, + _mailboxType, + _config) { + + def this(_name: String, _storage: DurableMailboxStorage, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(_name, _storage, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage + + def this(_name: String, _storage: DurableMailboxStorage, throughput: Int, mailboxType: MailboxType) = + this(_name, _storage, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(_name: String, _storage: DurableMailboxStorage, throughput: Int) = + this(_name, _storage, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, _storage: DurableMailboxStorage, _config: ThreadPoolConfig) = + this(_name, _storage, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + + def this(_name: String, _storage: DurableMailboxStorage) = + this(_name, _storage, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + override def register(actorRef: ActorRef) { + super.register(actorRef) + val mbox = actorRef.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + if (mbox ne null) //Schedule the ActorRef for initial execution, because we might be resuming operations after a failure + super.registerForExecution(mbox) + } + + override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef) + + private[akka] override def dispatch(invocation: MessageInvocation): Unit = { + if (invocation.senderFuture.isDefined) + throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!") + super.dispatch(invocation) + } +} + +/** + * The durable equivalent of ThreadBasedDispatcher + * + * @author Jonas Bonér + */ +case class DurableThreadBasedDispatcher( + _actor: ActorRef, + _storage: DurableMailboxStorage, + _mailboxType: MailboxType) extends ThreadBasedDispatcher(_actor,_mailboxType) { + + def this(actor: ActorRef, _storage: DurableMailboxStorage) = + this(actor, _storage, UnboundedMailbox()) // For Java API + + def this(actor: ActorRef, _storage: DurableMailboxStorage, capacity: Int) = + this(actor, _storage, BoundedMailbox(capacity)) //For Java API + + def this(actor: ActorRef, _storage: DurableMailboxStorage, capacity: Int, pushTimeOut: akka.util.Duration) = //For Java API + this(actor, _storage, BoundedMailbox(capacity, pushTimeOut)) + + override def register(actorRef: ActorRef) { + super.register(actorRef) + val mbox = actorRef.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + if (mbox ne null) //Schedule the ActorRef for initial execution, because we might be resuming operations after a failure + super.registerForExecution(mbox) + } + + override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef) + + private[akka] override def dispatch(invocation: MessageInvocation): Unit = { + if (invocation.senderFuture.isDefined) + throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or !!!") + super.dispatch(invocation) + } +} + +/** + * Configurator for the DurableEventBasedDispatcher + * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper" and "file" + * + * @author Jonas Bonér + */ +class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: Configuration): MessageDispatcher = { + configureThreadPool(config, threadPoolConfig => new DurableEventBasedDispatcher( + config.getString("name", newUuid.toString), + getStorage(config), + config.getInt("throughput", Dispatchers.THROUGHPUT), + config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType(config), + threadPoolConfig)).build + } + + def getStorage(config: Configuration): DurableMailboxStorage = { + val storage = config.getString("storage") map { + case "redis" => RedisDurableMailboxStorage + case "beanstalk" => BeanstalkDurableMailboxStorage + case "zookeeper" => ZooKeeperDurableMailboxStorage + case "file" => FileDurableMailboxStorage + case unknown => throw new IllegalArgumentException("[%s] is not a valid storage, valid options are [redis, beanstalk, zookeeper, file]" format unknown) + } + + storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableEventBasedDispatcherConfigurator")) + } +} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala new file mode 100644 index 0000000000..db5d98f04a --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import MailboxProtocol._ + +import akka.actor.{Actor, ActorRef} +import akka.dispatch._ +import akka.event.EventHandler +import akka.remote.MessageSerializer +import akka.remote.protocol.RemoteProtocol.MessageProtocol +import akka.AkkaException + +/** + * @author Jonas Bonér + */ +trait DurableMailboxBase { + def serialize(message: MessageInvocation): Array[Byte] + def deserialize(bytes: Array[Byte]): MessageInvocation +} + +private[akka] object DurableExecutableMailboxConfig { + val Name = "[\\.\\/\\$\\s]".r +} + +class DurableMailboxException private[akka](message: String, cause: Throwable = null) extends AkkaException(message, cause) + +/** + * @author Jonas Bonér + */ +abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue with ExecutableMailbox with DurableMailboxBase { + import DurableExecutableMailboxConfig._ + + val ownerAddress = owner.address + val name = "mailbox_" + Name.replaceAllIn(ownerAddress, "_") + + EventHandler.debug(this, "Creating %s mailbox [%s]".format(getClass.getName, name)) + + val dispatcher: ExecutorBasedEventDrivenDispatcher = owner.dispatcher match { + case e: ExecutorBasedEventDrivenDispatcher => e + case _ => null + } + + //TODO: switch to RemoteProtocol + def serialize(durableMessage: MessageInvocation) = { + val message = MessageSerializer.serialize(durableMessage.message) + val builder = DurableMailboxMessageProtocol.newBuilder + .setOwnerAddress(ownerAddress) + .setMessage(message.toByteString) + if (durableMessage.sender.isDefined) builder.setSenderAddress(durableMessage.sender.get.address) + builder.build.toByteArray + } + + //TODO: switch to RemoteProtocol + def deserialize(bytes: Array[Byte]) = { + val durableMessage = DurableMailboxMessageProtocol.parseFrom(bytes) + val messageProtocol = MessageProtocol.parseFrom(durableMessage.getMessage) + val message = MessageSerializer.deserialize(messageProtocol) + val ownerAddress = durableMessage.getOwnerAddress + val owner = Actor.registry.actorFor(ownerAddress).getOrElse( + throw new DurableMailboxException("No actor could be found for address [" + ownerAddress + "], could not deserialize message.")) + + + val sender = if (durableMessage.hasSenderAddress) { + Actor.registry.actorFor(durableMessage.getSenderAddress) + } else None + + new MessageInvocation(owner, message, sender, None) + } +} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties new file mode 100644 index 0000000000..9825970594 --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties @@ -0,0 +1,58 @@ +# Define some default values that can be overridden by system properties +zookeeper.root.logger=INFO, CONSOLE +zookeeper.console.threshold=INFO +zookeeper.log.dir=. +zookeeper.log.file=zookeeper.log +zookeeper.log.threshold=DEBUG +zookeeper.tracelog.dir=. +zookeeper.tracelog.file=zookeeper_trace.log + +# +# ZooKeeper Logging Configuration +# + +# Format is " (, )+ + +# DEFAULT: console appender only +log4j.rootLogger=${zookeeper.root.logger} + +# Example with rolling log file +#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE + +# Example with rolling log file and tracing +#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE + +# +# Log INFO level and above messages to the console +# +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold} +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n + +# +# Add ROLLINGFILE to rootLogger to get log file output +# Log DEBUG level and above messages to a log file +log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender +log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold} +log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file} + +# Max log file size of 10MB +log4j.appender.ROLLINGFILE.MaxFileSize=10MB +# uncomment the next line to limit number of backup files +#log4j.appender.ROLLINGFILE.MaxBackupIndex=10 + +log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout +log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n + + +# +# Add TRACEFILE to rootLogger to get log file output +# Log DEBUG level and above messages to a log file +log4j.appender.TRACEFILE=org.apache.log4j.FileAppender +log4j.appender.TRACEFILE.Threshold=TRACE +log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file} + +log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout +### Notice we are including log4j's NDC here (%x) +log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..240a412687 --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n + + + + + + + + + + + diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/zoo.cfg b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/zoo.cfg new file mode 100644 index 0000000000..b71eadcc33 --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/zoo.cfg @@ -0,0 +1,12 @@ +# The number of milliseconds of each tick +tickTime=2000 +# The number of ticks that the initial +# synchronization phase can take +initLimit=10 +# The number of ticks that can pass between +# sending a request and getting an acknowledgement +syncLimit=5 +# the directory where the snapshot is stored. +dataDir=/export/crawlspace/mahadev/zookeeper/server1/data +# the port at which the clients will connect +clientPort=2181 diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala new file mode 100644 index 0000000000..b1b3b57d9a --- /dev/null +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -0,0 +1,65 @@ +package akka.actor.mailbox + +import java.util.concurrent.TimeUnit + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } + +import akka.actor._ +import akka.actor.Actor._ +import java.util.concurrent.CountDownLatch +import akka.config.Supervision.Temporary +import akka.dispatch.MessageDispatcher + +object DurableMailboxSpecActorFactory { + + class MailboxTestActor extends Actor { + self.lifeCycle = Temporary + def receive = { + case "sum" => self.reply("sum") + } + } + + def createMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = { + val queueActor = actorOf[MailboxTestActor] + queueActor.dispatcher = dispatcher + queueActor.start + } +} + +abstract class DurableMailboxSpec(val backendName: String, val storage: DurableMailboxStorage) extends + WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll { + import DurableMailboxSpecActorFactory._ + + implicit val dispatcher = DurableEventBasedDispatcher(backendName, storage, 1) + + "A " + backendName + " based mailbox backed actor" should { + + "should handle reply to ! for 1 message" in { + val latch = new CountDownLatch(1) + val queueActor = createMailboxTestActor(backendName + " should handle reply to !") + val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } ).start + + queueActor.!("sum")(Some(sender)) + latch.await(10, TimeUnit.SECONDS) must be (true) + } + + "should handle reply to ! for multiple messages" in { + val latch = new CountDownLatch(5) + val queueActor = createMailboxTestActor(backendName + " should handle reply to !") + val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } ).start + + queueActor.!("sum")(Some(sender)) + queueActor.!("sum")(Some(sender)) + queueActor.!("sum")(Some(sender)) + queueActor.!("sum")(Some(sender)) + queueActor.!("sum")(Some(sender)) + latch.await(10, TimeUnit.SECONDS) must be (true) + } + } + + override def beforeEach() { + registry.local.shutdownAll + } +} diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala new file mode 100644 index 0000000000..0ec5f72df8 --- /dev/null +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import akka.actor.ActorRef +import akka.config.Config.config +import akka.dispatch._ +import akka.event.EventHandler +import akka.AkkaException + +import MailboxProtocol._ + +import com.redis._ + +class RedisBasedMailboxException(message: String) extends AkkaException(message) + +trait Base64StringEncoder { + def byteArrayToString(bytes: Array[Byte]): String + def stringToByteArray(str: String): Array[Byte] +} + +object CommonsCodec { + import org.apache.commons.codec.binary.Base64 + import org.apache.commons.codec.binary.Base64._ + + val b64 = new Base64(true) + + trait CommonsCodecBase64StringEncoder { + def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) + def stringToByteArray(str: String) = b64.decode(str) + } + + object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder +} + +import CommonsCodec._ +import CommonsCodec.Base64StringEncoder._ + +/** + * @author Jonas Bonér + */ +class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) { + val nodes = config.getList("akka.persistence.redis.cluster")// need an explicit definition in akka-conf + + @volatile private var db = connect() //review Is the Redis connection thread safe? + + def enqueue(message: MessageInvocation) = { + EventHandler.debug(this, + "\nENQUEUING message in redis-based mailbox [%s]".format(message)) + withErrorHandling { + db.rpush(name, byteArrayToString(serialize(message))) + } + } + + def dequeue: MessageInvocation = withErrorHandling { + try { + val item = db.lpop(name).map(stringToByteArray(_)).getOrElse(throw new NoSuchElementException(name + " not present")) + val messageInvocation = deserialize(item) + EventHandler.debug(this, + "\nDEQUEUING message in redis-based mailbox [%s]".format(messageInvocation)) + messageInvocation + } catch { + case e: java.util.NoSuchElementException => null + case e => + EventHandler.error(e, this, "Couldn't dequeue from Redis-based mailbox") + throw e + } + } + + def size: Int = withErrorHandling { + db.llen(name).getOrElse(throw new NoSuchElementException(name + " not present")) + } + + def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive + + private[akka] def connect() = + nodes match { + case Seq() => + // no cluster defined + new RedisClient( + config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"), + config.getInt("akka.actor.mailbox.redis.port", 6379)) + + case s => + // with cluster + import com.redis.cluster._ + EventHandler.info(this, "Running on Redis cluster") + new RedisCluster(nodes: _*) { + val keyTag = Some(NoOpKeyTag) + } + } + + private def withErrorHandling[T](body: => T): T = { + try { + body + } catch { + case e: RedisConnectionException => { + db = connect() + body + } + case e => + val error = new RedisBasedMailboxException("Could not connect to Redis server") + EventHandler.error(error, this, "Could not connect to Redis server") + throw error + } + } +} diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala new file mode 100644 index 0000000000..3e6e477789 --- /dev/null +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -0,0 +1,3 @@ +package akka.actor.mailbox + +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxStorage) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala new file mode 100644 index 0000000000..3886cf1cf4 --- /dev/null +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.mailbox + +import MailboxProtocol._ + +import akka.actor.ActorRef +import akka.dispatch._ +import akka.config.Config._ +import akka.event.EventHandler +import akka.util.Duration +import akka.cluster.zookeeper._ +import akka.AkkaException + +import org.I0Itec.zkclient.serialize._ + +class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) + +/** + * @author Jonas Bonér + */ +private[akka] object ZooKeeperMailboxConfig { + val zkServerAddresses = config.getString("akka.actor.mailbox.zookeeper.server-addresses", "localhost:2181") + val sessionTimeout = Duration(config.getInt("akka.actor.mailbox.zookeeper.session-timeout", 60), TIME_UNIT).toMillis.toInt + val connectionTimeout = Duration(config.getInt("akka.actor.mailbox.zookeeper.connection-timeout", 60), TIME_UNIT).toMillis.toInt + val blockingQueue = config.getBool("akka.actor.mailbox.zookeeper.blocking-queue", true) + + val queueNode = "/queues" + val queuePathTemplate = queueNode + "/%s" + + object serializer extends ZkSerializer { + def serialize(data: AnyRef): Array[Byte] = data match { + case d: DurableMailboxMessageProtocol => d.toByteArray + case null => throw new ZooKeeperBasedMailboxException("Expected a DurableMailboxMessageProtocol message, was null") + case _ => throw new ZooKeeperBasedMailboxException("Expected a DurableMailboxMessageProtocol message, was [" + data.getClass + "]") + } + + def deserialize(bytes: Array[Byte]): AnyRef = DurableMailboxMessageProtocol.parseFrom(bytes) + } +} + +/** + * @author Jonas Bonér + */ +class ZooKeeperBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) { + import ZooKeeperMailboxConfig._ + + private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout) + private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue) + + def enqueue(durableMessage: MessageInvocation) = { + EventHandler.debug(this, + "\nENQUEUING message in zookeeper-based mailbox [%s]".format(durableMessage)) + queue.enqueue(serialize(durableMessage)) + } + + def dequeue: MessageInvocation = try { + val messageInvocation = deserialize(queue.dequeue.asInstanceOf[Array[Byte]]) + EventHandler.debug(this, + "\nDEQUEUING message in zookeeper-based mailbox [%s]".format(messageInvocation)) + messageInvocation + } catch { + case e: java.util.NoSuchElementException => null + case e: InterruptedException => null + case e => + EventHandler.error(e, this, "Couldn't dequeue from ZooKeeper-based mailbox") + throw e + } + + def size: Int = queue.size + + def isEmpty: Boolean = queue.isEmpty + + def clear(): Boolean = try { + queue.clear + true + } catch { + case e => false + } + + def close = zkClient.close +} diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala new file mode 100644 index 0000000000..b798d8fe8e --- /dev/null +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -0,0 +1,31 @@ +package akka.actor.mailbox + +import akka.actor.Actor +import akka.cluster.zookeeper._ + +import org.I0Itec.zkclient._ + +class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxStorage) { + val dataPath = "_akka_cluster/data" + val logPath = "_akka_cluster/log" + + var zkServer: ZkServer = _ + + override def beforeAll() { + zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath) + super.beforeAll + } + + override def afterEach() { + Actor.registry.local.actors.foreach(_.mailbox match { + case zkm: ZooKeeperBasedMailbox => zkm.close + case _ => () + }) + super.afterEach + } + + override def afterAll() { + zkServer.shutdown + super.afterAll + } +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 7f5b46a290..4ca27ad32b 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -114,6 +114,45 @@ akka { mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout # (in unit defined by the time-unit property) } + + mailbox { + + file-based { + directory-path = "./_mb" + max-items = 2147483647 + max-size = 2147483647 + max-items = 2147483647 + max-age = 0 + max-journal-size = 16777216 # 16 * 1024 * 1024 + max-memory-size = 134217728 # 128 * 1024 * 1024 + max-journal-overflow = 10 + max-journal-size-absolute = 9223372036854775807 + discard-old-when-full = on + keep-journal = on + sync-journal = off + } + + redis { + hostname = "127.0.0.1" + port = 6379 + } + + zookeeper { + server-addresses = "localhost:2181" + session-timeout = 60 + connection-timeout = 60 + blocking-queue = on + } + + beanstalk { + hostname = "127.0.0.1" + port = 11300 + reconnect-window = 5 + message-submit-delay = 0 + message-submit-timeout = 5 + message-time-to-live = 120 + } + } } cluster { diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 330ff6329e..156fa02dc7 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -41,7 +41,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases") lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo") lazy val ScalaToolsSnapshotRepo = MavenRepository("Scala-Tools Snapshot Repo", "http://scala-tools.org/repo-snapshots") - lazy val SunJDMKRepo = MavenRepository("WP5 Repository", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") + lazy val SunJDMKRepo = MavenRepository("WP5 Repository", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") } // ------------------------------------------------------------------------------------------------------------------- @@ -63,6 +63,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo) lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo) lazy val sjsonModuleConfig = ModuleConfiguration("net.debasishg", ScalaToolsRelRepo) + lazy val redisModuleConfig = ModuleConfiguration("net.debasishg", ScalaToolsRelRepo) + lazy val beanstalkModuleConfig = ModuleConfiguration("beanstalk", AkkaRepo) lazy val lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", "h2-lzf", AkkaRepo) lazy val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo) lazy val aspectWerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", "aspectwerkz", "2.2.3", AkkaRepo) @@ -93,82 +95,63 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec object Dependencies { // Compile - lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain - - lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 - - lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2 - - lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 - - lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2 - - lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 - - lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "provided" //Eclipse license - lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2 - - lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 - - lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 - lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 - - lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "provided" //CDDL v1 - - lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 - - lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1 - - lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2 - lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2 - - lazy val netty = "org.jboss.netty" % "netty" % "3.2.4.Final" % "compile" //ApacheV2 - - lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2 - - lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD - - lazy val sjson = "net.debasishg" %% "sjson" % "0.11" % "compile" //ApacheV2 - lazy val sjson_test = "net.debasishg" %% "sjson" % "0.11" % "test" //ApacheV2 - - lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION - lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" - - lazy val log4j = "log4j" % "log4j" % "1.2.15" - - lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % ZOOKEEPER_VERSION //ApacheV2 - lazy val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % ZOOKEEPER_VERSION //ApacheV2 - lazy val zkClient = "zkclient" % "zkclient" % "0.2" //ApacheV2 + lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain + lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 + lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD + lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2 + lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 + lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2 + lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 + lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "provided" //Eclipse license + lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2 + lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 + lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 + lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 + lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "provided" //CDDL v1 + lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 + lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1 + lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2 + lazy val netty = "org.jboss.netty" % "netty" % "3.2.4.Final" % "compile" //ApacheV2 + lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2 + lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD + lazy val redis = "net.debasishg" % "redisclient_2.9.0" % "2.3.1" //ApacheV2 + lazy val sjson = "net.debasishg" %% "sjson" % "0.11" % "compile" //ApacheV2 + lazy val sjson_test = "net.debasishg" %% "sjson" % "0.11" % "test" //ApacheV2 + lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION // MIT + lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.28" % "runtime" //MIT + lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2 + lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % ZOOKEEPER_VERSION //ApacheV2 + lazy val zookeeper_lock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % ZOOKEEPER_VERSION //ApacheV2 + lazy val zkClient = "zkclient" % "zkclient" % "0.2" //ApacheV2 // Test - - lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 - - lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license - lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license - - lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0 - lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT - lazy val scalatest = "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test" //ApacheV2 + lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2 + lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 + lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license + lazy val testJettyWebApp = "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license + lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0 + lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT + lazy val scalatest = "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test" //ApacheV2 } // ------------------------------------------------------------------------------------------------------------------- // Subprojects // ------------------------------------------------------------------------------------------------------------------- - lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) - lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) - lazy val akka_actor_tests = project("akka-actor-tests", "akka-actor-tests", new AkkaActorTestsProject(_), akka_testkit) - lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) - lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm, akka_actor_tests) + lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) + lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) + lazy val akka_actor_tests = project("akka-actor-tests", "akka-actor-tests", new AkkaActorTestsProject(_), akka_testkit) + lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) + lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm, akka_actor_tests) - lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) - lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_remote) + lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) + lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_remote) + lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_remote) - lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor) - lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor) - lazy val akka_tutorials = project("akka-tutorials", "akka-tutorials", new AkkaTutorialsParentProject(_), akka_actor) - lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) + lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor) + lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor) + lazy val akka_tutorials = project("akka-tutorials", "akka-tutorials", new AkkaTutorialsParentProject(_), akka_actor) + lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) // ------------------------------------------------------------------------------------------------------------------- // Miscellaneous @@ -191,7 +174,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec override def managedStyle = ManagedStyle.Maven - lazy val akkaPublishRepository = systemOptional[String]("akka.publish.repository", "default") + lazy val akkaPublishRepository = systemOptional[String]("akka.publish.repository", "default") lazy val akkaPublishCredentials = systemOptional[String]("akka.publish.credentials", "none") if (akkaPublishCredentials.value != "none") Credentials(akkaPublishCredentials.value, log) @@ -348,7 +331,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec class AkkaClusterProject(info: ProjectInfo) extends AkkaDefaultProject(info) with MultiJvmTests { val bookkeeper = Dependencies.bookkeeper val zookeeper = Dependencies.zookeeper - val zookeeperLock = Dependencies.zookeeperLock + val zookeeper_lock = Dependencies.zookeeper_lock val zkClient = Dependencies.zkClient val commons_io = Dependencies.commons_io val log4j = Dependencies.log4j @@ -384,6 +367,50 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec val scalatest = Dependencies.scalatest } + // ------------------------------------------------------------------------------------------------------------------- + // The akka-durable-mailboxes subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaDurableMailboxesParentProject(info: ProjectInfo) extends ParentProject(info) { + lazy val akka_mailboxes_common = + project("akka-mailboxes-common", "akka-mailboxes-common", new AkkaMailboxesCommonProject(_), akka_remote) + lazy val akka_redis_mailbox = + project("akka-redis-mailbox", "akka-redis-mailbox", new AkkaRedisMailboxProject(_), akka_mailboxes_common) + lazy val akka_file_mailbox = + project("akka-file-mailbox", "akka-file-mailbox", new AkkaFileMailboxProject(_), akka_mailboxes_common) + lazy val akka_beanstalk_mailbox = + project("akka-beanstalk-mailbox", "akka-beanstalk-mailbox", new AkkaBeanstalkMailboxProject(_), akka_mailboxes_common) + lazy val akka_zookeeper_mailbox = + project("akka-zookeeper-mailbox", "akka-zookeeper-mailbox", new AkkaZooKeeperMailboxProject(_), akka_mailboxes_common, akka_cluster) + } + + class AkkaMailboxesCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info) { + // test dependencies + val scalatest = Dependencies.scalatest + } + + class AkkaRedisMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) { + val redis = Dependencies.redis + + lazy val redisTestsEnabled = systemOptional[Boolean]("mailbox.test.redis", false) + + override def testOptions = + super.testOptions ++ (if (!redisTestsEnabled.value) Seq(testFilter("Redis")) else Seq.empty) + } + + class AkkaFileMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) + + class AkkaBeanstalkMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) { + val beanstalk = Dependencies.beanstalk + + lazy val beanstalkTestsEnabled = systemOptional[Boolean]("mailbox.test.beanstalk", false) + + override def testOptions = + super.testOptions ++ (if (!beanstalkTestsEnabled.value) Seq(testFilter("Beanstalk")) else Seq.empty) + } + + class AkkaZooKeeperMailboxProject(info: ProjectInfo) extends AkkaDefaultProject(info) + // ------------------------------------------------------------------------------------------------------------------- // Samples // ------------------------------------------------------------------------------------------------------------------- @@ -525,6 +552,8 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq } + def testFilter(containing: String) = TestFilter(test => !test.name.contains(containing)) + override def testOptions = super.testOptions ++ excludeTests.map(exclude => TestFilter(test => !test.contains(exclude))) lazy val publishRelease = {