Making akka-mailbox* compile and tests green
This commit is contained in:
parent
ed84bcf9d2
commit
7fdb2c72e4
3 changed files with 11 additions and 5 deletions
|
|
@ -17,9 +17,9 @@ import akka.actor.ExtendedActorSystem
|
|||
|
||||
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||
private val settings = new FileBasedMailboxSettings(systemSettings, config)
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = owner zip system headOption match {
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = (owner zip system).headOption match {
|
||||
case Some((o, s: ExtendedActorSystem)) ⇒ new FileBasedMessageQueue(o, s, settings)
|
||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||
case _ ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,10 @@ class OverlaySetting[T](base: ⇒ T) {
|
|||
def apply() = local.getOrElse(base)
|
||||
}
|
||||
|
||||
trait Prependable[T] {
|
||||
def prepend(t: T): Unit
|
||||
}
|
||||
|
||||
class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) {
|
||||
|
||||
private case object ItemArrived
|
||||
|
|
@ -56,9 +60,9 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
|
|||
// # of items in the queue (including those not in memory)
|
||||
private var queueLength: Long = 0
|
||||
|
||||
private var queue = new mutable.Queue[QItem] {
|
||||
private var queue: mutable.Queue[QItem] with Prependable[QItem] = new mutable.Queue[QItem] with Prependable[QItem] {
|
||||
// scala's Queue doesn't (yet?) have a way to put back.
|
||||
def unget(item: QItem) = prependElem(item)
|
||||
def prepend(item: QItem) = prependElem(item)
|
||||
}
|
||||
private var _memoryBytes: Long = 0
|
||||
|
||||
|
|
@ -435,7 +439,7 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
|
|||
openTransactions.remove(xid) map { item ⇒
|
||||
queueLength += 1
|
||||
queueSize += item.data.length
|
||||
queue unget item
|
||||
queue prepend item
|
||||
_memoryBytes += item.data.length
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@
|
|||
*/
|
||||
package akka.actor.mailbox
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import java.io.InputStream
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue