!tra #3920 Remove deprecated durable mailboxes
This commit is contained in:
parent
9cc586b674
commit
4b977361eb
18 changed files with 3 additions and 2053 deletions
|
|
@ -26,4 +26,4 @@ The following, previously deprecated, features have been removed:
|
|||
|
||||
* akka-dataflow
|
||||
* akka-transactor
|
||||
|
||||
* durable mailboxes (akka-mailboxes-common, akka-file-mailbox)
|
||||
|
|
|
|||
|
|
@ -1,70 +0,0 @@
|
|||
#############################################
|
||||
# Akka File Mailboxes Reference Config File #
|
||||
#############################################
|
||||
|
||||
# This is the reference config file that contains all the default settings.
|
||||
# Make your edits/overrides in your application.conf.
|
||||
#
|
||||
# For more information see <https://github.com/robey/kestrel/>
|
||||
|
||||
akka {
|
||||
actor {
|
||||
mailbox {
|
||||
# deprecated, superseded by akka-persistence
|
||||
file-based {
|
||||
# directory below which this queue resides
|
||||
directory-path = "./_mb"
|
||||
|
||||
# attempting to add an item after the queue reaches this size (in items)
|
||||
# will fail.
|
||||
max-items = 2147483647
|
||||
|
||||
# attempting to add an item after the queue reaches this size (in bytes)
|
||||
# will fail.
|
||||
max-size = 2147483647 bytes
|
||||
|
||||
# attempting to add an item larger than this size (in bytes) will fail.
|
||||
max-item-size = 2147483647 bytes
|
||||
|
||||
# maximum expiration time for this queue (seconds).
|
||||
max-age = 0s
|
||||
|
||||
# maximum journal size before the journal should be rotated.
|
||||
max-journal-size = 16 MiB
|
||||
|
||||
# maximum size of a queue before it drops into read-behind mode.
|
||||
max-memory-size = 128 MiB
|
||||
|
||||
# maximum overflow (multiplier) of a journal file before we re-create it.
|
||||
max-journal-overflow = 10
|
||||
|
||||
# absolute maximum size of a journal file until we rebuild it,
|
||||
# no matter what.
|
||||
max-journal-size-absolute = 9223372036854775807 bytes
|
||||
|
||||
# whether to drop older items (instead of newer) when the queue is full
|
||||
discard-old-when-full = on
|
||||
|
||||
# whether to keep a journal file at all
|
||||
keep-journal = on
|
||||
|
||||
# whether to sync the journal after each transaction
|
||||
sync-journal = off
|
||||
|
||||
# circuit breaker configuration
|
||||
circuit-breaker {
|
||||
# maximum number of failures before opening breaker
|
||||
max-failures = 3
|
||||
|
||||
# duration of time beyond which a call is assumed to be timed out and
|
||||
# considered a failure
|
||||
call-timeout = 3 seconds
|
||||
|
||||
# duration of time to wait until attempting to reset the breaker during
|
||||
# which all calls fail-fast
|
||||
reset-timeout = 30 seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,86 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased
|
||||
|
||||
import akka.actor.mailbox._
|
||||
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem }
|
||||
import akka.event.Logging
|
||||
import com.typesafe.config.Config
|
||||
import akka.ConfigurationException
|
||||
import akka.dispatch._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker }
|
||||
import scala.concurrent.duration.Duration
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||
private val settings = new FileBasedMailboxSettings(systemSettings, config)
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = (owner zip system).headOption match {
|
||||
case Some((o, s: ExtendedActorSystem)) ⇒ new FileBasedMessageQueue(o, s, settings)
|
||||
case _ ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
|
||||
// TODO Is it reasonable for all FileBasedMailboxes to have their own logger?
|
||||
private val log = Logging(system, "FileBasedMessageQueue")
|
||||
|
||||
val breaker = CircuitBreaker(system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
|
||||
|
||||
private val queue = try {
|
||||
(new java.io.File(settings.QueuePath)) match {
|
||||
case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir)
|
||||
case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir)
|
||||
case _ ⇒ // All good
|
||||
}
|
||||
val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log)
|
||||
queue.setup // replays journal
|
||||
queue.discardExpired
|
||||
queue
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
log.error(e, "Could not create a file-based mailbox")
|
||||
throw e
|
||||
}
|
||||
|
||||
def enqueue(receiver: ActorRef, envelope: Envelope) {
|
||||
breaker.withSyncCircuitBreaker(queue.add(serialize(envelope)))
|
||||
}
|
||||
|
||||
def dequeue(): Envelope = {
|
||||
breaker.withSyncCircuitBreaker(
|
||||
try {
|
||||
queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull
|
||||
} catch {
|
||||
case _: java.util.NoSuchElementException ⇒ null
|
||||
case e: CircuitBreakerOpenException ⇒
|
||||
log.debug(e.getMessage())
|
||||
throw e
|
||||
case NonFatal(e) ⇒
|
||||
log.error(e, "Couldn't dequeue from file-based mailbox, due to [{}]", e.getMessage())
|
||||
throw e
|
||||
})
|
||||
}
|
||||
|
||||
def numberOfMessages: Int = {
|
||||
breaker.withSyncCircuitBreaker(queue.length.toInt)
|
||||
}
|
||||
|
||||
def hasMessages: Boolean = numberOfMessages > 0
|
||||
|
||||
/**
|
||||
* Completely delete the queue.
|
||||
*/
|
||||
def remove: Boolean = try {
|
||||
queue.remove
|
||||
true
|
||||
} catch {
|
||||
case NonFatal(_) ⇒ false
|
||||
}
|
||||
|
||||
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
|
||||
}
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.mailbox.filebased
|
||||
|
||||
import akka.actor.mailbox._
|
||||
import com.typesafe.config.Config
|
||||
import scala.concurrent.duration._
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||
extends DurableMailboxSettings {
|
||||
|
||||
def name: String = "file-based"
|
||||
|
||||
val config = initialize
|
||||
import config._
|
||||
|
||||
final val QueuePath: String = getString("directory-path")
|
||||
final val MaxItems: Int = getInt("max-items")
|
||||
final val MaxSize: Long = getBytes("max-size")
|
||||
final val MaxItemSize: Long = getBytes("max-item-size")
|
||||
final val MaxAge: FiniteDuration = Duration(getMilliseconds("max-age"), MILLISECONDS)
|
||||
final val MaxJournalSize: Long = getBytes("max-journal-size")
|
||||
final val MaxMemorySize: Long = getBytes("max-memory-size")
|
||||
final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
|
||||
final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
|
||||
final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
|
||||
final val KeepJournal: Boolean = getBoolean("keep-journal")
|
||||
final val SyncJournal: Boolean = getBoolean("sync-journal")
|
||||
|
||||
final val CircuitBreakerMaxFailures: Int = getInt("circuit-breaker.max-failures")
|
||||
final val CircuitBreakerCallTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
|
||||
final val CircuitBreakerResetTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
|
||||
}
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.io.IOException
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
final case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class Counter {
|
||||
private val value = new AtomicLong(0)
|
||||
|
||||
def apply() = value.get
|
||||
def set(n: Long) = value.set(n)
|
||||
def incr() = value.addAndGet(1)
|
||||
def incr(n: Long) = value.addAndGet(n)
|
||||
def decr() = value.addAndGet(-1)
|
||||
def decr(n: Long) = value.addAndGet(-n)
|
||||
override def toString = value.get.toString
|
||||
}
|
||||
|
|
@ -1,345 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.io._
|
||||
import java.nio.{ ByteBuffer, ByteOrder }
|
||||
import java.nio.channels.FileChannel
|
||||
import akka.event.LoggingAdapter
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
// returned from journal replay
|
||||
sealed trait JournalItem
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object JournalItem {
|
||||
final case class Add(item: QItem) extends JournalItem
|
||||
case object Remove extends JournalItem
|
||||
case object RemoveTentative extends JournalItem
|
||||
final case class SavedXid(xid: Int) extends JournalItem
|
||||
final case class Unremove(xid: Int) extends JournalItem
|
||||
final case class ConfirmRemove(xid: Int) extends JournalItem
|
||||
case object EndOfFile extends JournalItem
|
||||
}
|
||||
|
||||
/**
|
||||
* Codes for working with the journal file for a PersistentQueue.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) {
|
||||
|
||||
private val queueFile = new File(queuePath)
|
||||
|
||||
private var writer: FileChannel = null
|
||||
private var reader: Option[FileChannel] = None
|
||||
private var replayer: Option[FileChannel] = None
|
||||
|
||||
var size: Long = 0
|
||||
|
||||
// small temporary buffer for formatting operations into the journal:
|
||||
private val buffer = new Array[Byte](16)
|
||||
private val byteBuffer = ByteBuffer.wrap(buffer)
|
||||
byteBuffer.order(ByteOrder.LITTLE_ENDIAN)
|
||||
|
||||
private val CMD_ADD = 0
|
||||
private val CMD_REMOVE = 1
|
||||
private val CMD_ADDX = 2
|
||||
private val CMD_REMOVE_TENTATIVE = 3
|
||||
private val CMD_SAVE_XID = 4
|
||||
private val CMD_UNREMOVE = 5
|
||||
private val CMD_CONFIRM_REMOVE = 6
|
||||
private val CMD_ADD_XID = 7
|
||||
|
||||
private def open(file: File) {
|
||||
writer = new FileOutputStream(file, true).getChannel
|
||||
}
|
||||
|
||||
def open() {
|
||||
open(queueFile)
|
||||
}
|
||||
|
||||
def roll(xid: Int, openItems: List[QItem], queue: Iterable[QItem]) {
|
||||
writer.close
|
||||
val tmpFile = new File(queuePath + "~~" + System.currentTimeMillis)
|
||||
open(tmpFile)
|
||||
size = 0
|
||||
for (item ← openItems) {
|
||||
addWithXid(item)
|
||||
removeTentative(false)
|
||||
}
|
||||
saveXid(xid)
|
||||
for (item ← queue) {
|
||||
add(false, item)
|
||||
}
|
||||
if (syncJournal) writer.force(false)
|
||||
writer.close
|
||||
tmpFile.renameTo(queueFile)
|
||||
open
|
||||
}
|
||||
|
||||
def close() {
|
||||
writer.close
|
||||
for (r ← reader) r.close
|
||||
reader = None
|
||||
}
|
||||
|
||||
def erase() {
|
||||
try {
|
||||
close()
|
||||
queueFile.delete
|
||||
} catch {
|
||||
case NonFatal(_) ⇒
|
||||
}
|
||||
}
|
||||
|
||||
def inReadBehind(): Boolean = reader.isDefined
|
||||
|
||||
def isReplaying(): Boolean = replayer.isDefined
|
||||
|
||||
private def add(allowSync: Boolean, item: QItem) {
|
||||
val blob = ByteBuffer.wrap(item.pack())
|
||||
size += write(false, CMD_ADDX.toByte, blob.limit)
|
||||
do {
|
||||
writer.write(blob)
|
||||
} while (blob.position < blob.limit)
|
||||
if (allowSync && syncJournal) writer.force(false)
|
||||
size += blob.limit
|
||||
}
|
||||
|
||||
def add(item: QItem): Unit = add(true, item)
|
||||
|
||||
// used only to list pending transactions when recreating the journal.
|
||||
private def addWithXid(item: QItem) = {
|
||||
val blob = ByteBuffer.wrap(item.pack())
|
||||
|
||||
// only called from roll(), so the journal does not need to be synced after a write.
|
||||
size += write(false, CMD_ADD_XID.toByte, item.xid, blob.limit)
|
||||
do {
|
||||
writer.write(blob)
|
||||
} while (blob.position < blob.limit)
|
||||
size += blob.limit
|
||||
}
|
||||
|
||||
def remove() = {
|
||||
size += write(true, CMD_REMOVE.toByte)
|
||||
}
|
||||
|
||||
private def removeTentative(allowSync: Boolean) {
|
||||
size += write(allowSync, CMD_REMOVE_TENTATIVE.toByte)
|
||||
}
|
||||
|
||||
def removeTentative(): Unit = removeTentative(true)
|
||||
|
||||
private def saveXid(xid: Int) = {
|
||||
// only called from roll(), so the journal does not need to be synced after a write.
|
||||
size += write(false, CMD_SAVE_XID.toByte, xid)
|
||||
}
|
||||
|
||||
def unremove(xid: Int) = {
|
||||
size += write(true, CMD_UNREMOVE.toByte, xid)
|
||||
}
|
||||
|
||||
def confirmRemove(xid: Int) = {
|
||||
size += write(true, CMD_CONFIRM_REMOVE.toByte, xid)
|
||||
}
|
||||
|
||||
def startReadBehind() {
|
||||
val pos = if (replayer.isDefined) replayer.get.position else writer.position
|
||||
val rj = new FileInputStream(queueFile).getChannel
|
||||
rj.position(pos)
|
||||
reader = Some(rj)
|
||||
}
|
||||
|
||||
def fillReadBehind(f: QItem ⇒ Unit) {
|
||||
val pos = if (replayer.isDefined) replayer.get.position else writer.position
|
||||
for (rj ← reader) {
|
||||
if (rj.position == pos) {
|
||||
// we've caught up.
|
||||
rj.close
|
||||
reader = None
|
||||
} else {
|
||||
readJournalEntry(rj) match {
|
||||
case (JournalItem.Add(item), _) ⇒ f(item)
|
||||
case (_, _) ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def replay(name: String)(f: JournalItem ⇒ Unit) {
|
||||
size = 0
|
||||
var lastUpdate = 0L
|
||||
val TEN_MB = 10L * 1024 * 1024
|
||||
try {
|
||||
val in = new FileInputStream(queueFile).getChannel
|
||||
try {
|
||||
replayer = Some(in)
|
||||
var done = false
|
||||
do {
|
||||
readJournalEntry(in) match {
|
||||
case (JournalItem.EndOfFile, _) ⇒ done = true
|
||||
case (x, itemsize) ⇒
|
||||
size += itemsize
|
||||
f(x)
|
||||
if (size / TEN_MB > lastUpdate) {
|
||||
lastUpdate = size / TEN_MB
|
||||
log.info("Continuing to read '{}' journal; {} MB so far...", name, lastUpdate * 10)
|
||||
}
|
||||
}
|
||||
} while (!done)
|
||||
} catch {
|
||||
case e: BrokenItemException ⇒
|
||||
log.error(e, "Exception replaying journal for '{}'", name)
|
||||
truncateJournal(e.lastValidPosition)
|
||||
}
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒
|
||||
log.info("No transaction journal for '{}'; starting with empty queue.", name)
|
||||
case e: IOException ⇒
|
||||
log.error(e, "Exception replaying journal for '{}'", name)
|
||||
// this can happen if the server hardware died abruptly in the middle
|
||||
// of writing a journal. not awesome but we should recover.
|
||||
}
|
||||
replayer = None
|
||||
}
|
||||
|
||||
private def truncateJournal(position: Long) {
|
||||
val trancateWriter = new FileOutputStream(queueFile, true).getChannel
|
||||
try {
|
||||
trancateWriter.truncate(position)
|
||||
} finally {
|
||||
trancateWriter.close()
|
||||
}
|
||||
}
|
||||
|
||||
def readJournalEntry(in: FileChannel): (JournalItem, Int) = {
|
||||
byteBuffer.rewind
|
||||
byteBuffer.limit(1)
|
||||
val lastPosition = in.position
|
||||
var x: Int = 0
|
||||
do {
|
||||
x = in.read(byteBuffer)
|
||||
} while (byteBuffer.position < byteBuffer.limit && x >= 0)
|
||||
|
||||
if (x < 0) {
|
||||
(JournalItem.EndOfFile, 0)
|
||||
} else {
|
||||
try {
|
||||
buffer(0) match {
|
||||
case CMD_ADD ⇒
|
||||
val data = readBlock(in)
|
||||
(JournalItem.Add(QItem.unpackOldAdd(data)), 5 + data.length)
|
||||
case CMD_REMOVE ⇒
|
||||
(JournalItem.Remove, 1)
|
||||
case CMD_ADDX ⇒
|
||||
val data = readBlock(in)
|
||||
(JournalItem.Add(QItem.unpack(data)), 5 + data.length)
|
||||
case CMD_REMOVE_TENTATIVE ⇒
|
||||
(JournalItem.RemoveTentative, 1)
|
||||
case CMD_SAVE_XID ⇒
|
||||
val xid = readInt(in)
|
||||
(JournalItem.SavedXid(xid), 5)
|
||||
case CMD_UNREMOVE ⇒
|
||||
val xid = readInt(in)
|
||||
(JournalItem.Unremove(xid), 5)
|
||||
case CMD_CONFIRM_REMOVE ⇒
|
||||
val xid = readInt(in)
|
||||
(JournalItem.ConfirmRemove(xid), 5)
|
||||
case CMD_ADD_XID ⇒
|
||||
val xid = readInt(in)
|
||||
val data = readBlock(in)
|
||||
val item = QItem.unpack(data)
|
||||
item.xid = xid
|
||||
(JournalItem.Add(item), 9 + data.length)
|
||||
case n ⇒
|
||||
throw new BrokenItemException(lastPosition, new IOException("invalid opcode in journal: " + n.toInt + " at position " + in.position))
|
||||
}
|
||||
} catch {
|
||||
case ex: IOException ⇒
|
||||
throw new BrokenItemException(lastPosition, ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def walk() = new Iterator[(JournalItem, Int)] {
|
||||
val in = new FileInputStream(queuePath).getChannel
|
||||
var done = false
|
||||
var nextItem: Option[(JournalItem, Int)] = None
|
||||
|
||||
def hasNext = {
|
||||
if (done) {
|
||||
false
|
||||
} else {
|
||||
nextItem = readJournalEntry(in) match {
|
||||
case (JournalItem.EndOfFile, _) ⇒
|
||||
done = true
|
||||
in.close()
|
||||
None
|
||||
case x ⇒
|
||||
Some(x)
|
||||
}
|
||||
nextItem.isDefined
|
||||
}
|
||||
}
|
||||
|
||||
def next() = nextItem.get
|
||||
}
|
||||
|
||||
private def readBlock(in: FileChannel): Array[Byte] = {
|
||||
val size = readInt(in)
|
||||
val data = new Array[Byte](size)
|
||||
val dataBuffer = ByteBuffer.wrap(data)
|
||||
var x: Int = 0
|
||||
do {
|
||||
x = in.read(dataBuffer)
|
||||
} while (dataBuffer.position < dataBuffer.limit && x >= 0)
|
||||
if (x < 0) {
|
||||
// we never expect EOF when reading a block.
|
||||
throw new IOException("Unexpected EOF")
|
||||
}
|
||||
data
|
||||
}
|
||||
|
||||
private def readInt(in: FileChannel): Int = {
|
||||
byteBuffer.rewind
|
||||
byteBuffer.limit(4)
|
||||
var x: Int = 0
|
||||
do {
|
||||
x = in.read(byteBuffer)
|
||||
} while (byteBuffer.position < byteBuffer.limit && x >= 0)
|
||||
if (x < 0) {
|
||||
// we never expect EOF when reading an int.
|
||||
throw new IOException("Unexpected EOF")
|
||||
}
|
||||
byteBuffer.rewind
|
||||
byteBuffer.getInt()
|
||||
}
|
||||
|
||||
private def write(allowSync: Boolean, items: Any*): Int = {
|
||||
byteBuffer.clear
|
||||
for (item ← items) item match {
|
||||
case b: Byte ⇒ byteBuffer.put(b)
|
||||
case i: Int ⇒ byteBuffer.putInt(i)
|
||||
}
|
||||
byteBuffer.flip
|
||||
while (byteBuffer.position < byteBuffer.limit) {
|
||||
writer.write(byteBuffer)
|
||||
}
|
||||
if (allowSync && syncJournal) writer.force(false)
|
||||
byteBuffer.limit
|
||||
}
|
||||
}
|
||||
|
|
@ -1,479 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.io._
|
||||
import scala.collection.mutable
|
||||
import akka.event.LoggingAdapter
|
||||
import java.util.concurrent.TimeUnit
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.mailbox.filebased.FileBasedMailboxSettings
|
||||
|
||||
// a config value that's backed by a global setting but may be locally overridden
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class OverlaySetting[T](base: ⇒ T) {
|
||||
@volatile
|
||||
private var local: Option[T] = None
|
||||
|
||||
def set(value: Option[T]) = local = value
|
||||
|
||||
def apply() = local.getOrElse(base)
|
||||
}
|
||||
|
||||
trait Prependable[T] {
|
||||
def prepend(t: T): Unit
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) {
|
||||
|
||||
private case object ItemArrived
|
||||
|
||||
// current size of all data in the queue:
|
||||
private var queueSize: Long = 0
|
||||
|
||||
// # of items EVER added to the queue:
|
||||
private var _totalItems: Long = 0
|
||||
|
||||
// # of items that were expired by the time they were read:
|
||||
private var _totalExpired: Long = 0
|
||||
|
||||
// age (in milliseconds) of the last item read from the queue:
|
||||
private var _currentAge: Long = 0
|
||||
|
||||
// # of items thot were discarded because the queue was full:
|
||||
private var _totalDiscarded: Long = 0
|
||||
|
||||
// # of items in the queue (including those not in memory)
|
||||
private var queueLength: Long = 0
|
||||
|
||||
private var queue: mutable.Queue[QItem] with Prependable[QItem] = new mutable.Queue[QItem] with Prependable[QItem] {
|
||||
// scala's Queue doesn't (yet?) have a way to put back.
|
||||
def prepend(item: QItem) = prependElem(item)
|
||||
}
|
||||
private var _memoryBytes: Long = 0
|
||||
|
||||
private var closed = false
|
||||
private var paused = false
|
||||
|
||||
def overlay[T](base: ⇒ T) = new OverlaySetting(base)
|
||||
|
||||
// attempting to add an item after the queue reaches this size (in items) will fail.
|
||||
final val maxItems = overlay(PersistentQueue.maxItems)
|
||||
|
||||
// attempting to add an item after the queue reaches this size (in bytes) will fail.
|
||||
final val maxSize = overlay(PersistentQueue.maxSize)
|
||||
|
||||
// attempting to add an item larger than this size (in bytes) will fail.
|
||||
final val maxItemSize = overlay(PersistentQueue.maxItemSize)
|
||||
|
||||
// maximum expiration time for this queue (seconds).
|
||||
final val maxAge = overlay(PersistentQueue.maxAge)
|
||||
|
||||
// maximum journal size before the journal should be rotated.
|
||||
final val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
|
||||
|
||||
// maximum size of a queue before it drops into read-behind mode.
|
||||
final val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
|
||||
|
||||
// maximum overflow (multiplier) of a journal file before we re-create it.
|
||||
final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
|
||||
|
||||
// absolute maximum size of a journal file until we rebuild it, no matter what.
|
||||
final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
|
||||
|
||||
// whether to drop older items (instead of newer) when the queue is full
|
||||
final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
|
||||
|
||||
// whether to keep a journal file at all
|
||||
final val keepJournal = overlay(PersistentQueue.keepJournal)
|
||||
|
||||
// whether to sync the journal after each transaction
|
||||
final val syncJournal = overlay(PersistentQueue.syncJournal)
|
||||
|
||||
// (optional) move expired items over to this queue
|
||||
final val expiredQueue = overlay(PersistentQueue.expiredQueue)
|
||||
|
||||
private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log)
|
||||
|
||||
// track tentative remofinal vals
|
||||
private var xidCounter: Int = 0
|
||||
private val openTransactions = new mutable.HashMap[Int, QItem]
|
||||
def openTransactionCount = openTransactions.size
|
||||
def openTransactionIds = openTransactions.keys.toList.sortWith(_ - _ > 0)
|
||||
|
||||
def length: Long = synchronized { queueLength }
|
||||
def totalItems: Long = synchronized { _totalItems }
|
||||
def bytes: Long = synchronized { queueSize }
|
||||
def journalSize: Long = synchronized { journal.size }
|
||||
def totalExpired: Long = synchronized { _totalExpired }
|
||||
def currentAge: Long = synchronized { if (queueSize == 0) 0 else _currentAge }
|
||||
def totalDiscarded: Long = synchronized { _totalDiscarded }
|
||||
def isClosed: Boolean = synchronized { closed || paused }
|
||||
|
||||
// mostly for unit tests.
|
||||
def memoryLength: Long = synchronized { queue.size }
|
||||
def memoryBytes: Long = synchronized { _memoryBytes }
|
||||
def inReadBehind = synchronized { journal.inReadBehind }
|
||||
|
||||
configure(settings)
|
||||
|
||||
def configure(settings: FileBasedMailboxSettings) = synchronized {
|
||||
maxItems set Some(settings.MaxItems)
|
||||
maxSize set Some(settings.MaxSize)
|
||||
maxItemSize set Some(settings.MaxItemSize)
|
||||
maxAge set Some(settings.MaxAge.toSeconds.toInt)
|
||||
maxJournalSize set Some(settings.MaxJournalSize)
|
||||
maxMemorySize set Some(settings.MaxMemorySize)
|
||||
maxJournalOverflow set Some(settings.MaxJournalOverflow)
|
||||
maxJournalSizeAbsolute set Some(settings.MaxJournalSizeAbsolute)
|
||||
discardOldWhenFull set Some(settings.DiscardOldWhenFull)
|
||||
keepJournal set Some(settings.KeepJournal)
|
||||
syncJournal set Some(settings.SyncJournal)
|
||||
log.info("Configuring queue %s: journal=%s, max-items=%s, max-size=%s, max-age=%s, max-journal-size=%s, max-memory-size=%s, max-journal-overflow=%s, max-journal-size-absolute=%s, discard-old-when-full=%s, sync-journal=%s"
|
||||
.format(
|
||||
name, keepJournal(), maxItems(), maxSize(), maxAge(), maxJournalSize(), maxMemorySize(),
|
||||
maxJournalOverflow(), maxJournalSizeAbsolute(), discardOldWhenFull(), syncJournal()))
|
||||
if (!keepJournal()) journal.erase()
|
||||
}
|
||||
|
||||
def dumpConfig(): Array[String] = synchronized {
|
||||
Array(
|
||||
"max-items=" + maxItems(),
|
||||
"max-size=" + maxSize(),
|
||||
"max-age=" + maxAge(),
|
||||
"max-journal-size=" + maxJournalSize(),
|
||||
"max-memory-size=" + maxMemorySize(),
|
||||
"max-journal-overflow=" + maxJournalOverflow(),
|
||||
"max-journal-size-absolute=" + maxJournalSizeAbsolute(),
|
||||
"discard-old-when-full=" + discardOldWhenFull(),
|
||||
"journal=" + keepJournal(),
|
||||
"sync-journal=" + syncJournal(),
|
||||
"move-expired-to" + expiredQueue().map { _.name }.getOrElse("(none)"))
|
||||
}
|
||||
|
||||
def dumpStats(): Array[(String, String)] = synchronized {
|
||||
Array(
|
||||
("items", length.toString),
|
||||
("bytes", bytes.toString),
|
||||
("total-items", totalItems.toString),
|
||||
("logsize", journalSize.toString),
|
||||
("expired-items", totalExpired.toString),
|
||||
("mem-items", memoryLength.toString),
|
||||
("mem-bytes", memoryBytes.toString),
|
||||
("age", currentAge.toString),
|
||||
("discarded", totalDiscarded.toString),
|
||||
("open-transactions", openTransactionCount.toString))
|
||||
}
|
||||
|
||||
private final def adjustExpiry(startingTime: Long, expiry: Long): Long = {
|
||||
if (maxAge() > 0) {
|
||||
val maxExpiry = startingTime + maxAge()
|
||||
if (expiry > 0) (expiry min maxExpiry) else maxExpiry
|
||||
} else {
|
||||
expiry
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a value to the end of the queue, transactionally.
|
||||
*/
|
||||
def add(value: Array[Byte], expiry: Long): Boolean = synchronized {
|
||||
if (closed || value.size > maxItemSize()) return false
|
||||
while (queueLength >= maxItems() || queueSize >= maxSize()) {
|
||||
if (!discardOldWhenFull()) return false
|
||||
_remove(false)
|
||||
_totalDiscarded += 1
|
||||
if (keepJournal()) journal.remove()
|
||||
}
|
||||
|
||||
val now = System.currentTimeMillis
|
||||
val item = QItem(now, adjustExpiry(now, expiry), value, 0)
|
||||
if (keepJournal() && !journal.inReadBehind) {
|
||||
if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) {
|
||||
// force re-creation of the journal.
|
||||
log.debug("Rolling journal file for '{}' (qsize={})", name, queueSize)
|
||||
journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue)
|
||||
}
|
||||
if (queueSize >= maxMemorySize()) {
|
||||
log.debug("Dropping to read-behind for queue '{}' ({} bytes)", name, queueSize)
|
||||
journal.startReadBehind
|
||||
}
|
||||
}
|
||||
_add(item)
|
||||
if (keepJournal()) journal.add(item)
|
||||
true
|
||||
}
|
||||
|
||||
def add(value: Array[Byte]): Boolean = add(value, 0)
|
||||
|
||||
/**
|
||||
* Peek at the head item in the queue, if there is one.
|
||||
*/
|
||||
def peek(): Option[QItem] = {
|
||||
synchronized {
|
||||
if (closed || paused || queueLength == 0) {
|
||||
None
|
||||
} else {
|
||||
_peek()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove and return an item from the queue, if there is one.
|
||||
*
|
||||
* @param transaction true if this should be considered the first part
|
||||
* of a transaction, to be committed or rolled back (put back at the
|
||||
* head of the queue)
|
||||
*/
|
||||
def remove(transaction: Boolean): Option[QItem] = {
|
||||
synchronized {
|
||||
if (closed || paused || queueLength == 0) {
|
||||
None
|
||||
} else {
|
||||
val item = _remove(transaction)
|
||||
if (keepJournal()) {
|
||||
if (transaction) journal.removeTentative() else journal.remove()
|
||||
|
||||
if ((queueLength == 0) && (journal.size >= maxJournalSize())) {
|
||||
log.debug("Rolling journal file for '{}'", name)
|
||||
journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, Nil)
|
||||
}
|
||||
}
|
||||
item
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove and return an item from the queue, if there is one.
|
||||
*/
|
||||
def remove(): Option[QItem] = remove(false)
|
||||
|
||||
/**
|
||||
* Return a transactionally-removed item to the queue. This is a rolled-
|
||||
* back transaction.
|
||||
*/
|
||||
def unremove(xid: Int) {
|
||||
synchronized {
|
||||
if (!closed) {
|
||||
if (keepJournal()) journal.unremove(xid)
|
||||
_unremove(xid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def confirmRemove(xid: Int) {
|
||||
synchronized {
|
||||
if (!closed) {
|
||||
if (keepJournal()) journal.confirmRemove(xid)
|
||||
openTransactions.remove(xid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def flush() {
|
||||
while (remove(false).isDefined) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the queue's journal file. Not safe to call on an active queue.
|
||||
*/
|
||||
def close(): Unit = synchronized {
|
||||
closed = true
|
||||
if (keepJournal()) journal.close()
|
||||
}
|
||||
|
||||
def pauseReads(): Unit = synchronized {
|
||||
paused = true
|
||||
}
|
||||
|
||||
def resumeReads(): Unit = synchronized {
|
||||
paused = false
|
||||
}
|
||||
|
||||
def setup(): Unit = synchronized {
|
||||
queueSize = 0
|
||||
replayJournal
|
||||
}
|
||||
|
||||
def destroyJournal(): Unit = synchronized {
|
||||
if (keepJournal()) journal.erase()
|
||||
}
|
||||
|
||||
private final def nextXid(): Int = {
|
||||
do {
|
||||
xidCounter += 1
|
||||
} while (openTransactions contains xidCounter)
|
||||
xidCounter
|
||||
}
|
||||
|
||||
private final def fillReadBehind() {
|
||||
// if we're in read-behind mode, scan forward in the journal to keep memory as full as
|
||||
// possible. this amortizes the disk overhead across all reads.
|
||||
while (keepJournal() && journal.inReadBehind && _memoryBytes < maxMemorySize()) {
|
||||
journal.fillReadBehind { item ⇒
|
||||
queue += item
|
||||
_memoryBytes += item.data.length
|
||||
}
|
||||
if (!journal.inReadBehind) {
|
||||
log.debug("Coming out of read-behind for queue '{}'", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def replayJournal() {
|
||||
if (!keepJournal()) return
|
||||
|
||||
log.debug("Replaying transaction journal for '{}'", name)
|
||||
xidCounter = 0
|
||||
|
||||
journal.replay(name) {
|
||||
case JournalItem.Add(item) ⇒
|
||||
_add(item)
|
||||
// when processing the journal, this has to happen after:
|
||||
if (!journal.inReadBehind && queueSize >= maxMemorySize()) {
|
||||
log.debug("Dropping to read-behind for queue '{}' ({} bytes)", name, queueSize)
|
||||
journal.startReadBehind
|
||||
}
|
||||
case JournalItem.Remove ⇒ _remove(false)
|
||||
case JournalItem.RemoveTentative ⇒ _remove(true)
|
||||
case JournalItem.SavedXid(xid) ⇒ xidCounter = xid
|
||||
case JournalItem.Unremove(xid) ⇒ _unremove(xid)
|
||||
case JournalItem.ConfirmRemove(xid) ⇒ openTransactions.remove(xid)
|
||||
case x ⇒ log.warning("Unexpected item in journal: {}", x)
|
||||
}
|
||||
|
||||
log.debug("Finished transaction journal for '{}' ({} items, {} bytes)",
|
||||
name, queueLength, journal.size)
|
||||
journal.open
|
||||
|
||||
// now, any unfinished transactions must be backed out.
|
||||
for (xid ← openTransactionIds) {
|
||||
journal.unremove(xid)
|
||||
_unremove(xid)
|
||||
}
|
||||
}
|
||||
|
||||
def toList(): List[QItem] = {
|
||||
discardExpired
|
||||
queue.toList
|
||||
}
|
||||
|
||||
// ----- internal implementations
|
||||
|
||||
private def _add(item: QItem) {
|
||||
discardExpired
|
||||
if (!journal.inReadBehind) {
|
||||
queue += item
|
||||
_memoryBytes += item.data.length
|
||||
}
|
||||
_totalItems += 1
|
||||
queueSize += item.data.length
|
||||
queueLength += 1
|
||||
}
|
||||
|
||||
private def _peek(): Option[QItem] = {
|
||||
discardExpired
|
||||
if (queue.isEmpty) None else Some(queue.front)
|
||||
}
|
||||
|
||||
private def _remove(transaction: Boolean): Option[QItem] = {
|
||||
discardExpired()
|
||||
if (queue.isEmpty) return None
|
||||
|
||||
val now = System.currentTimeMillis
|
||||
val item = queue.dequeue
|
||||
val len = item.data.length
|
||||
queueSize -= len
|
||||
_memoryBytes -= len
|
||||
queueLength -= 1
|
||||
val xid = if (transaction) nextXid else 0
|
||||
|
||||
fillReadBehind
|
||||
_currentAge = now - item.addTime
|
||||
if (transaction) {
|
||||
item.xid = xid
|
||||
openTransactions(xid) = item
|
||||
}
|
||||
Some(item)
|
||||
}
|
||||
|
||||
final def discardExpired(): Int = {
|
||||
@tailrec def internalDisard(discarded: Int): Int = {
|
||||
if (queue.isEmpty || journal.isReplaying) {
|
||||
discarded
|
||||
} else {
|
||||
val realExpiry = adjustExpiry(queue.front.addTime, queue.front.expiry)
|
||||
if ((realExpiry != 0) && (realExpiry < System.currentTimeMillis)) {
|
||||
_totalExpired += 1
|
||||
val item = queue.dequeue
|
||||
val len = item.data.length
|
||||
queueSize -= len
|
||||
_memoryBytes -= len
|
||||
queueLength -= 1
|
||||
fillReadBehind
|
||||
if (keepJournal()) journal.remove()
|
||||
expiredQueue().map { _.add(item.data, 0) }
|
||||
internalDisard(discarded + 1)
|
||||
} else {
|
||||
discarded
|
||||
}
|
||||
}
|
||||
}
|
||||
internalDisard(0)
|
||||
}
|
||||
|
||||
private def _unremove(xid: Int) = {
|
||||
openTransactions.remove(xid) map { item ⇒
|
||||
queueLength += 1
|
||||
queueSize += item.data.length
|
||||
queue prepend item
|
||||
_memoryBytes += item.data.length
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object PersistentQueue {
|
||||
@volatile
|
||||
var maxItems: Int = Int.MaxValue
|
||||
@volatile
|
||||
var maxSize: Long = Long.MaxValue
|
||||
@volatile
|
||||
var maxItemSize: Long = Long.MaxValue
|
||||
@volatile
|
||||
var maxAge: Int = 0
|
||||
@volatile
|
||||
var maxJournalSize: Long = 16 * 1024 * 1024
|
||||
@volatile
|
||||
var maxMemorySize: Long = 128 * 1024 * 1024
|
||||
@volatile
|
||||
var maxJournalOverflow: Int = 10
|
||||
@volatile
|
||||
var maxJournalSizeAbsolute: Long = Long.MaxValue
|
||||
@volatile
|
||||
var discardOldWhenFull: Boolean = false
|
||||
@volatile
|
||||
var keepJournal: Boolean = true
|
||||
@volatile
|
||||
var syncJournal: Boolean = false
|
||||
@volatile
|
||||
var expiredQueue: Option[PersistentQueue] = None
|
||||
}
|
||||
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.nio.{ ByteBuffer, ByteOrder }
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
final case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
|
||||
def pack(): Array[Byte] = {
|
||||
val bytes = new Array[Byte](data.length + 16)
|
||||
val buffer = ByteBuffer.wrap(bytes)
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN)
|
||||
buffer.putLong(addTime)
|
||||
buffer.putLong(expiry)
|
||||
buffer.put(data)
|
||||
bytes
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object QItem {
|
||||
def unpack(data: Array[Byte]): QItem = {
|
||||
val buffer = ByteBuffer.wrap(data)
|
||||
val bytes = new Array[Byte](data.length - 16)
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN)
|
||||
val addTime = buffer.getLong
|
||||
val expiry = buffer.getLong
|
||||
buffer.get(bytes)
|
||||
QItem(addTime, expiry, bytes, 0)
|
||||
}
|
||||
|
||||
def unpackOldAdd(data: Array[Byte]): QItem = {
|
||||
val buffer = ByteBuffer.wrap(data)
|
||||
val bytes = new Array[Byte](data.length - 4)
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN)
|
||||
val expiry = buffer.getInt
|
||||
buffer.get(bytes)
|
||||
QItem(System.currentTimeMillis, if (expiry == 0) 0 else expiry * 1000, bytes, 0)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,227 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue
|
||||
|
||||
import java.io.File
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import scala.collection.mutable
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor.mailbox.filebased.FileBasedMailboxSettings
|
||||
|
||||
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) {
|
||||
private val path = new File(queueFolder)
|
||||
|
||||
if (!path.isDirectory) {
|
||||
path.mkdirs()
|
||||
}
|
||||
if (!path.isDirectory || !path.canWrite) {
|
||||
throw new InaccessibleQueuePath
|
||||
}
|
||||
|
||||
private val queues = new mutable.HashMap[String, PersistentQueue]
|
||||
private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]]
|
||||
private var shuttingDown = false
|
||||
|
||||
// total items added since the server started up.
|
||||
val totalAdded = new Counter()
|
||||
|
||||
// hits/misses on removing items from the queue
|
||||
val queueHits = new Counter()
|
||||
val queueMisses = new Counter()
|
||||
|
||||
// preload any queues
|
||||
def loadQueues() {
|
||||
path.list() filter { name ⇒ !(name contains "~~") } map { queue(_) }
|
||||
}
|
||||
|
||||
def queueNames: List[String] = synchronized {
|
||||
queues.keys.toList
|
||||
}
|
||||
|
||||
def currentItems = queues.values.foldLeft(0L) { _ + _.length }
|
||||
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
|
||||
|
||||
/**
|
||||
* Get a named queue, creating it if necessary.
|
||||
* Exposed only to unit tests.
|
||||
*/
|
||||
private[akka] def queue(name: String): Option[PersistentQueue] = synchronized {
|
||||
if (shuttingDown) {
|
||||
None
|
||||
} else {
|
||||
Some(queues.get(name) getOrElse {
|
||||
// only happens when creating a queue for the first time.
|
||||
val q = if (name contains '+') {
|
||||
val master = name.split('+')(0)
|
||||
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
|
||||
log.debug("Fanout queue {} added to {}", name, master)
|
||||
new PersistentQueue(path.getPath, name, settings, log)
|
||||
} else {
|
||||
new PersistentQueue(path.getPath, name, settings, log)
|
||||
}
|
||||
q.setup
|
||||
queues(name) = q
|
||||
q
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an item to a named queue. Will not return until the item has been
|
||||
* synchronously added and written to the queue journal file.
|
||||
*
|
||||
* @return true if the item was added; false if the server is shutting
|
||||
* down
|
||||
*/
|
||||
def add(key: String, item: Array[Byte], expiry: Int): Boolean = {
|
||||
for (fanouts ← fanout_queues.get(key); name ← fanouts) {
|
||||
add(name, item, expiry)
|
||||
}
|
||||
|
||||
queue(key) match {
|
||||
case None ⇒ false
|
||||
case Some(q) ⇒
|
||||
val now = System.currentTimeMillis
|
||||
val normalizedExpiry: Long = if (expiry == 0) {
|
||||
0
|
||||
} else if (expiry < 1000000) {
|
||||
now + expiry
|
||||
} else {
|
||||
expiry
|
||||
}
|
||||
val result = q.add(item, normalizedExpiry)
|
||||
if (result) totalAdded.incr()
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
def add(key: String, item: Array[Byte]): Boolean = add(key, item, 0)
|
||||
|
||||
/**
|
||||
* Retrieve an item from a queue and pass it to a continuation. If no item is available within
|
||||
* the requested time, or the server is shutting down, None is passed.
|
||||
*/
|
||||
def remove(key: String, timeout: Int, transaction: Boolean, peek: Boolean)(f: Option[QItem] ⇒ Unit) {
|
||||
queue(key) match {
|
||||
case None ⇒
|
||||
queueMisses.incr
|
||||
f(None)
|
||||
case Some(q) ⇒
|
||||
if (peek) {
|
||||
f(q.peek())
|
||||
} else {
|
||||
q.remove
|
||||
/* q.removeReact(if (timeout == 0) timeout else System.currentTimeMillis + timeout, transaction) {
|
||||
case None =>
|
||||
queueMisses.incr
|
||||
f(None)
|
||||
case Some(item) =>
|
||||
queueHits.incr
|
||||
f(Some(item))
|
||||
}
|
||||
*/ }
|
||||
}
|
||||
}
|
||||
|
||||
// for testing.
|
||||
def receive(key: String): Option[Array[Byte]] = {
|
||||
var rv: Option[Array[Byte]] = None
|
||||
val latch = new CountDownLatch(1)
|
||||
remove(key, 0, false, false) {
|
||||
case None ⇒
|
||||
rv = None
|
||||
latch.countDown
|
||||
case Some(v) ⇒
|
||||
rv = Some(v.data)
|
||||
latch.countDown
|
||||
}
|
||||
latch.await
|
||||
rv
|
||||
}
|
||||
|
||||
def unremove(key: String, xid: Int) {
|
||||
queue(key) map { q ⇒ q.unremove(xid) }
|
||||
}
|
||||
|
||||
def confirmRemove(key: String, xid: Int) {
|
||||
queue(key) map { q ⇒ q.confirmRemove(xid) }
|
||||
}
|
||||
|
||||
def flush(key: String) {
|
||||
queue(key) map { q ⇒ q.flush() }
|
||||
}
|
||||
|
||||
def delete(name: String): Unit = synchronized {
|
||||
if (!shuttingDown) {
|
||||
queues.get(name) map { q ⇒
|
||||
q.close()
|
||||
q.destroyJournal()
|
||||
queues.remove(name)
|
||||
}
|
||||
if (name contains '+') {
|
||||
val master = name.split('+')(0)
|
||||
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) -= name
|
||||
log.debug("Fanout queue {} dropped from {}", name, master)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def flushExpired(name: String): Int = synchronized {
|
||||
if (shuttingDown) {
|
||||
0
|
||||
} else {
|
||||
queue(name) map { q ⇒ q.discardExpired() } getOrElse (0)
|
||||
}
|
||||
}
|
||||
|
||||
def flushAllExpired(): Int = synchronized {
|
||||
queueNames.foldLeft(0) { (sum, qName) ⇒ sum + flushExpired(qName) }
|
||||
}
|
||||
|
||||
def stats(key: String): Array[(String, String)] = queue(key) match {
|
||||
case None ⇒ Array[(String, String)]()
|
||||
case Some(q) ⇒
|
||||
q.dumpStats() ++
|
||||
fanout_queues.get(key).map { qset ⇒ ("children", qset.mkString(",")) }.toList
|
||||
}
|
||||
|
||||
def dumpConfig(key: String): Array[String] = {
|
||||
queue(key) match {
|
||||
case None ⇒ Array()
|
||||
case Some(q) ⇒ q.dumpConfig()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown this queue collection. All actors are asked to exit, and
|
||||
* any future queue requests will fail.
|
||||
*/
|
||||
def shutdown: Unit = synchronized {
|
||||
if (shuttingDown) {
|
||||
return
|
||||
}
|
||||
shuttingDown = true
|
||||
for ((name, q) ← queues) {
|
||||
// synchronous, so the journals are all officially closed before we return.
|
||||
q.close
|
||||
}
|
||||
queues.clear
|
||||
}
|
||||
}
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue.tools
|
||||
|
||||
import language.reflectiveCalls
|
||||
|
||||
import java.io.{ FileNotFoundException, IOException }
|
||||
import scala.collection.mutable
|
||||
import akka.actor.mailbox.filebased.filequeue._
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class QueueDumper(filename: String, log: LoggingAdapter) {
|
||||
var offset = 0L
|
||||
var operations = 0L
|
||||
var currentXid = 0
|
||||
|
||||
val queue = new mutable.Queue[Int] {
|
||||
def unget(item: Int) = prependElem(item)
|
||||
}
|
||||
val openTransactions = new mutable.HashMap[Int, Int]
|
||||
|
||||
def apply() {
|
||||
val journal = new Journal(filename, false, log)
|
||||
var lastDisplay = 0L
|
||||
|
||||
try {
|
||||
for ((item, itemsize) ← journal.walk()) {
|
||||
operations += 1
|
||||
dumpItem(item)
|
||||
offset += itemsize
|
||||
if (QDumper.quiet && offset - lastDisplay > 1024 * 1024) {
|
||||
print("\rReading journal: %-6s".format(Util.bytesToHuman(offset, 0)))
|
||||
Console.flush()
|
||||
lastDisplay = offset
|
||||
}
|
||||
}
|
||||
print("\r" + (" " * 30))
|
||||
|
||||
println()
|
||||
val totalItems = queue.size + openTransactions.size
|
||||
val totalBytes = queue.foldLeft(0L) { _ + _ } + openTransactions.values.foldLeft(0L) { _ + _ }
|
||||
println("Journal size: %d bytes, with %d operations.".format(offset, operations))
|
||||
println("%d items totalling %d bytes.".format(totalItems, totalBytes))
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒
|
||||
println("Can't open journal file: " + filename)
|
||||
case e: IOException ⇒
|
||||
println("Exception reading journal file: " + filename)
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
|
||||
def dumpItem(item: JournalItem) {
|
||||
val now = System.currentTimeMillis
|
||||
if (!QDumper.quiet) print("%08x ".format(offset & 0xffffffffL))
|
||||
item match {
|
||||
case JournalItem.Add(qitem) ⇒
|
||||
if (!QDumper.quiet) {
|
||||
print("ADD %-6d".format(qitem.data.size))
|
||||
if (qitem.xid > 0) {
|
||||
print(" xid=%d".format(qitem.xid))
|
||||
}
|
||||
if (qitem.expiry > 0) {
|
||||
if (qitem.expiry - now < 0) {
|
||||
print(" expired")
|
||||
} else {
|
||||
print(" exp=%d".format(qitem.expiry - now))
|
||||
}
|
||||
}
|
||||
println()
|
||||
}
|
||||
queue += qitem.data.size
|
||||
case JournalItem.Remove ⇒
|
||||
if (!QDumper.quiet) println("REM")
|
||||
queue.dequeue
|
||||
case JournalItem.RemoveTentative ⇒
|
||||
do {
|
||||
currentXid += 1
|
||||
} while (openTransactions contains currentXid)
|
||||
openTransactions(currentXid) = queue.dequeue
|
||||
if (!QDumper.quiet) println("RSV %d".format(currentXid))
|
||||
case JournalItem.SavedXid(xid) ⇒
|
||||
if (!QDumper.quiet) println("XID %d".format(xid))
|
||||
currentXid = xid
|
||||
case JournalItem.Unremove(xid) ⇒
|
||||
queue.unget(openTransactions.remove(xid).get)
|
||||
if (!QDumper.quiet) println("CAN %d".format(xid))
|
||||
case JournalItem.ConfirmRemove(xid) ⇒
|
||||
if (!QDumper.quiet) println("ACK %d".format(xid))
|
||||
openTransactions.remove(xid)
|
||||
case x ⇒
|
||||
if (!QDumper.quiet) println(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object QDumper {
|
||||
val filenames = new mutable.ListBuffer[String]
|
||||
var quiet = false
|
||||
|
||||
def usage() {
|
||||
println()
|
||||
println("usage: qdump.sh <journal-files...>")
|
||||
println(" describe the contents of a kestrel journal file")
|
||||
println()
|
||||
println("options:")
|
||||
println(" -q quiet: don't describe every line, just the summary")
|
||||
println()
|
||||
}
|
||||
|
||||
def parseArgs(args: List[String]): Unit = args match {
|
||||
case Nil ⇒
|
||||
case "--help" :: xs ⇒
|
||||
usage()
|
||||
System.exit(0)
|
||||
case "-q" :: xs ⇒
|
||||
quiet = true
|
||||
parseArgs(xs)
|
||||
case x :: xs ⇒
|
||||
filenames += x
|
||||
parseArgs(xs)
|
||||
}
|
||||
|
||||
def main(args: Array[String]) {
|
||||
parseArgs(args.toList)
|
||||
if (filenames.size == 0) {
|
||||
usage()
|
||||
System.exit(0)
|
||||
}
|
||||
|
||||
val system = ActorSystem()
|
||||
|
||||
for (filename ← filenames) {
|
||||
println("Queue: " + filename)
|
||||
new QueueDumper(filename, system.log)()
|
||||
}
|
||||
|
||||
system.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
@ -1,54 +0,0 @@
|
|||
/*
|
||||
* Copyright 2009 Twitter, Inc.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package akka.actor.mailbox.filebased.filequeue.tools
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object Util {
|
||||
val KILOBYTE = 1024L
|
||||
val MEGABYTE = 1024 * KILOBYTE
|
||||
val GIGABYTE = 1024 * MEGABYTE
|
||||
|
||||
def bytesToHuman(bytes: Long, minDivisor: Long) = {
|
||||
if ((bytes == 0) && (minDivisor == 0)) {
|
||||
"0"
|
||||
} else {
|
||||
val divisor = if ((bytes >= GIGABYTE * 95 / 100) || (minDivisor == GIGABYTE)) {
|
||||
GIGABYTE
|
||||
} else if ((bytes >= MEGABYTE * 95 / 100) || (minDivisor == MEGABYTE)) {
|
||||
MEGABYTE
|
||||
} else {
|
||||
KILOBYTE
|
||||
}
|
||||
|
||||
// add 1/2 when computing the dot, to force it to round up.
|
||||
var dot = ((bytes % divisor) * 20 + divisor) / (2 * divisor)
|
||||
var base = (bytes - (bytes % divisor)) / divisor
|
||||
if (dot >= 10) {
|
||||
base += 1
|
||||
dot -= 10
|
||||
}
|
||||
|
||||
base.toString + (if (base < 100) ("." + dot) else "") + (divisor match {
|
||||
case KILOBYTE ⇒ "K"
|
||||
case MEGABYTE ⇒ "M"
|
||||
case GIGABYTE ⇒ "G"
|
||||
case _ ⇒ ""
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,48 +0,0 @@
|
|||
package akka.actor.mailbox.filebased
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.actor.mailbox._
|
||||
import scala.concurrent.duration._
|
||||
import org.apache.commons.io.FileUtils
|
||||
import akka.dispatch.Mailbox
|
||||
|
||||
object FileBasedMailboxSpec {
|
||||
val config = """
|
||||
File-dispatcher {
|
||||
mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
|
||||
throughput = 1
|
||||
file-based.directory-path = "file-based"
|
||||
file-based.circuit-breaker.max-failures = 5
|
||||
file-based.circuit-breaker.call-timeout = 5 seconds
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
|
||||
|
||||
val settings = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher"))
|
||||
|
||||
"FileBasedMailboxSettings" must {
|
||||
"read the file-based section" in {
|
||||
settings.QueuePath should be("file-based")
|
||||
settings.CircuitBreakerMaxFailures should be(5)
|
||||
settings.CircuitBreakerCallTimeout should be(5 seconds)
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
|
||||
|
||||
def clean(): Unit = FileUtils.deleteDirectory(new java.io.File(settings.QueuePath))
|
||||
|
||||
override def atStartup() {
|
||||
clean()
|
||||
super.atStartup()
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
clean()
|
||||
super.afterTermination()
|
||||
}
|
||||
}
|
||||
|
|
@ -1,134 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.mailbox
|
||||
|
||||
import akka.dispatch.{ Envelope, MessageQueue }
|
||||
import akka.remote.MessageSerializer
|
||||
import akka.remote.WireFormats.{ ActorRefData, RemoteEnvelope }
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor._
|
||||
|
||||
private[akka] object DurableExecutableMailboxConfig {
|
||||
val Name = "[\\.\\/\\$\\s]".r
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue {
|
||||
import DurableExecutableMailboxConfig._
|
||||
|
||||
def ownerPath: ActorPath = owner.path
|
||||
val ownerPathString: String = ownerPath.elements.mkString("/")
|
||||
val name: String = "mailbox_" + Name.replaceAllIn(ownerPathString, "_")
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API
|
||||
* DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages)
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization
|
||||
|
||||
/**
|
||||
* DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality
|
||||
* to serialize and deserialize Envelopes (messages)
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
||||
|
||||
/**
|
||||
* Serializes the given Envelope into an Array of Bytes using an efficient serialization/deserialization strategy
|
||||
*/
|
||||
def serialize(durableMessage: Envelope): Array[Byte] = {
|
||||
|
||||
// It's alright to use ref.path.toString here
|
||||
// When the sender is a LocalActorRef it should be local when deserialized also.
|
||||
// When the sender is a RemoteActorRef the path.toString already contains remote address information.
|
||||
def serializeActorRef(ref: ActorRef): ActorRefData = ActorRefData.newBuilder.setPath(ref.path.toString).build
|
||||
|
||||
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
|
||||
val builder = RemoteEnvelope.newBuilder
|
||||
.setMessage(message)
|
||||
.setRecipient(serializeActorRef(owner))
|
||||
.setSender(serializeActorRef(durableMessage.sender))
|
||||
|
||||
builder.build.toByteArray
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes an array of Bytes that were serialized using the DurableMessageSerialization.serialize method,
|
||||
* into an Envelope.
|
||||
*/
|
||||
def deserialize(bytes: Array[Byte]): Envelope = {
|
||||
|
||||
def deserializeActorRef(refProtocol: ActorRefData): ActorRef =
|
||||
system.provider.resolveActorRef(refProtocol.getPath)
|
||||
|
||||
val durableMessage = RemoteEnvelope.parseFrom(bytes)
|
||||
val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
|
||||
val sender = deserializeActorRef(durableMessage.getSender)
|
||||
|
||||
Envelope(message, sender, system)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Conventional organization of durable mailbox settings:
|
||||
*
|
||||
* {{{
|
||||
* akka {
|
||||
* actor {
|
||||
* my-durable-dispatcher {
|
||||
* mailbox-type = "my.durable.mailbox"
|
||||
* my-durable-mailbox {
|
||||
* setting1 = 1
|
||||
* setting2 = 2
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* where name=“my-durable-mailbox” in this example.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
trait DurableMailboxSettings {
|
||||
/**
|
||||
* A reference to the enclosing actor system.
|
||||
*/
|
||||
def systemSettings: ActorSystem.Settings
|
||||
|
||||
/**
|
||||
* A reference to the config section which the user specified for this mailbox’s dispatcher.
|
||||
*/
|
||||
def userConfig: Config
|
||||
|
||||
/**
|
||||
* The extracted config section for this mailbox, which is the “name”
|
||||
* section (if that exists), falling back to system defaults. Typical
|
||||
* implementation looks like:
|
||||
*
|
||||
* {{{
|
||||
* val config = initialize
|
||||
* }}}
|
||||
*/
|
||||
def config: Config
|
||||
|
||||
/**
|
||||
* Name of this mailbox type for purposes of configuration scoping. Reference
|
||||
* defaults go into “akka.actor.mailbox.<name>”.
|
||||
*/
|
||||
def name: String
|
||||
|
||||
/**
|
||||
* Obtain default extracted mailbox config section from userConfig and system.
|
||||
*/
|
||||
def initialize: Config =
|
||||
if (userConfig.hasPath(name))
|
||||
userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name))
|
||||
else systemSettings.config.getConfig("akka.actor.mailbox." + name)
|
||||
}
|
||||
|
||||
|
|
@ -1,58 +0,0 @@
|
|||
# Define some default values that can be overridden by system properties
|
||||
zookeeper.root.logger=INFO, CONSOLE
|
||||
zookeeper.console.threshold=INFO
|
||||
zookeeper.log.dir=.
|
||||
zookeeper.log.file=zookeeper.log
|
||||
zookeeper.log.threshold=DEBUG
|
||||
zookeeper.tracelog.dir=.
|
||||
zookeeper.tracelog.file=zookeeper_trace.log
|
||||
|
||||
#
|
||||
# ZooKeeper Logging Configuration
|
||||
#
|
||||
|
||||
# Format is "<default threshold> (, <appender>)+
|
||||
|
||||
# DEFAULT: console appender only
|
||||
log4j.rootLogger=${zookeeper.root.logger}
|
||||
|
||||
# Example with rolling log file
|
||||
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
|
||||
|
||||
# Example with rolling log file and tracing
|
||||
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
|
||||
|
||||
#
|
||||
# Log INFO level and above messages to the console
|
||||
#
|
||||
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
|
||||
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
|
||||
|
||||
#
|
||||
# Add ROLLINGFILE to rootLogger to get log file output
|
||||
# Log DEBUG level and above messages to a log file
|
||||
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
|
||||
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
|
||||
|
||||
# Max log file size of 10MB
|
||||
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
|
||||
# uncomment the next line to limit number of backup files
|
||||
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
|
||||
|
||||
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
|
||||
|
||||
|
||||
#
|
||||
# Add TRACEFILE to rootLogger to get log file output
|
||||
# Log DEBUG level and above messages to a log file
|
||||
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
|
||||
log4j.appender.TRACEFILE.Threshold=TRACE
|
||||
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
|
||||
|
||||
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
|
||||
### Notice we are including log4j's NDC here (%x)
|
||||
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<!-- For assistance related to logback-translator or configuration -->
|
||||
<!-- files in general, please contact the logback user mailing list -->
|
||||
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
|
||||
<!-- -->
|
||||
<!-- For professional support please see -->
|
||||
<!-- http://www.qos.ch/shop/products/professionalSupport -->
|
||||
<!-- -->
|
||||
<configuration scan="false" debug="false">
|
||||
<!-- Errors were reported during translation. -->
|
||||
<!-- Could not find transformer for org.apache.log4j.SimpleLayout -->
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<logger name="akka" level="DEBUG"/>
|
||||
|
||||
<logger name="org.mortbay.log" level="ERROR"/>
|
||||
<logger name="org.apache.jasper" level="ERROR"/>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
</configuration>
|
||||
|
|
@ -1,178 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.mailbox
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import java.io.InputStream
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import org.scalatest.{ WordSpecLike, BeforeAndAfterAll }
|
||||
import org.scalatest.Matchers
|
||||
|
||||
import com.typesafe.config.{ ConfigFactory, Config }
|
||||
|
||||
import DurableMailboxSpecActorFactory.{ MailboxTestActor, AccumulatorActor }
|
||||
import akka.actor.{ RepointableRef, Props, ActorSystem, ActorRefWithCell, ActorRef, ActorCell, Actor }
|
||||
import akka.dispatch.Mailbox
|
||||
import akka.testkit.TestKit
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object DurableMailboxSpecActorFactory {
|
||||
|
||||
class MailboxTestActor extends Actor {
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
}
|
||||
|
||||
class AccumulatorActor extends Actor {
|
||||
var num = 0l
|
||||
def receive = {
|
||||
case x: Int ⇒ num += x
|
||||
case "sum" ⇒ sender ! num
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object DurableMailboxSpec {
|
||||
def fallbackConfig: Config = ConfigFactory.parseString("""
|
||||
akka {
|
||||
loggers = ["akka.testkit.TestEventListener"]
|
||||
loglevel = "WARNING"
|
||||
stdout-loglevel = "WARNING"
|
||||
}
|
||||
""")
|
||||
}
|
||||
|
||||
/**
|
||||
* Reusable test fixture for durable mailboxes. Implements a few basic tests. More
|
||||
* tests can be added in concrete subclass.
|
||||
*
|
||||
* Subclass must define dispatcher in the supplied config for the specific backend.
|
||||
* The id of the dispatcher should be the same as the `<backendName>-dispatcher`.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
|
||||
extends TestKit(system) with WordSpecLike with Matchers with BeforeAndAfterAll {
|
||||
|
||||
import DurableMailboxSpecActorFactory._
|
||||
|
||||
/**
|
||||
* Subclass must define dispatcher in the supplied config for the specific backend.
|
||||
* The id of the dispatcher should be the same as the `<backendName>-dispatcher`.
|
||||
*/
|
||||
def this(backendName: String, config: String) = {
|
||||
this(ActorSystem(backendName + "BasedDurableMailboxSpec",
|
||||
ConfigFactory.parseString(config).withFallback(DurableMailboxSpec.fallbackConfig)),
|
||||
backendName)
|
||||
}
|
||||
|
||||
final override def beforeAll {
|
||||
atStartup()
|
||||
}
|
||||
|
||||
/**
|
||||
* May be implemented in concrete subclass to do additional things once before test
|
||||
* cases are run.
|
||||
*/
|
||||
protected def atStartup() {}
|
||||
|
||||
final override def afterAll {
|
||||
TestKit.shutdownActorSystem(system)
|
||||
try system.awaitTermination(5 seconds) catch {
|
||||
case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
|
||||
}
|
||||
afterTermination()
|
||||
}
|
||||
|
||||
/**
|
||||
* May be implemented in concrete subclass to do additional things once after all
|
||||
* test cases have been run.
|
||||
*/
|
||||
def afterTermination() {}
|
||||
|
||||
protected def streamMustContain(in: InputStream, words: String): Unit = {
|
||||
val output = new Array[Byte](8192)
|
||||
|
||||
def now = System.currentTimeMillis
|
||||
|
||||
def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // don’t want parse errors
|
||||
|
||||
@tailrec def read(end: Int = 0, start: Long = now): Int =
|
||||
in.read(output, end, output.length - end) match {
|
||||
case -1 ⇒ end
|
||||
case x ⇒
|
||||
val next = end + x
|
||||
if (string(next).contains(words) || now - start > 10000 || next == output.length) next
|
||||
else read(next, start)
|
||||
}
|
||||
|
||||
val result = string(read())
|
||||
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
|
||||
}
|
||||
|
||||
def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = {
|
||||
val ref = id match {
|
||||
case null | "" ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"))
|
||||
case some ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
|
||||
}
|
||||
awaitCond(ref match {
|
||||
case r: RepointableRef ⇒ r.isStarted
|
||||
}, 1 second, 10 millis)
|
||||
ref
|
||||
}
|
||||
|
||||
private def isDurableMailbox(m: Mailbox): Boolean =
|
||||
m.messageQueue.isInstanceOf[DurableMessageQueue]
|
||||
|
||||
"A " + backendName + " based mailbox backed actor" must {
|
||||
|
||||
"get a new, unique, durable mailbox" in {
|
||||
val a1, a2 = createMailboxTestActor()
|
||||
val mb1 = a1.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox
|
||||
val mb2 = a2.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox
|
||||
isDurableMailbox(mb1) should be(true)
|
||||
isDurableMailbox(mb2) should be(true)
|
||||
(mb1 ne mb2) should be(true)
|
||||
}
|
||||
|
||||
"deliver messages at most once" in {
|
||||
val queueActor = createMailboxTestActor()
|
||||
implicit val sender = testActor
|
||||
|
||||
val msgs = 1 to 100 map { x ⇒ "foo" + x }
|
||||
|
||||
msgs foreach { m ⇒ queueActor ! m }
|
||||
|
||||
msgs foreach { m ⇒ expectMsg(m) }
|
||||
|
||||
expectNoMsg()
|
||||
}
|
||||
|
||||
"support having multiple actors at the same time" in {
|
||||
val actors = Vector.fill(3)(createMailboxTestActor(Props[AccumulatorActor]))
|
||||
|
||||
actors foreach { a ⇒ isDurableMailbox(a.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox) should be(true) }
|
||||
|
||||
val msgs = 1 to 3
|
||||
|
||||
val expectedResult: Long = msgs.sum
|
||||
|
||||
for (a ← actors; m ← msgs) a ! m
|
||||
|
||||
for (a ← actors) {
|
||||
implicit val sender = testActor
|
||||
a ! "sum"
|
||||
expectMsg(expectedResult)
|
||||
}
|
||||
|
||||
expectNoMsg()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -50,7 +50,6 @@ object AkkaBuild extends Build {
|
|||
settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Publish.versionSettings ++
|
||||
SphinxSupport.settings ++ Dist.settings ++ s3Settings ++ mimaSettings ++ unidocScaladocSettings ++
|
||||
Protobuf.settings ++ inConfig(JavaDoc)(Defaults.configSettings) ++ Seq(
|
||||
testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean,
|
||||
parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean,
|
||||
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
|
||||
unidocExclude := Seq(samples.id, remoteTests.id),
|
||||
|
|
@ -76,7 +75,7 @@ object AkkaBuild extends Build {
|
|||
validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => }
|
||||
),
|
||||
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent,
|
||||
persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
|
||||
persistence, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
|
||||
)
|
||||
|
||||
lazy val akkaScalaNightly = Project(
|
||||
|
|
@ -85,7 +84,7 @@ object AkkaBuild extends Build {
|
|||
// remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings)
|
||||
// samples don't work with dbuild right now
|
||||
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j,
|
||||
persistence, mailboxes, kernel, osgi, contrib, multiNodeTestkit)
|
||||
persistence, kernel, osgi, contrib, multiNodeTestkit)
|
||||
)
|
||||
|
||||
// this detached pseudo-project is used for running the tests against a different Scala version than the one used for compilation
|
||||
|
|
@ -264,36 +263,6 @@ object AkkaBuild extends Build {
|
|||
)
|
||||
)
|
||||
|
||||
val testMailbox = SettingKey[Boolean]("test-mailbox")
|
||||
|
||||
lazy val mailboxes = Project(
|
||||
id = "akka-durable-mailboxes",
|
||||
base = file("akka-durable-mailboxes"),
|
||||
settings = parentSettings,
|
||||
aggregate = Seq(mailboxesCommon, fileMailbox)
|
||||
)
|
||||
|
||||
lazy val mailboxesCommon = Project(
|
||||
id = "akka-mailboxes-common",
|
||||
base = file("akka-durable-mailboxes/akka-mailboxes-common"),
|
||||
dependencies = Seq(remote, testkit % "compile;test->test"),
|
||||
settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.mailboxesCommon ++ Seq(
|
||||
libraryDependencies ++= Dependencies.mailboxes,
|
||||
previousArtifact := akkaPreviousArtifact("akka-mailboxes-common"),
|
||||
publishArtifact in Test := true
|
||||
)
|
||||
)
|
||||
|
||||
lazy val fileMailbox = Project(
|
||||
id = "akka-file-mailbox",
|
||||
base = file("akka-durable-mailboxes/akka-file-mailbox"),
|
||||
dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"),
|
||||
settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.fileMailbox ++ Seq(
|
||||
libraryDependencies ++= Dependencies.fileMailbox,
|
||||
previousArtifact := akkaPreviousArtifact("akka-file-mailbox")
|
||||
)
|
||||
)
|
||||
|
||||
lazy val zeroMQ = Project(
|
||||
id = "akka-zeromq",
|
||||
base = file("akka-zeromq"),
|
||||
|
|
@ -1001,10 +970,6 @@ object AkkaBuild extends Build {
|
|||
|
||||
val cluster = exports(Seq("akka.cluster.*"), imports = Seq(protobufImport()))
|
||||
|
||||
val fileMailbox = exports(Seq("akka.actor.mailbox.filebased.*"))
|
||||
|
||||
val mailboxesCommon = exports(Seq("akka.actor.mailbox.*"), imports = Seq(protobufImport()))
|
||||
|
||||
val osgi = exports(Seq("akka.osgi.*"))
|
||||
|
||||
val osgiDiningHakkersSampleApi = exports(Seq("akka.sample.osgi.api"))
|
||||
|
|
@ -1141,10 +1106,6 @@ object Dependencies {
|
|||
|
||||
val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo)
|
||||
|
||||
val mailboxes = Seq(Test.scalatest, Test.junit)
|
||||
|
||||
val fileMailbox = Seq(Test.commonsIo, Test.scalatest, Test.junit)
|
||||
|
||||
val kernel = Seq(Test.scalatest, Test.junit)
|
||||
|
||||
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue