Merge pull request #332 from jboner/wip-1834-durable-nolog-√

Removing durable mailbox logging
This commit is contained in:
viktorklang 2012-02-19 04:07:23 -08:00
commit 9901861910
6 changed files with 8 additions and 28 deletions

View file

@ -35,7 +35,6 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
// ===== For MessageQueue =====
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in beanstalk-based mailbox [%s]".format(envelope))
Some(queue.get.put(65536, messageSubmitDelaySeconds, messageTimeToLiveSeconds, serialize(envelope)).toInt)
}
@ -46,9 +45,7 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
val bytes = job.getData
if (bytes ne null) {
queue.get.delete(job.getJobId)
val envelope = deserialize(bytes)
log.debug("DEQUEUING message in beanstalk-based mailbox [%s]".format(envelope))
envelope
deserialize(bytes)
} else null: Envelope
}
} catch {

View file

@ -37,7 +37,6 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with
}
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in file-based mailbox [{}]", envelope)
queue.add(serialize(envelope))
}
@ -45,9 +44,7 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with
val item = queue.remove
if (item.isDefined) {
queue.confirmRemove(item.get.xid)
val envelope = deserialize(item.get.data)
log.debug("DEQUEUING message in file-based mailbox [{}]", envelope)
envelope
deserialize(item.get.data)
} else null
} catch {
case e: java.util.NoSuchElementException null

View file

@ -8,7 +8,6 @@ import java.io.InputStream
import org.bson.collection.BSONDocument
import org.bson.io.BasicOutputBuffer
import org.bson.io.OutputBuffer
import org.bson.util.Logging
import org.bson.SerializableBSONObject
import org.bson.BSONSerializer
import org.bson.DefaultBSONDeserializer
@ -18,7 +17,7 @@ import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer
import akka.actor.ExtendedActorSystem
class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging {
class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] {
protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = {
@ -35,7 +34,6 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
val msgData = MessageSerializer.serialize(system, msg.message.asInstanceOf[AnyRef])
b += "message" -> new org.bson.types.Binary(0, msgData.toByteArray)
val doc = b.result
system.log.debug("Serialized Document: {}", doc)
serializer.putObject(doc)
}
@ -63,7 +61,6 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
val deserializer = new DefaultBSONDeserializer
// TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser)
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
system.log.debug("Deserializing a durable message from MongoDB: {}", doc)
val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
val msg = MessageSerializer.deserialize(system, msgData)
val ownerPath = doc.as[String]("ownerPath")

View file

@ -45,7 +45,6 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
private var mongo = connect()
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in mongodb-based mailbox [{}]", envelope)
/* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */
val durableMessage = MongoDurableMessage(ownerPathString, envelope.message, envelope.sender)
// todo - do we need to filter the actor name at all for safe collection naming?
@ -70,11 +69,9 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
val envelopePromise = Promise[Envelope]()(dispatcher)
mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage]
doc match {
case Some(msg) {
log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg)
case Some(msg)
envelopePromise.success(msg.envelope(system))
log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise)
}
()
case None
log.info("No matching document found. Not an error, just an empty queue.")
envelopePromise.success(null)

View file

@ -29,7 +29,6 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit
val log = Logging(system, "RedisBasedMailbox")
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope))
withErrorHandling {
clients.withClient { client
client.rpush(name, serialize(envelope))
@ -40,12 +39,8 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit
def dequeue(): Envelope = withErrorHandling {
try {
import serialization.Parse.Implicits.parseByteArray
val item = clients.withClient { client
client.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present"))
}
val envelope = deserialize(item)
log.debug("DEQUEUING message in redis-based mailbox [%s]".format(envelope))
envelope
val item = clients.withClient { _.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present")) }
deserialize(item)
} catch {
case e: java.util.NoSuchElementException null
case NonFatal(e)

View file

@ -35,14 +35,11 @@ class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), settings.BlockingQueue)
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in zookeeper-based mailbox [%s]".format(envelope))
queue.enqueue(serialize(envelope))
}
def dequeue: Envelope = try {
val messageInvocation = deserialize(queue.dequeue.asInstanceOf[Array[Byte]])
log.debug("DEQUEUING message in zookeeper-based mailbox [%s]".format(messageInvocation))
messageInvocation
deserialize(queue.dequeue.asInstanceOf[Array[Byte]])
} catch {
case e: java.util.NoSuchElementException null
case e: InterruptedException null