From 4b977361ebf52878d25126050fc53b1ec53b8f65 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 12 Mar 2014 11:23:58 +0100 Subject: [PATCH] !tra #3920 Remove deprecated durable mailboxes --- .../project/migration-guide-2.3.x-2.4.x.rst | 2 +- .../src/main/resources/reference.conf | 70 --- .../mailbox/filebased/FileBasedMailbox.scala | 86 ---- .../filebased/FileBasedMailboxSettings.scala | 37 -- .../filequeue/BrokenItemException.scala | 23 - .../mailbox/filebased/filequeue/Counter.scala | 33 -- .../mailbox/filebased/filequeue/Journal.scala | 345 ------------- .../filebased/filequeue/PersistentQueue.scala | 479 ------------------ .../mailbox/filebased/filequeue/QItem.scala | 55 -- .../filebased/filequeue/QueueCollection.scala | 227 --------- .../filebased/filequeue/tools/QDumper.scala | 158 ------ .../filebased/filequeue/tools/Util.scala | 54 -- .../filebased/FileBasedMailboxSpec.scala | 48 -- .../akka/actor/mailbox/DurableMailbox.scala | 134 ----- .../src/test/resources/log4j.properties | 58 --- .../src/test/resources/logback-test.xml | 26 - .../actor/mailbox/DurableMailboxSpec.scala | 178 ------- project/AkkaBuild.scala | 43 +- 18 files changed, 3 insertions(+), 2053 deletions(-) delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala delete mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala delete mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties delete mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml delete mode 100644 akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index c880fb8f84..ba539ef13a 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -26,4 +26,4 @@ The following, previously deprecated, features have been removed: * akka-dataflow * akka-transactor - +* durable mailboxes (akka-mailboxes-common, akka-file-mailbox) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf deleted file mode 100644 index 286f544c29..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ /dev/null @@ -1,70 +0,0 @@ -############################################# -# Akka File Mailboxes Reference Config File # -############################################# - -# This is the reference config file that contains all the default settings. -# Make your edits/overrides in your application.conf. -# -# For more information see - -akka { - actor { - mailbox { - # deprecated, superseded by akka-persistence - file-based { - # directory below which this queue resides - directory-path = "./_mb" - - # attempting to add an item after the queue reaches this size (in items) - # will fail. - max-items = 2147483647 - - # attempting to add an item after the queue reaches this size (in bytes) - # will fail. - max-size = 2147483647 bytes - - # attempting to add an item larger than this size (in bytes) will fail. - max-item-size = 2147483647 bytes - - # maximum expiration time for this queue (seconds). - max-age = 0s - - # maximum journal size before the journal should be rotated. - max-journal-size = 16 MiB - - # maximum size of a queue before it drops into read-behind mode. - max-memory-size = 128 MiB - - # maximum overflow (multiplier) of a journal file before we re-create it. - max-journal-overflow = 10 - - # absolute maximum size of a journal file until we rebuild it, - # no matter what. - max-journal-size-absolute = 9223372036854775807 bytes - - # whether to drop older items (instead of newer) when the queue is full - discard-old-when-full = on - - # whether to keep a journal file at all - keep-journal = on - - # whether to sync the journal after each transaction - sync-journal = off - - # circuit breaker configuration - circuit-breaker { - # maximum number of failures before opening breaker - max-failures = 3 - - # duration of time beyond which a call is assumed to be timed out and - # considered a failure - call-timeout = 3 seconds - - # duration of time to wait until attempting to reset the breaker during - # which all calls fail-fast - reset-timeout = 30 seconds - } - } - } - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala deleted file mode 100644 index f15d2faf95..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.actor.mailbox.filebased - -import akka.actor.mailbox._ -import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem } -import akka.event.Logging -import com.typesafe.config.Config -import akka.ConfigurationException -import akka.dispatch._ -import scala.util.control.NonFatal -import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker } -import scala.concurrent.duration.Duration - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { - private val settings = new FileBasedMailboxSettings(systemSettings, config) - override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = (owner zip system).headOption match { - case Some((o, s: ExtendedActorSystem)) ⇒ new FileBasedMessageQueue(o, s, settings) - case _ ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") - } -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings) - extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { - // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? - private val log = Logging(system, "FileBasedMessageQueue") - - val breaker = CircuitBreaker(system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) - - private val queue = try { - (new java.io.File(settings.QueuePath)) match { - case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) - case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir) - case _ ⇒ // All good - } - val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log) - queue.setup // replays journal - queue.discardExpired - queue - } catch { - case NonFatal(e) ⇒ - log.error(e, "Could not create a file-based mailbox") - throw e - } - - def enqueue(receiver: ActorRef, envelope: Envelope) { - breaker.withSyncCircuitBreaker(queue.add(serialize(envelope))) - } - - def dequeue(): Envelope = { - breaker.withSyncCircuitBreaker( - try { - queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull - } catch { - case _: java.util.NoSuchElementException ⇒ null - case e: CircuitBreakerOpenException ⇒ - log.debug(e.getMessage()) - throw e - case NonFatal(e) ⇒ - log.error(e, "Couldn't dequeue from file-based mailbox, due to [{}]", e.getMessage()) - throw e - }) - } - - def numberOfMessages: Int = { - breaker.withSyncCircuitBreaker(queue.length.toInt) - } - - def hasMessages: Boolean = numberOfMessages > 0 - - /** - * Completely delete the queue. - */ - def remove: Boolean = try { - queue.remove - true - } catch { - case NonFatal(_) ⇒ false - } - - def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala deleted file mode 100644 index f7cc2aa383..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.actor.mailbox.filebased - -import akka.actor.mailbox._ -import com.typesafe.config.Config -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.ActorSystem - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) - extends DurableMailboxSettings { - - def name: String = "file-based" - - val config = initialize - import config._ - - final val QueuePath: String = getString("directory-path") - final val MaxItems: Int = getInt("max-items") - final val MaxSize: Long = getBytes("max-size") - final val MaxItemSize: Long = getBytes("max-item-size") - final val MaxAge: FiniteDuration = Duration(getMilliseconds("max-age"), MILLISECONDS) - final val MaxJournalSize: Long = getBytes("max-journal-size") - final val MaxMemorySize: Long = getBytes("max-memory-size") - final val MaxJournalOverflow: Int = getInt("max-journal-overflow") - final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") - final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") - final val KeepJournal: Boolean = getBoolean("keep-journal") - final val SyncJournal: Boolean = getBoolean("sync-journal") - - final val CircuitBreakerMaxFailures: Int = getInt("circuit-breaker.max-failures") - final val CircuitBreakerCallTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) - final val CircuitBreakerResetTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala deleted file mode 100644 index 67ebffcf24..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.io.IOException - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -final case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala deleted file mode 100644 index 9d3133018f..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.util.concurrent.atomic.AtomicLong - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class Counter { - private val value = new AtomicLong(0) - - def apply() = value.get - def set(n: Long) = value.set(n) - def incr() = value.addAndGet(1) - def incr(n: Long) = value.addAndGet(n) - def decr() = value.addAndGet(-1) - def decr(n: Long) = value.addAndGet(-n) - override def toString = value.get.toString -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala deleted file mode 100644 index 2e7539e07a..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.io._ -import java.nio.{ ByteBuffer, ByteOrder } -import java.nio.channels.FileChannel -import akka.event.LoggingAdapter -import scala.util.control.NonFatal - -// returned from journal replay -sealed trait JournalItem -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object JournalItem { - final case class Add(item: QItem) extends JournalItem - case object Remove extends JournalItem - case object RemoveTentative extends JournalItem - final case class SavedXid(xid: Int) extends JournalItem - final case class Unremove(xid: Int) extends JournalItem - final case class ConfirmRemove(xid: Int) extends JournalItem - case object EndOfFile extends JournalItem -} - -/** - * Codes for working with the journal file for a PersistentQueue. - */ -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) { - - private val queueFile = new File(queuePath) - - private var writer: FileChannel = null - private var reader: Option[FileChannel] = None - private var replayer: Option[FileChannel] = None - - var size: Long = 0 - - // small temporary buffer for formatting operations into the journal: - private val buffer = new Array[Byte](16) - private val byteBuffer = ByteBuffer.wrap(buffer) - byteBuffer.order(ByteOrder.LITTLE_ENDIAN) - - private val CMD_ADD = 0 - private val CMD_REMOVE = 1 - private val CMD_ADDX = 2 - private val CMD_REMOVE_TENTATIVE = 3 - private val CMD_SAVE_XID = 4 - private val CMD_UNREMOVE = 5 - private val CMD_CONFIRM_REMOVE = 6 - private val CMD_ADD_XID = 7 - - private def open(file: File) { - writer = new FileOutputStream(file, true).getChannel - } - - def open() { - open(queueFile) - } - - def roll(xid: Int, openItems: List[QItem], queue: Iterable[QItem]) { - writer.close - val tmpFile = new File(queuePath + "~~" + System.currentTimeMillis) - open(tmpFile) - size = 0 - for (item ← openItems) { - addWithXid(item) - removeTentative(false) - } - saveXid(xid) - for (item ← queue) { - add(false, item) - } - if (syncJournal) writer.force(false) - writer.close - tmpFile.renameTo(queueFile) - open - } - - def close() { - writer.close - for (r ← reader) r.close - reader = None - } - - def erase() { - try { - close() - queueFile.delete - } catch { - case NonFatal(_) ⇒ - } - } - - def inReadBehind(): Boolean = reader.isDefined - - def isReplaying(): Boolean = replayer.isDefined - - private def add(allowSync: Boolean, item: QItem) { - val blob = ByteBuffer.wrap(item.pack()) - size += write(false, CMD_ADDX.toByte, blob.limit) - do { - writer.write(blob) - } while (blob.position < blob.limit) - if (allowSync && syncJournal) writer.force(false) - size += blob.limit - } - - def add(item: QItem): Unit = add(true, item) - - // used only to list pending transactions when recreating the journal. - private def addWithXid(item: QItem) = { - val blob = ByteBuffer.wrap(item.pack()) - - // only called from roll(), so the journal does not need to be synced after a write. - size += write(false, CMD_ADD_XID.toByte, item.xid, blob.limit) - do { - writer.write(blob) - } while (blob.position < blob.limit) - size += blob.limit - } - - def remove() = { - size += write(true, CMD_REMOVE.toByte) - } - - private def removeTentative(allowSync: Boolean) { - size += write(allowSync, CMD_REMOVE_TENTATIVE.toByte) - } - - def removeTentative(): Unit = removeTentative(true) - - private def saveXid(xid: Int) = { - // only called from roll(), so the journal does not need to be synced after a write. - size += write(false, CMD_SAVE_XID.toByte, xid) - } - - def unremove(xid: Int) = { - size += write(true, CMD_UNREMOVE.toByte, xid) - } - - def confirmRemove(xid: Int) = { - size += write(true, CMD_CONFIRM_REMOVE.toByte, xid) - } - - def startReadBehind() { - val pos = if (replayer.isDefined) replayer.get.position else writer.position - val rj = new FileInputStream(queueFile).getChannel - rj.position(pos) - reader = Some(rj) - } - - def fillReadBehind(f: QItem ⇒ Unit) { - val pos = if (replayer.isDefined) replayer.get.position else writer.position - for (rj ← reader) { - if (rj.position == pos) { - // we've caught up. - rj.close - reader = None - } else { - readJournalEntry(rj) match { - case (JournalItem.Add(item), _) ⇒ f(item) - case (_, _) ⇒ - } - } - } - } - - def replay(name: String)(f: JournalItem ⇒ Unit) { - size = 0 - var lastUpdate = 0L - val TEN_MB = 10L * 1024 * 1024 - try { - val in = new FileInputStream(queueFile).getChannel - try { - replayer = Some(in) - var done = false - do { - readJournalEntry(in) match { - case (JournalItem.EndOfFile, _) ⇒ done = true - case (x, itemsize) ⇒ - size += itemsize - f(x) - if (size / TEN_MB > lastUpdate) { - lastUpdate = size / TEN_MB - log.info("Continuing to read '{}' journal; {} MB so far...", name, lastUpdate * 10) - } - } - } while (!done) - } catch { - case e: BrokenItemException ⇒ - log.error(e, "Exception replaying journal for '{}'", name) - truncateJournal(e.lastValidPosition) - } - } catch { - case e: FileNotFoundException ⇒ - log.info("No transaction journal for '{}'; starting with empty queue.", name) - case e: IOException ⇒ - log.error(e, "Exception replaying journal for '{}'", name) - // this can happen if the server hardware died abruptly in the middle - // of writing a journal. not awesome but we should recover. - } - replayer = None - } - - private def truncateJournal(position: Long) { - val trancateWriter = new FileOutputStream(queueFile, true).getChannel - try { - trancateWriter.truncate(position) - } finally { - trancateWriter.close() - } - } - - def readJournalEntry(in: FileChannel): (JournalItem, Int) = { - byteBuffer.rewind - byteBuffer.limit(1) - val lastPosition = in.position - var x: Int = 0 - do { - x = in.read(byteBuffer) - } while (byteBuffer.position < byteBuffer.limit && x >= 0) - - if (x < 0) { - (JournalItem.EndOfFile, 0) - } else { - try { - buffer(0) match { - case CMD_ADD ⇒ - val data = readBlock(in) - (JournalItem.Add(QItem.unpackOldAdd(data)), 5 + data.length) - case CMD_REMOVE ⇒ - (JournalItem.Remove, 1) - case CMD_ADDX ⇒ - val data = readBlock(in) - (JournalItem.Add(QItem.unpack(data)), 5 + data.length) - case CMD_REMOVE_TENTATIVE ⇒ - (JournalItem.RemoveTentative, 1) - case CMD_SAVE_XID ⇒ - val xid = readInt(in) - (JournalItem.SavedXid(xid), 5) - case CMD_UNREMOVE ⇒ - val xid = readInt(in) - (JournalItem.Unremove(xid), 5) - case CMD_CONFIRM_REMOVE ⇒ - val xid = readInt(in) - (JournalItem.ConfirmRemove(xid), 5) - case CMD_ADD_XID ⇒ - val xid = readInt(in) - val data = readBlock(in) - val item = QItem.unpack(data) - item.xid = xid - (JournalItem.Add(item), 9 + data.length) - case n ⇒ - throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + in.position)) - } - } catch { - case ex: IOException ⇒ - throw new BrokenItemException(lastPosition, ex) - } - } - } - - def walk() = new Iterator[(JournalItem, Int)] { - val in = new FileInputStream(queuePath).getChannel - var done = false - var nextItem: Option[(JournalItem, Int)] = None - - def hasNext = { - if (done) { - false - } else { - nextItem = readJournalEntry(in) match { - case (JournalItem.EndOfFile, _) ⇒ - done = true - in.close() - None - case x ⇒ - Some(x) - } - nextItem.isDefined - } - } - - def next() = nextItem.get - } - - private def readBlock(in: FileChannel): Array[Byte] = { - val size = readInt(in) - val data = new Array[Byte](size) - val dataBuffer = ByteBuffer.wrap(data) - var x: Int = 0 - do { - x = in.read(dataBuffer) - } while (dataBuffer.position < dataBuffer.limit && x >= 0) - if (x < 0) { - // we never expect EOF when reading a block. - throw new IOException("Unexpected EOF") - } - data - } - - private def readInt(in: FileChannel): Int = { - byteBuffer.rewind - byteBuffer.limit(4) - var x: Int = 0 - do { - x = in.read(byteBuffer) - } while (byteBuffer.position < byteBuffer.limit && x >= 0) - if (x < 0) { - // we never expect EOF when reading an int. - throw new IOException("Unexpected EOF") - } - byteBuffer.rewind - byteBuffer.getInt() - } - - private def write(allowSync: Boolean, items: Any*): Int = { - byteBuffer.clear - for (item ← items) item match { - case b: Byte ⇒ byteBuffer.put(b) - case i: Int ⇒ byteBuffer.putInt(i) - } - byteBuffer.flip - while (byteBuffer.position < byteBuffer.limit) { - writer.write(byteBuffer) - } - if (allowSync && syncJournal) writer.force(false) - byteBuffer.limit - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala deleted file mode 100644 index 3f1582994b..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.io._ -import scala.collection.mutable -import akka.event.LoggingAdapter -import java.util.concurrent.TimeUnit -import scala.annotation.tailrec -import akka.actor.mailbox.filebased.FileBasedMailboxSettings - -// a config value that's backed by a global setting but may be locally overridden -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class OverlaySetting[T](base: ⇒ T) { - @volatile - private var local: Option[T] = None - - def set(value: Option[T]) = local = value - - def apply() = local.getOrElse(base) -} - -trait Prependable[T] { - def prepend(t: T): Unit -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) { - - private case object ItemArrived - - // current size of all data in the queue: - private var queueSize: Long = 0 - - // # of items EVER added to the queue: - private var _totalItems: Long = 0 - - // # of items that were expired by the time they were read: - private var _totalExpired: Long = 0 - - // age (in milliseconds) of the last item read from the queue: - private var _currentAge: Long = 0 - - // # of items thot were discarded because the queue was full: - private var _totalDiscarded: Long = 0 - - // # of items in the queue (including those not in memory) - private var queueLength: Long = 0 - - private var queue: mutable.Queue[QItem] with Prependable[QItem] = new mutable.Queue[QItem] with Prependable[QItem] { - // scala's Queue doesn't (yet?) have a way to put back. - def prepend(item: QItem) = prependElem(item) - } - private var _memoryBytes: Long = 0 - - private var closed = false - private var paused = false - - def overlay[T](base: ⇒ T) = new OverlaySetting(base) - - // attempting to add an item after the queue reaches this size (in items) will fail. - final val maxItems = overlay(PersistentQueue.maxItems) - - // attempting to add an item after the queue reaches this size (in bytes) will fail. - final val maxSize = overlay(PersistentQueue.maxSize) - - // attempting to add an item larger than this size (in bytes) will fail. - final val maxItemSize = overlay(PersistentQueue.maxItemSize) - - // maximum expiration time for this queue (seconds). - final val maxAge = overlay(PersistentQueue.maxAge) - - // maximum journal size before the journal should be rotated. - final val maxJournalSize = overlay(PersistentQueue.maxJournalSize) - - // maximum size of a queue before it drops into read-behind mode. - final val maxMemorySize = overlay(PersistentQueue.maxMemorySize) - - // maximum overflow (multiplier) of a journal file before we re-create it. - final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) - - // absolute maximum size of a journal file until we rebuild it, no matter what. - final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) - - // whether to drop older items (instead of newer) when the queue is full - final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) - - // whether to keep a journal file at all - final val keepJournal = overlay(PersistentQueue.keepJournal) - - // whether to sync the journal after each transaction - final val syncJournal = overlay(PersistentQueue.syncJournal) - - // (optional) move expired items over to this queue - final val expiredQueue = overlay(PersistentQueue.expiredQueue) - - private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log) - - // track tentative remofinal vals - private var xidCounter: Int = 0 - private val openTransactions = new mutable.HashMap[Int, QItem] - def openTransactionCount = openTransactions.size - def openTransactionIds = openTransactions.keys.toList.sortWith(_ - _ > 0) - - def length: Long = synchronized { queueLength } - def totalItems: Long = synchronized { _totalItems } - def bytes: Long = synchronized { queueSize } - def journalSize: Long = synchronized { journal.size } - def totalExpired: Long = synchronized { _totalExpired } - def currentAge: Long = synchronized { if (queueSize == 0) 0 else _currentAge } - def totalDiscarded: Long = synchronized { _totalDiscarded } - def isClosed: Boolean = synchronized { closed || paused } - - // mostly for unit tests. - def memoryLength: Long = synchronized { queue.size } - def memoryBytes: Long = synchronized { _memoryBytes } - def inReadBehind = synchronized { journal.inReadBehind } - - configure(settings) - - def configure(settings: FileBasedMailboxSettings) = synchronized { - maxItems set Some(settings.MaxItems) - maxSize set Some(settings.MaxSize) - maxItemSize set Some(settings.MaxItemSize) - maxAge set Some(settings.MaxAge.toSeconds.toInt) - maxJournalSize set Some(settings.MaxJournalSize) - maxMemorySize set Some(settings.MaxMemorySize) - maxJournalOverflow set Some(settings.MaxJournalOverflow) - maxJournalSizeAbsolute set Some(settings.MaxJournalSizeAbsolute) - discardOldWhenFull set Some(settings.DiscardOldWhenFull) - keepJournal set Some(settings.KeepJournal) - syncJournal set Some(settings.SyncJournal) - log.info("Configuring queue %s: journal=%s, max-items=%s, max-size=%s, max-age=%s, max-journal-size=%s, max-memory-size=%s, max-journal-overflow=%s, max-journal-size-absolute=%s, discard-old-when-full=%s, sync-journal=%s" - .format( - name, keepJournal(), maxItems(), maxSize(), maxAge(), maxJournalSize(), maxMemorySize(), - maxJournalOverflow(), maxJournalSizeAbsolute(), discardOldWhenFull(), syncJournal())) - if (!keepJournal()) journal.erase() - } - - def dumpConfig(): Array[String] = synchronized { - Array( - "max-items=" + maxItems(), - "max-size=" + maxSize(), - "max-age=" + maxAge(), - "max-journal-size=" + maxJournalSize(), - "max-memory-size=" + maxMemorySize(), - "max-journal-overflow=" + maxJournalOverflow(), - "max-journal-size-absolute=" + maxJournalSizeAbsolute(), - "discard-old-when-full=" + discardOldWhenFull(), - "journal=" + keepJournal(), - "sync-journal=" + syncJournal(), - "move-expired-to" + expiredQueue().map { _.name }.getOrElse("(none)")) - } - - def dumpStats(): Array[(String, String)] = synchronized { - Array( - ("items", length.toString), - ("bytes", bytes.toString), - ("total-items", totalItems.toString), - ("logsize", journalSize.toString), - ("expired-items", totalExpired.toString), - ("mem-items", memoryLength.toString), - ("mem-bytes", memoryBytes.toString), - ("age", currentAge.toString), - ("discarded", totalDiscarded.toString), - ("open-transactions", openTransactionCount.toString)) - } - - private final def adjustExpiry(startingTime: Long, expiry: Long): Long = { - if (maxAge() > 0) { - val maxExpiry = startingTime + maxAge() - if (expiry > 0) (expiry min maxExpiry) else maxExpiry - } else { - expiry - } - } - - /** - * Add a value to the end of the queue, transactionally. - */ - def add(value: Array[Byte], expiry: Long): Boolean = synchronized { - if (closed || value.size > maxItemSize()) return false - while (queueLength >= maxItems() || queueSize >= maxSize()) { - if (!discardOldWhenFull()) return false - _remove(false) - _totalDiscarded += 1 - if (keepJournal()) journal.remove() - } - - val now = System.currentTimeMillis - val item = QItem(now, adjustExpiry(now, expiry), value, 0) - if (keepJournal() && !journal.inReadBehind) { - if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) { - // force re-creation of the journal. - log.debug("Rolling journal file for '{}' (qsize={})", name, queueSize) - journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue) - } - if (queueSize >= maxMemorySize()) { - log.debug("Dropping to read-behind for queue '{}' ({} bytes)", name, queueSize) - journal.startReadBehind - } - } - _add(item) - if (keepJournal()) journal.add(item) - true - } - - def add(value: Array[Byte]): Boolean = add(value, 0) - - /** - * Peek at the head item in the queue, if there is one. - */ - def peek(): Option[QItem] = { - synchronized { - if (closed || paused || queueLength == 0) { - None - } else { - _peek() - } - } - } - - /** - * Remove and return an item from the queue, if there is one. - * - * @param transaction true if this should be considered the first part - * of a transaction, to be committed or rolled back (put back at the - * head of the queue) - */ - def remove(transaction: Boolean): Option[QItem] = { - synchronized { - if (closed || paused || queueLength == 0) { - None - } else { - val item = _remove(transaction) - if (keepJournal()) { - if (transaction) journal.removeTentative() else journal.remove() - - if ((queueLength == 0) && (journal.size >= maxJournalSize())) { - log.debug("Rolling journal file for '{}'", name) - journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, Nil) - } - } - item - } - } - } - - /** - * Remove and return an item from the queue, if there is one. - */ - def remove(): Option[QItem] = remove(false) - - /** - * Return a transactionally-removed item to the queue. This is a rolled- - * back transaction. - */ - def unremove(xid: Int) { - synchronized { - if (!closed) { - if (keepJournal()) journal.unremove(xid) - _unremove(xid) - } - } - } - - def confirmRemove(xid: Int) { - synchronized { - if (!closed) { - if (keepJournal()) journal.confirmRemove(xid) - openTransactions.remove(xid) - } - } - } - - def flush() { - while (remove(false).isDefined) {} - } - - /** - * Close the queue's journal file. Not safe to call on an active queue. - */ - def close(): Unit = synchronized { - closed = true - if (keepJournal()) journal.close() - } - - def pauseReads(): Unit = synchronized { - paused = true - } - - def resumeReads(): Unit = synchronized { - paused = false - } - - def setup(): Unit = synchronized { - queueSize = 0 - replayJournal - } - - def destroyJournal(): Unit = synchronized { - if (keepJournal()) journal.erase() - } - - private final def nextXid(): Int = { - do { - xidCounter += 1 - } while (openTransactions contains xidCounter) - xidCounter - } - - private final def fillReadBehind() { - // if we're in read-behind mode, scan forward in the journal to keep memory as full as - // possible. this amortizes the disk overhead across all reads. - while (keepJournal() && journal.inReadBehind && _memoryBytes < maxMemorySize()) { - journal.fillReadBehind { item ⇒ - queue += item - _memoryBytes += item.data.length - } - if (!journal.inReadBehind) { - log.debug("Coming out of read-behind for queue '{}'", name) - } - } - } - - def replayJournal() { - if (!keepJournal()) return - - log.debug("Replaying transaction journal for '{}'", name) - xidCounter = 0 - - journal.replay(name) { - case JournalItem.Add(item) ⇒ - _add(item) - // when processing the journal, this has to happen after: - if (!journal.inReadBehind && queueSize >= maxMemorySize()) { - log.debug("Dropping to read-behind for queue '{}' ({} bytes)", name, queueSize) - journal.startReadBehind - } - case JournalItem.Remove ⇒ _remove(false) - case JournalItem.RemoveTentative ⇒ _remove(true) - case JournalItem.SavedXid(xid) ⇒ xidCounter = xid - case JournalItem.Unremove(xid) ⇒ _unremove(xid) - case JournalItem.ConfirmRemove(xid) ⇒ openTransactions.remove(xid) - case x ⇒ log.warning("Unexpected item in journal: {}", x) - } - - log.debug("Finished transaction journal for '{}' ({} items, {} bytes)", - name, queueLength, journal.size) - journal.open - - // now, any unfinished transactions must be backed out. - for (xid ← openTransactionIds) { - journal.unremove(xid) - _unremove(xid) - } - } - - def toList(): List[QItem] = { - discardExpired - queue.toList - } - - // ----- internal implementations - - private def _add(item: QItem) { - discardExpired - if (!journal.inReadBehind) { - queue += item - _memoryBytes += item.data.length - } - _totalItems += 1 - queueSize += item.data.length - queueLength += 1 - } - - private def _peek(): Option[QItem] = { - discardExpired - if (queue.isEmpty) None else Some(queue.front) - } - - private def _remove(transaction: Boolean): Option[QItem] = { - discardExpired() - if (queue.isEmpty) return None - - val now = System.currentTimeMillis - val item = queue.dequeue - val len = item.data.length - queueSize -= len - _memoryBytes -= len - queueLength -= 1 - val xid = if (transaction) nextXid else 0 - - fillReadBehind - _currentAge = now - item.addTime - if (transaction) { - item.xid = xid - openTransactions(xid) = item - } - Some(item) - } - - final def discardExpired(): Int = { - @tailrec def internalDisard(discarded: Int): Int = { - if (queue.isEmpty || journal.isReplaying) { - discarded - } else { - val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry) - if ((realExpiry != 0) && (realExpiry < System.currentTimeMillis)) { - _totalExpired += 1 - val item = queue.dequeue - val len = item.data.length - queueSize -= len - _memoryBytes -= len - queueLength -= 1 - fillReadBehind - if (keepJournal()) journal.remove() - expiredQueue().map { _.add(item.data, 0) } - internalDisard(discarded + 1) - } else { - discarded - } - } - } - internalDisard(0) - } - - private def _unremove(xid: Int) = { - openTransactions.remove(xid) map { item ⇒ - queueLength += 1 - queueSize += item.data.length - queue prepend item - _memoryBytes += item.data.length - } - } -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object PersistentQueue { - @volatile - var maxItems: Int = Int.MaxValue - @volatile - var maxSize: Long = Long.MaxValue - @volatile - var maxItemSize: Long = Long.MaxValue - @volatile - var maxAge: Int = 0 - @volatile - var maxJournalSize: Long = 16 * 1024 * 1024 - @volatile - var maxMemorySize: Long = 128 * 1024 * 1024 - @volatile - var maxJournalOverflow: Int = 10 - @volatile - var maxJournalSizeAbsolute: Long = Long.MaxValue - @volatile - var discardOldWhenFull: Boolean = false - @volatile - var keepJournal: Boolean = true - @volatile - var syncJournal: Boolean = false - @volatile - var expiredQueue: Option[PersistentQueue] = None -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala deleted file mode 100644 index f1e0a766ea..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.nio.{ ByteBuffer, ByteOrder } - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -final case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) { - def pack(): Array[Byte] = { - val bytes = new Array[Byte](data.length + 16) - val buffer = ByteBuffer.wrap(bytes) - buffer.order(ByteOrder.LITTLE_ENDIAN) - buffer.putLong(addTime) - buffer.putLong(expiry) - buffer.put(data) - bytes - } -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object QItem { - def unpack(data: Array[Byte]): QItem = { - val buffer = ByteBuffer.wrap(data) - val bytes = new Array[Byte](data.length - 16) - buffer.order(ByteOrder.LITTLE_ENDIAN) - val addTime = buffer.getLong - val expiry = buffer.getLong - buffer.get(bytes) - QItem(addTime, expiry, bytes, 0) - } - - def unpackOldAdd(data: Array[Byte]): QItem = { - val buffer = ByteBuffer.wrap(data) - val bytes = new Array[Byte](data.length - 4) - buffer.order(ByteOrder.LITTLE_ENDIAN) - val expiry = buffer.getInt - buffer.get(bytes) - QItem(System.currentTimeMillis, if (expiry == 0) 0 else expiry * 1000, bytes, 0) - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala deleted file mode 100644 index 43112cd08a..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue - -import java.io.File -import java.util.concurrent.CountDownLatch -import scala.collection.mutable -import akka.event.LoggingAdapter -import akka.actor.mailbox.filebased.FileBasedMailboxSettings - -class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable") -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) { - private val path = new File(queueFolder) - - if (!path.isDirectory) { - path.mkdirs() - } - if (!path.isDirectory || !path.canWrite) { - throw new InaccessibleQueuePath - } - - private val queues = new mutable.HashMap[String, PersistentQueue] - private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]] - private var shuttingDown = false - - // total items added since the server started up. - val totalAdded = new Counter() - - // hits/misses on removing items from the queue - val queueHits = new Counter() - val queueMisses = new Counter() - - // preload any queues - def loadQueues() { - path.list() filter { name ⇒ !(name contains "~~") } map { queue(_) } - } - - def queueNames: List[String] = synchronized { - queues.keys.toList - } - - def currentItems = queues.values.foldLeft(0L) { _ + _.length } - def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes } - - /** - * Get a named queue, creating it if necessary. - * Exposed only to unit tests. - */ - private[akka] def queue(name: String): Option[PersistentQueue] = synchronized { - if (shuttingDown) { - None - } else { - Some(queues.get(name) getOrElse { - // only happens when creating a queue for the first time. - val q = if (name contains '+') { - val master = name.split('+')(0) - fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name - log.debug("Fanout queue {} added to {}", name, master) - new PersistentQueue(path.getPath, name, settings, log) - } else { - new PersistentQueue(path.getPath, name, settings, log) - } - q.setup - queues(name) = q - q - }) - } - } - - /** - * Add an item to a named queue. Will not return until the item has been - * synchronously added and written to the queue journal file. - * - * @return true if the item was added; false if the server is shutting - * down - */ - def add(key: String, item: Array[Byte], expiry: Int): Boolean = { - for (fanouts ← fanout_queues.get(key); name ← fanouts) { - add(name, item, expiry) - } - - queue(key) match { - case None ⇒ false - case Some(q) ⇒ - val now = System.currentTimeMillis - val normalizedExpiry: Long = if (expiry == 0) { - 0 - } else if (expiry < 1000000) { - now + expiry - } else { - expiry - } - val result = q.add(item, normalizedExpiry) - if (result) totalAdded.incr() - result - } - } - - def add(key: String, item: Array[Byte]): Boolean = add(key, item, 0) - - /** - * Retrieve an item from a queue and pass it to a continuation. If no item is available within - * the requested time, or the server is shutting down, None is passed. - */ - def remove(key: String, timeout: Int, transaction: Boolean, peek: Boolean)(f: Option[QItem] ⇒ Unit) { - queue(key) match { - case None ⇒ - queueMisses.incr - f(None) - case Some(q) ⇒ - if (peek) { - f(q.peek()) - } else { - q.remove - /* q.removeReact(if (timeout == 0) timeout else System.currentTimeMillis + timeout, transaction) { - case None => - queueMisses.incr - f(None) - case Some(item) => - queueHits.incr - f(Some(item)) - } -*/ } - } - } - - // for testing. - def receive(key: String): Option[Array[Byte]] = { - var rv: Option[Array[Byte]] = None - val latch = new CountDownLatch(1) - remove(key, 0, false, false) { - case None ⇒ - rv = None - latch.countDown - case Some(v) ⇒ - rv = Some(v.data) - latch.countDown - } - latch.await - rv - } - - def unremove(key: String, xid: Int) { - queue(key) map { q ⇒ q.unremove(xid) } - } - - def confirmRemove(key: String, xid: Int) { - queue(key) map { q ⇒ q.confirmRemove(xid) } - } - - def flush(key: String) { - queue(key) map { q ⇒ q.flush() } - } - - def delete(name: String): Unit = synchronized { - if (!shuttingDown) { - queues.get(name) map { q ⇒ - q.close() - q.destroyJournal() - queues.remove(name) - } - if (name contains '+') { - val master = name.split('+')(0) - fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) -= name - log.debug("Fanout queue {} dropped from {}", name, master) - } - } - } - - def flushExpired(name: String): Int = synchronized { - if (shuttingDown) { - 0 - } else { - queue(name) map { q ⇒ q.discardExpired() } getOrElse (0) - } - } - - def flushAllExpired(): Int = synchronized { - queueNames.foldLeft(0) { (sum, qName) ⇒ sum + flushExpired(qName) } - } - - def stats(key: String): Array[(String, String)] = queue(key) match { - case None ⇒ Array[(String, String)]() - case Some(q) ⇒ - q.dumpStats() ++ - fanout_queues.get(key).map { qset ⇒ ("children", qset.mkString(",")) }.toList - } - - def dumpConfig(key: String): Array[String] = { - queue(key) match { - case None ⇒ Array() - case Some(q) ⇒ q.dumpConfig() - } - } - - /** - * Shutdown this queue collection. All actors are asked to exit, and - * any future queue requests will fail. - */ - def shutdown: Unit = synchronized { - if (shuttingDown) { - return - } - shuttingDown = true - for ((name, q) ← queues) { - // synchronous, so the journals are all officially closed before we return. - q.close - } - queues.clear - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala deleted file mode 100644 index 07c506dec2..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue.tools - -import language.reflectiveCalls - -import java.io.{ FileNotFoundException, IOException } -import scala.collection.mutable -import akka.actor.mailbox.filebased.filequeue._ -import akka.event.LoggingAdapter -import akka.actor.ActorSystem - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -class QueueDumper(filename: String, log: LoggingAdapter) { - var offset = 0L - var operations = 0L - var currentXid = 0 - - val queue = new mutable.Queue[Int] { - def unget(item: Int) = prependElem(item) - } - val openTransactions = new mutable.HashMap[Int, Int] - - def apply() { - val journal = new Journal(filename, false, log) - var lastDisplay = 0L - - try { - for ((item, itemsize) ← journal.walk()) { - operations += 1 - dumpItem(item) - offset += itemsize - if (QDumper.quiet && offset - lastDisplay > 1024 * 1024) { - print("\rReading journal: %-6s".format(Util.bytesToHuman(offset, 0))) - Console.flush() - lastDisplay = offset - } - } - print("\r" + (" " * 30)) - - println() - val totalItems = queue.size + openTransactions.size - val totalBytes = queue.foldLeft(0L) { _ + _ } + openTransactions.values.foldLeft(0L) { _ + _ } - println("Journal size: %d bytes, with %d operations.".format(offset, operations)) - println("%d items totalling %d bytes.".format(totalItems, totalBytes)) - } catch { - case e: FileNotFoundException ⇒ - println("Can't open journal file: " + filename) - case e: IOException ⇒ - println("Exception reading journal file: " + filename) - e.printStackTrace() - } - } - - def dumpItem(item: JournalItem) { - val now = System.currentTimeMillis - if (!QDumper.quiet) print("%08x ".format(offset & 0xffffffffL)) - item match { - case JournalItem.Add(qitem) ⇒ - if (!QDumper.quiet) { - print("ADD %-6d".format(qitem.data.size)) - if (qitem.xid > 0) { - print(" xid=%d".format(qitem.xid)) - } - if (qitem.expiry > 0) { - if (qitem.expiry - now < 0) { - print(" expired") - } else { - print(" exp=%d".format(qitem.expiry - now)) - } - } - println() - } - queue += qitem.data.size - case JournalItem.Remove ⇒ - if (!QDumper.quiet) println("REM") - queue.dequeue - case JournalItem.RemoveTentative ⇒ - do { - currentXid += 1 - } while (openTransactions contains currentXid) - openTransactions(currentXid) = queue.dequeue - if (!QDumper.quiet) println("RSV %d".format(currentXid)) - case JournalItem.SavedXid(xid) ⇒ - if (!QDumper.quiet) println("XID %d".format(xid)) - currentXid = xid - case JournalItem.Unremove(xid) ⇒ - queue.unget(openTransactions.remove(xid).get) - if (!QDumper.quiet) println("CAN %d".format(xid)) - case JournalItem.ConfirmRemove(xid) ⇒ - if (!QDumper.quiet) println("ACK %d".format(xid)) - openTransactions.remove(xid) - case x ⇒ - if (!QDumper.quiet) println(x) - } - } -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object QDumper { - val filenames = new mutable.ListBuffer[String] - var quiet = false - - def usage() { - println() - println("usage: qdump.sh ") - println(" describe the contents of a kestrel journal file") - println() - println("options:") - println(" -q quiet: don't describe every line, just the summary") - println() - } - - def parseArgs(args: List[String]): Unit = args match { - case Nil ⇒ - case "--help" :: xs ⇒ - usage() - System.exit(0) - case "-q" :: xs ⇒ - quiet = true - parseArgs(xs) - case x :: xs ⇒ - filenames += x - parseArgs(xs) - } - - def main(args: Array[String]) { - parseArgs(args.toList) - if (filenames.size == 0) { - usage() - System.exit(0) - } - - val system = ActorSystem() - - for (filename ← filenames) { - println("Queue: " + filename) - new QueueDumper(filename, system.log)() - } - - system.shutdown() - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala deleted file mode 100644 index c8e46b49d3..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2009 Twitter, Inc. - * Copyright 2009 Robey Pointer - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.actor.mailbox.filebased.filequeue.tools - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object Util { - val KILOBYTE = 1024L - val MEGABYTE = 1024 * KILOBYTE - val GIGABYTE = 1024 * MEGABYTE - - def bytesToHuman(bytes: Long, minDivisor: Long) = { - if ((bytes == 0) && (minDivisor == 0)) { - "0" - } else { - val divisor = if ((bytes >= GIGABYTE * 95 / 100) || (minDivisor == GIGABYTE)) { - GIGABYTE - } else if ((bytes >= MEGABYTE * 95 / 100) || (minDivisor == MEGABYTE)) { - MEGABYTE - } else { - KILOBYTE - } - - // add 1/2 when computing the dot, to force it to round up. - var dot = ((bytes % divisor) * 20 + divisor) / (2 * divisor) - var base = (bytes - (bytes % divisor)) / divisor - if (dot >= 10) { - base += 1 - dot -= 10 - } - - base.toString + (if (base < 100) ("." + dot) else "") + (divisor match { - case KILOBYTE ⇒ "K" - case MEGABYTE ⇒ "M" - case GIGABYTE ⇒ "G" - case _ ⇒ "" - }) - } - } -} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala deleted file mode 100644 index 7e48ca8676..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala +++ /dev/null @@ -1,48 +0,0 @@ -package akka.actor.mailbox.filebased - -import language.postfixOps - -import akka.actor.mailbox._ -import scala.concurrent.duration._ -import org.apache.commons.io.FileUtils -import akka.dispatch.Mailbox - -object FileBasedMailboxSpec { - val config = """ - File-dispatcher { - mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType - throughput = 1 - file-based.directory-path = "file-based" - file-based.circuit-breaker.max-failures = 5 - file-based.circuit-breaker.call-timeout = 5 seconds - } - """ -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { - - val settings = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")) - - "FileBasedMailboxSettings" must { - "read the file-based section" in { - settings.QueuePath should be("file-based") - settings.CircuitBreakerMaxFailures should be(5) - settings.CircuitBreakerCallTimeout should be(5 seconds) - } - } - - private[akka] def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] - - def clean(): Unit = FileUtils.deleteDirectory(new java.io.File(settings.QueuePath)) - - override def atStartup() { - clean() - super.atStartup() - } - - override def afterTermination() { - clean() - super.afterTermination() - } -} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala deleted file mode 100644 index 8db4eb586d..0000000000 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.actor.mailbox - -import akka.dispatch.{ Envelope, MessageQueue } -import akka.remote.MessageSerializer -import akka.remote.WireFormats.{ ActorRefData, RemoteEnvelope } -import com.typesafe.config.Config -import akka.actor._ - -private[akka] object DurableExecutableMailboxConfig { - val Name = "[\\.\\/\\$\\s]".r -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue { - import DurableExecutableMailboxConfig._ - - def ownerPath: ActorPath = owner.path - val ownerPathString: String = ownerPath.elements.mkString("/") - val name: String = "mailbox_" + Name.replaceAllIn(ownerPathString, "_") - -} - -/** - * Java API - * DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages) - */ -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem) - extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization - -/** - * DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality - * to serialize and deserialize Envelopes (messages) - */ -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -trait DurableMessageSerialization { this: DurableMessageQueue ⇒ - - /** - * Serializes the given Envelope into an Array of Bytes using an efficient serialization/deserialization strategy - */ - def serialize(durableMessage: Envelope): Array[Byte] = { - - // It's alright to use ref.path.toString here - // When the sender is a LocalActorRef it should be local when deserialized also. - // When the sender is a RemoteActorRef the path.toString already contains remote address information. - def serializeActorRef(ref: ActorRef): ActorRefData = ActorRefData.newBuilder.setPath(ref.path.toString).build - - val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef]) - val builder = RemoteEnvelope.newBuilder - .setMessage(message) - .setRecipient(serializeActorRef(owner)) - .setSender(serializeActorRef(durableMessage.sender)) - - builder.build.toByteArray - } - - /** - * Deserializes an array of Bytes that were serialized using the DurableMessageSerialization.serialize method, - * into an Envelope. - */ - def deserialize(bytes: Array[Byte]): Envelope = { - - def deserializeActorRef(refProtocol: ActorRefData): ActorRef = - system.provider.resolveActorRef(refProtocol.getPath) - - val durableMessage = RemoteEnvelope.parseFrom(bytes) - val message = MessageSerializer.deserialize(system, durableMessage.getMessage) - val sender = deserializeActorRef(durableMessage.getSender) - - Envelope(message, sender, system) - } - -} - -/** - * Conventional organization of durable mailbox settings: - * - * {{{ - * akka { - * actor { - * my-durable-dispatcher { - * mailbox-type = "my.durable.mailbox" - * my-durable-mailbox { - * setting1 = 1 - * setting2 = 2 - * } - * } - * } - * } - * }}} - * - * where name=“my-durable-mailbox” in this example. - */ -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -trait DurableMailboxSettings { - /** - * A reference to the enclosing actor system. - */ - def systemSettings: ActorSystem.Settings - - /** - * A reference to the config section which the user specified for this mailbox’s dispatcher. - */ - def userConfig: Config - - /** - * The extracted config section for this mailbox, which is the “name” - * section (if that exists), falling back to system defaults. Typical - * implementation looks like: - * - * {{{ - * val config = initialize - * }}} - */ - def config: Config - - /** - * Name of this mailbox type for purposes of configuration scoping. Reference - * defaults go into “akka.actor.mailbox.”. - */ - def name: String - - /** - * Obtain default extracted mailbox config section from userConfig and system. - */ - def initialize: Config = - if (userConfig.hasPath(name)) - userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name)) - else systemSettings.config.getConfig("akka.actor.mailbox." + name) -} - diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties deleted file mode 100644 index 9825970594..0000000000 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/log4j.properties +++ /dev/null @@ -1,58 +0,0 @@ -# Define some default values that can be overridden by system properties -zookeeper.root.logger=INFO, CONSOLE -zookeeper.console.threshold=INFO -zookeeper.log.dir=. -zookeeper.log.file=zookeeper.log -zookeeper.log.threshold=DEBUG -zookeeper.tracelog.dir=. -zookeeper.tracelog.file=zookeeper_trace.log - -# -# ZooKeeper Logging Configuration -# - -# Format is " (, )+ - -# DEFAULT: console appender only -log4j.rootLogger=${zookeeper.root.logger} - -# Example with rolling log file -#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE - -# Example with rolling log file and tracing -#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE - -# -# Log INFO level and above messages to the console -# -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold} -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - -# -# Add ROLLINGFILE to rootLogger to get log file output -# Log DEBUG level and above messages to a log file -log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold} -log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file} - -# Max log file size of 10MB -log4j.appender.ROLLINGFILE.MaxFileSize=10MB -# uncomment the next line to limit number of backup files -#log4j.appender.ROLLINGFILE.MaxBackupIndex=10 - -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - - -# -# Add TRACEFILE to rootLogger to get log file output -# Log DEBUG level and above messages to a log file -log4j.appender.TRACEFILE=org.apache.log4j.FileAppender -log4j.appender.TRACEFILE.Threshold=TRACE -log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file} - -log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout -### Notice we are including log4j's NDC here (%x) -log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml b/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml deleted file mode 100644 index 240a412687..0000000000 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/resources/logback-test.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - - - - - - - - - [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n - - - - - - - - - - - diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala deleted file mode 100644 index fe1d808b41..0000000000 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.actor.mailbox - -import language.postfixOps - -import java.io.InputStream -import java.util.concurrent.TimeoutException - -import scala.annotation.tailrec - -import org.scalatest.{ WordSpecLike, BeforeAndAfterAll } -import org.scalatest.Matchers - -import com.typesafe.config.{ ConfigFactory, Config } - -import DurableMailboxSpecActorFactory.{ MailboxTestActor, AccumulatorActor } -import akka.actor.{ RepointableRef, Props, ActorSystem, ActorRefWithCell, ActorRef, ActorCell, Actor } -import akka.dispatch.Mailbox -import akka.testkit.TestKit -import scala.concurrent.duration._ - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object DurableMailboxSpecActorFactory { - - class MailboxTestActor extends Actor { - def receive = { case x ⇒ sender ! x } - } - - class AccumulatorActor extends Actor { - var num = 0l - def receive = { - case x: Int ⇒ num += x - case "sum" ⇒ sender ! num - } - } - -} - -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -object DurableMailboxSpec { - def fallbackConfig: Config = ConfigFactory.parseString(""" - akka { - loggers = ["akka.testkit.TestEventListener"] - loglevel = "WARNING" - stdout-loglevel = "WARNING" - } - """) -} - -/** - * Reusable test fixture for durable mailboxes. Implements a few basic tests. More - * tests can be added in concrete subclass. - * - * Subclass must define dispatcher in the supplied config for the specific backend. - * The id of the dispatcher should be the same as the `-dispatcher`. - */ -@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") -abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) - extends TestKit(system) with WordSpecLike with Matchers with BeforeAndAfterAll { - - import DurableMailboxSpecActorFactory._ - - /** - * Subclass must define dispatcher in the supplied config for the specific backend. - * The id of the dispatcher should be the same as the `-dispatcher`. - */ - def this(backendName: String, config: String) = { - this(ActorSystem(backendName + "BasedDurableMailboxSpec", - ConfigFactory.parseString(config).withFallback(DurableMailboxSpec.fallbackConfig)), - backendName) - } - - final override def beforeAll { - atStartup() - } - - /** - * May be implemented in concrete subclass to do additional things once before test - * cases are run. - */ - protected def atStartup() {} - - final override def afterAll { - TestKit.shutdownActorSystem(system) - try system.awaitTermination(5 seconds) catch { - case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) - } - afterTermination() - } - - /** - * May be implemented in concrete subclass to do additional things once after all - * test cases have been run. - */ - def afterTermination() {} - - protected def streamMustContain(in: InputStream, words: String): Unit = { - val output = new Array[Byte](8192) - - def now = System.currentTimeMillis - - def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // don’t want parse errors - - @tailrec def read(end: Int = 0, start: Long = now): Int = - in.read(output, end, output.length - end) match { - case -1 ⇒ end - case x ⇒ - val next = end + x - if (string(next).contains(words) || now - start > 10000 || next == output.length) next - else read(next, start) - } - - val result = string(read()) - if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) - } - - def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = { - val ref = id match { - case null | "" ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher")) - case some ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some) - } - awaitCond(ref match { - case r: RepointableRef ⇒ r.isStarted - }, 1 second, 10 millis) - ref - } - - private def isDurableMailbox(m: Mailbox): Boolean = - m.messageQueue.isInstanceOf[DurableMessageQueue] - - "A " + backendName + " based mailbox backed actor" must { - - "get a new, unique, durable mailbox" in { - val a1, a2 = createMailboxTestActor() - val mb1 = a1.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox - val mb2 = a2.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox - isDurableMailbox(mb1) should be(true) - isDurableMailbox(mb2) should be(true) - (mb1 ne mb2) should be(true) - } - - "deliver messages at most once" in { - val queueActor = createMailboxTestActor() - implicit val sender = testActor - - val msgs = 1 to 100 map { x ⇒ "foo" + x } - - msgs foreach { m ⇒ queueActor ! m } - - msgs foreach { m ⇒ expectMsg(m) } - - expectNoMsg() - } - - "support having multiple actors at the same time" in { - val actors = Vector.fill(3)(createMailboxTestActor(Props[AccumulatorActor])) - - actors foreach { a ⇒ isDurableMailbox(a.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox) should be(true) } - - val msgs = 1 to 3 - - val expectedResult: Long = msgs.sum - - for (a ← actors; m ← msgs) a ! m - - for (a ← actors) { - implicit val sender = testActor - a ! "sum" - expectMsg(expectedResult) - } - - expectNoMsg() - } - } - -} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index f09084616f..5f15f3a177 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -50,7 +50,6 @@ object AkkaBuild extends Build { settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Publish.versionSettings ++ SphinxSupport.settings ++ Dist.settings ++ s3Settings ++ mimaSettings ++ unidocScaladocSettings ++ Protobuf.settings ++ inConfig(JavaDoc)(Defaults.configSettings) ++ Seq( - testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", unidocExclude := Seq(samples.id, remoteTests.id), @@ -76,7 +75,7 @@ object AkkaBuild extends Build { validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, - persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) + persistence, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) ) lazy val akkaScalaNightly = Project( @@ -85,7 +84,7 @@ object AkkaBuild extends Build { // remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings) // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, - persistence, mailboxes, kernel, osgi, contrib, multiNodeTestkit) + persistence, kernel, osgi, contrib, multiNodeTestkit) ) // this detached pseudo-project is used for running the tests against a different Scala version than the one used for compilation @@ -264,36 +263,6 @@ object AkkaBuild extends Build { ) ) - val testMailbox = SettingKey[Boolean]("test-mailbox") - - lazy val mailboxes = Project( - id = "akka-durable-mailboxes", - base = file("akka-durable-mailboxes"), - settings = parentSettings, - aggregate = Seq(mailboxesCommon, fileMailbox) - ) - - lazy val mailboxesCommon = Project( - id = "akka-mailboxes-common", - base = file("akka-durable-mailboxes/akka-mailboxes-common"), - dependencies = Seq(remote, testkit % "compile;test->test"), - settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.mailboxesCommon ++ Seq( - libraryDependencies ++= Dependencies.mailboxes, - previousArtifact := akkaPreviousArtifact("akka-mailboxes-common"), - publishArtifact in Test := true - ) - ) - - lazy val fileMailbox = Project( - id = "akka-file-mailbox", - base = file("akka-durable-mailboxes/akka-file-mailbox"), - dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"), - settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.fileMailbox ++ Seq( - libraryDependencies ++= Dependencies.fileMailbox, - previousArtifact := akkaPreviousArtifact("akka-file-mailbox") - ) - ) - lazy val zeroMQ = Project( id = "akka-zeromq", base = file("akka-zeromq"), @@ -1001,10 +970,6 @@ object AkkaBuild extends Build { val cluster = exports(Seq("akka.cluster.*"), imports = Seq(protobufImport())) - val fileMailbox = exports(Seq("akka.actor.mailbox.filebased.*")) - - val mailboxesCommon = exports(Seq("akka.actor.mailbox.*"), imports = Seq(protobufImport())) - val osgi = exports(Seq("akka.osgi.*")) val osgiDiningHakkersSampleApi = exports(Seq("akka.sample.osgi.api")) @@ -1141,10 +1106,6 @@ object Dependencies { val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo) - val mailboxes = Seq(Test.scalatest, Test.junit) - - val fileMailbox = Seq(Test.commonsIo, Test.scalatest, Test.junit) - val kernel = Seq(Test.scalatest, Test.junit) val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)