Cleaned up stuff around file-based mailbox
This commit is contained in:
parent
9c4c856be3
commit
5fe9dcaf4e
5 changed files with 61 additions and 58 deletions
|
|
@ -13,50 +13,50 @@ akka {
|
|||
file-based {
|
||||
# directory below which this queue resides
|
||||
directory-path = "./_mb"
|
||||
|
||||
|
||||
# attempting to add an item after the queue reaches this size (in items) will fail.
|
||||
max-items = 2147483647
|
||||
|
||||
|
||||
# attempting to add an item after the queue reaches this size (in bytes) will fail.
|
||||
max-size = 2147483647 bytes
|
||||
|
||||
|
||||
# attempting to add an item larger than this size (in bytes) will fail.
|
||||
max-item-size = 2147483647 bytes
|
||||
|
||||
|
||||
# maximum expiration time for this queue (seconds).
|
||||
max-age = 0s
|
||||
|
||||
|
||||
# maximum journal size before the journal should be rotated.
|
||||
max-journal-size = 16 MiB
|
||||
|
||||
|
||||
# maximum size of a queue before it drops into read-behind mode.
|
||||
max-memory-size = 128 MiB
|
||||
|
||||
|
||||
# maximum overflow (multiplier) of a journal file before we re-create it.
|
||||
max-journal-overflow = 10
|
||||
|
||||
|
||||
# absolute maximum size of a journal file until we rebuild it, no matter what.
|
||||
max-journal-size-absolute = 9223372036854775807 bytes
|
||||
|
||||
|
||||
# whether to drop older items (instead of newer) when the queue is full
|
||||
discard-old-when-full = on
|
||||
|
||||
discard-old-when-full = on
|
||||
|
||||
# whether to keep a journal file at all
|
||||
keep-journal = on
|
||||
|
||||
keep-journal = on
|
||||
|
||||
# whether to sync the journal after each transaction
|
||||
sync-journal = off
|
||||
|
||||
# circuit breaker configuration
|
||||
circuit-breaker {
|
||||
# maximum number of failures before opening breaker
|
||||
max-failures = 3
|
||||
# maximum number of failures before opening breaker
|
||||
max-failures = 3
|
||||
|
||||
# duration of time beyond which a call is assumed to be timed out and considered a failure
|
||||
call-timeout = 3 seconds
|
||||
# duration of time beyond which a call is assumed to be timed out and considered a failure
|
||||
call-timeout = 3 seconds
|
||||
|
||||
# duration of time to wait until attempting to reset the breaker during which all calls fail-fast
|
||||
reset-timeout = 30 seconds
|
||||
# duration of time to wait until attempting to reset the breaker during which all calls fail-fast
|
||||
reset-timeout = 30 seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,18 +21,17 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config)
|
|||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||
}
|
||||
}
|
||||
|
||||
class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
||||
// TODO Is it reasonable for all FileBasedMailboxes to have their own logger?
|
||||
private val log = Logging(system, "FileBasedMessageQueue")
|
||||
|
||||
val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
|
||||
private val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
|
||||
|
||||
private val log = Logging(system, "FileBasedMessageQueue")
|
||||
|
||||
private val queue = try {
|
||||
(new java.io.File(settings.QueuePath)) match {
|
||||
case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir)
|
||||
case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir)
|
||||
case _ ⇒ //All good
|
||||
case _ ⇒ // All good
|
||||
}
|
||||
val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log)
|
||||
queue.setup // replays journal
|
||||
|
|
|
|||
|
|
@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use
|
|||
val config = initialize
|
||||
import config._
|
||||
|
||||
val QueuePath: String = getString("directory-path")
|
||||
val MaxItems: Int = getInt("max-items")
|
||||
val MaxSize: Long = getBytes("max-size")
|
||||
val MaxItemSize: Long = getBytes("max-item-size")
|
||||
val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
|
||||
val MaxJournalSize: Long = getBytes("max-journal-size")
|
||||
val MaxMemorySize: Long = getBytes("max-memory-size")
|
||||
val MaxJournalOverflow: Int = getInt("max-journal-overflow")
|
||||
val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
|
||||
val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
|
||||
val KeepJournal: Boolean = getBoolean("keep-journal")
|
||||
val SyncJournal: Boolean = getBoolean("sync-journal")
|
||||
final val QueuePath: String = getString("directory-path")
|
||||
final val MaxItems: Int = getInt("max-items")
|
||||
final val MaxSize: Long = getBytes("max-size")
|
||||
final val MaxItemSize: Long = getBytes("max-item-size")
|
||||
final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
|
||||
final val MaxJournalSize: Long = getBytes("max-journal-size")
|
||||
final val MaxMemorySize: Long = getBytes("max-memory-size")
|
||||
final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
|
||||
final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
|
||||
final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
|
||||
final val KeepJournal: Boolean = getBoolean("keep-journal")
|
||||
final val SyncJournal: Boolean = getBoolean("sync-journal")
|
||||
|
||||
val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
|
||||
val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
|
||||
val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
|
||||
}
|
||||
final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
|
||||
final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
|
||||
final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
|
|||
def overlay[T](base: ⇒ T) = new OverlaySetting(base)
|
||||
|
||||
// attempting to add an item after the queue reaches this size (in items) will fail.
|
||||
val maxItems = overlay(PersistentQueue.maxItems)
|
||||
final val maxItems = overlay(PersistentQueue.maxItems)
|
||||
|
||||
// attempting to add an item after the queue reaches this size (in bytes) will fail.
|
||||
val maxSize = overlay(PersistentQueue.maxSize)
|
||||
final val maxSize = overlay(PersistentQueue.maxSize)
|
||||
|
||||
// attempting to add an item larger than this size (in bytes) will fail.
|
||||
val maxItemSize = overlay(PersistentQueue.maxItemSize)
|
||||
final val maxItemSize = overlay(PersistentQueue.maxItemSize)
|
||||
|
||||
// maximum expiration time for this queue (seconds).
|
||||
val maxAge = overlay(PersistentQueue.maxAge)
|
||||
final val maxAge = overlay(PersistentQueue.maxAge)
|
||||
|
||||
// maximum journal size before the journal should be rotated.
|
||||
val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
|
||||
final val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
|
||||
|
||||
// maximum size of a queue before it drops into read-behind mode.
|
||||
val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
|
||||
final val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
|
||||
|
||||
// maximum overflow (multiplier) of a journal file before we re-create it.
|
||||
val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
|
||||
final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
|
||||
|
||||
// absolute maximum size of a journal file until we rebuild it, no matter what.
|
||||
val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
|
||||
final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
|
||||
|
||||
// whether to drop older items (instead of newer) when the queue is full
|
||||
val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
|
||||
final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
|
||||
|
||||
// whether to keep a journal file at all
|
||||
val keepJournal = overlay(PersistentQueue.keepJournal)
|
||||
final val keepJournal = overlay(PersistentQueue.keepJournal)
|
||||
|
||||
// whether to sync the journal after each transaction
|
||||
val syncJournal = overlay(PersistentQueue.syncJournal)
|
||||
final val syncJournal = overlay(PersistentQueue.syncJournal)
|
||||
|
||||
// (optional) move expired items over to this queue
|
||||
val expiredQueue = overlay(PersistentQueue.expiredQueue)
|
||||
final val expiredQueue = overlay(PersistentQueue.expiredQueue)
|
||||
|
||||
private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log)
|
||||
|
||||
// track tentative removals
|
||||
// track tentative remofinal vals
|
||||
private var xidCounter: Int = 0
|
||||
private val openTransactions = new mutable.HashMap[Int, QItem]
|
||||
def openTransactionCount = openTransactions.size
|
||||
|
|
|
|||
|
|
@ -69,11 +69,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
|||
* Conventional organization of durable mailbox settings:
|
||||
*
|
||||
* {{{
|
||||
* my-durable-dispatcher {
|
||||
* mailbox-type = "my.durable.mailbox"
|
||||
* my-durable-mailbox {
|
||||
* setting1 = 1
|
||||
* setting2 = 2
|
||||
* akka {
|
||||
* actor {
|
||||
* my-durable-dispatcher {
|
||||
* mailbox-type = "my.durable.mailbox"
|
||||
* my-durable-mailbox {
|
||||
* setting1 = 1
|
||||
* setting2 = 2
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue