From 5fe9dcaf4e126487a8ff5a9f4e416a7b6edf4b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 24 Jun 2012 14:43:46 +0200 Subject: [PATCH] Cleaned up stuff around file-based mailbox --- .../src/main/resources/reference.conf | 38 +++++++++---------- .../akka/actor/mailbox/FileBasedMailbox.scala | 9 ++--- .../mailbox/FileBasedMailboxSettings.scala | 32 ++++++++-------- .../mailbox/filequeue/PersistentQueue.scala | 26 ++++++------- .../akka/actor/mailbox/DurableMailbox.scala | 14 ++++--- 5 files changed, 61 insertions(+), 58 deletions(-) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index f454716af0..1fb5cceeb1 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -13,50 +13,50 @@ akka { file-based { # directory below which this queue resides directory-path = "./_mb" - + # attempting to add an item after the queue reaches this size (in items) will fail. max-items = 2147483647 - + # attempting to add an item after the queue reaches this size (in bytes) will fail. max-size = 2147483647 bytes - + # attempting to add an item larger than this size (in bytes) will fail. max-item-size = 2147483647 bytes - + # maximum expiration time for this queue (seconds). max-age = 0s - + # maximum journal size before the journal should be rotated. max-journal-size = 16 MiB - + # maximum size of a queue before it drops into read-behind mode. max-memory-size = 128 MiB - + # maximum overflow (multiplier) of a journal file before we re-create it. max-journal-overflow = 10 - + # absolute maximum size of a journal file until we rebuild it, no matter what. max-journal-size-absolute = 9223372036854775807 bytes - + # whether to drop older items (instead of newer) when the queue is full - discard-old-when-full = on - + discard-old-when-full = on + # whether to keep a journal file at all - keep-journal = on - + keep-journal = on + # whether to sync the journal after each transaction sync-journal = off # circuit breaker configuration circuit-breaker { - # maximum number of failures before opening breaker - max-failures = 3 + # maximum number of failures before opening breaker + max-failures = 3 - # duration of time beyond which a call is assumed to be timed out and considered a failure - call-timeout = 3 seconds + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds - # duration of time to wait until attempting to reset the breaker during which all calls fail-fast - reset-timeout = 30 seconds + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index fccb6b5aea..1416e8f148 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -21,18 +21,17 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } - class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? - private val log = Logging(system, "FileBasedMessageQueue") - val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + private val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + + private val log = Logging(system, "FileBasedMessageQueue") private val queue = try { (new java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir) - case _ ⇒ //All good + case _ ⇒ // All good } val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log) queue.setup // replays journal diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index dff4021d96..27088dfc92 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val config = initialize import config._ - val QueuePath: String = getString("directory-path") - val MaxItems: Int = getInt("max-items") - val MaxSize: Long = getBytes("max-size") - val MaxItemSize: Long = getBytes("max-item-size") - val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) - val MaxJournalSize: Long = getBytes("max-journal-size") - val MaxMemorySize: Long = getBytes("max-memory-size") - val MaxJournalOverflow: Int = getInt("max-journal-overflow") - val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") - val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") - val KeepJournal: Boolean = getBoolean("keep-journal") - val SyncJournal: Boolean = getBoolean("sync-journal") + final val QueuePath: String = getString("directory-path") + final val MaxItems: Int = getInt("max-items") + final val MaxSize: Long = getBytes("max-size") + final val MaxItemSize: Long = getBytes("max-item-size") + final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) + final val MaxJournalSize: Long = getBytes("max-journal-size") + final val MaxMemorySize: Long = getBytes("max-memory-size") + final val MaxJournalOverflow: Int = getInt("max-journal-overflow") + final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") + final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") + final val KeepJournal: Boolean = getBoolean("keep-journal") + final val SyncJournal: Boolean = getBoolean("sync-journal") - val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") - val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) - val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) -} \ No newline at end of file + final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 1a5ddf4a8c..152b29406c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F def overlay[T](base: ⇒ T) = new OverlaySetting(base) // attempting to add an item after the queue reaches this size (in items) will fail. - val maxItems = overlay(PersistentQueue.maxItems) + final val maxItems = overlay(PersistentQueue.maxItems) // attempting to add an item after the queue reaches this size (in bytes) will fail. - val maxSize = overlay(PersistentQueue.maxSize) + final val maxSize = overlay(PersistentQueue.maxSize) // attempting to add an item larger than this size (in bytes) will fail. - val maxItemSize = overlay(PersistentQueue.maxItemSize) + final val maxItemSize = overlay(PersistentQueue.maxItemSize) // maximum expiration time for this queue (seconds). - val maxAge = overlay(PersistentQueue.maxAge) + final val maxAge = overlay(PersistentQueue.maxAge) // maximum journal size before the journal should be rotated. - val maxJournalSize = overlay(PersistentQueue.maxJournalSize) + final val maxJournalSize = overlay(PersistentQueue.maxJournalSize) // maximum size of a queue before it drops into read-behind mode. - val maxMemorySize = overlay(PersistentQueue.maxMemorySize) + final val maxMemorySize = overlay(PersistentQueue.maxMemorySize) // maximum overflow (multiplier) of a journal file before we re-create it. - val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) + final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) // absolute maximum size of a journal file until we rebuild it, no matter what. - val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) + final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) // whether to drop older items (instead of newer) when the queue is full - val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) + final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) // whether to keep a journal file at all - val keepJournal = overlay(PersistentQueue.keepJournal) + final val keepJournal = overlay(PersistentQueue.keepJournal) // whether to sync the journal after each transaction - val syncJournal = overlay(PersistentQueue.syncJournal) + final val syncJournal = overlay(PersistentQueue.syncJournal) // (optional) move expired items over to this queue - val expiredQueue = overlay(PersistentQueue.expiredQueue) + final val expiredQueue = overlay(PersistentQueue.expiredQueue) private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log) - // track tentative removals + // track tentative remofinal vals private var xidCounter: Int = 0 private val openTransactions = new mutable.HashMap[Int, QItem] def openTransactionCount = openTransactions.size diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index b21878d00e..ff985b44cc 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -69,11 +69,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ * Conventional organization of durable mailbox settings: * * {{{ - * my-durable-dispatcher { - * mailbox-type = "my.durable.mailbox" - * my-durable-mailbox { - * setting1 = 1 - * setting2 = 2 + * akka { + * actor { + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } * } * } * }}}