From 3a61b2f4326ae551737b4db0f8fc98075aae691f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 16 Feb 2012 21:58:02 +0100 Subject: [PATCH 1/2] Removing durable mailbox logging --- .../scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala | 5 +---- .../scala/akka/actor/mailbox/FiledBasedMailbox.scala | 5 +---- .../scala/akka/actor/mailbox/MongoBasedMailbox.scala | 6 ++---- .../scala/akka/actor/mailbox/RedisBasedMailbox.scala | 9 ++------- .../scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala | 5 +---- 5 files changed, 7 insertions(+), 23 deletions(-) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 489d97d176..f42efd1da1 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -35,7 +35,6 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) // ===== For MessageQueue ===== def enqueue(receiver: ActorRef, envelope: Envelope) { - log.debug("ENQUEUING message in beanstalk-based mailbox [%s]".format(envelope)) Some(queue.get.put(65536, messageSubmitDelaySeconds, messageTimeToLiveSeconds, serialize(envelope)).toInt) } @@ -46,9 +45,7 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) val bytes = job.getData if (bytes ne null) { queue.get.delete(job.getJobId) - val envelope = deserialize(bytes) - log.debug("DEQUEUING message in beanstalk-based mailbox [%s]".format(envelope)) - envelope + deserialize(bytes) } else null: Envelope } } catch { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index ccdbdc4145..1d9f72a579 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -37,7 +37,6 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with } def enqueue(receiver: ActorRef, envelope: Envelope) { - log.debug("ENQUEUING message in file-based mailbox [{}]", envelope) queue.add(serialize(envelope)) } @@ -45,9 +44,7 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with val item = queue.remove if (item.isDefined) { queue.confirmRemove(item.get.xid) - val envelope = deserialize(item.get.data) - log.debug("DEQUEUING message in file-based mailbox [{}]", envelope) - envelope + deserialize(item.get.data) } else null } catch { case e: java.util.NoSuchElementException ⇒ null diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index d17a1221a8..e402a8ae61 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -70,11 +70,9 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { val envelopePromise = Promise[Envelope]()(dispatcher) mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage] ⇒ doc match { - case Some(msg) ⇒ { - log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg) + case Some(msg) ⇒ envelopePromise.success(msg.envelope(system)) - log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise) - } + () case None ⇒ log.info("No matching document found. Not an error, just an empty queue.") envelopePromise.success(null) diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index b6cf3febc6..f5248ac635 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -29,7 +29,6 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit val log = Logging(system, "RedisBasedMailbox") def enqueue(receiver: ActorRef, envelope: Envelope) { - log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope)) withErrorHandling { clients.withClient { client ⇒ client.rpush(name, serialize(envelope)) @@ -40,12 +39,8 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit def dequeue(): Envelope = withErrorHandling { try { import serialization.Parse.Implicits.parseByteArray - val item = clients.withClient { client ⇒ - client.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present")) - } - val envelope = deserialize(item) - log.debug("DEQUEUING message in redis-based mailbox [%s]".format(envelope)) - envelope + val item = clients.withClient { _.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present")) } + deserialize(item) } catch { case e: java.util.NoSuchElementException ⇒ null case NonFatal(e) ⇒ diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 90fd381af1..2a88c565e6 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -35,14 +35,11 @@ class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), settings.BlockingQueue) def enqueue(receiver: ActorRef, envelope: Envelope) { - log.debug("ENQUEUING message in zookeeper-based mailbox [%s]".format(envelope)) queue.enqueue(serialize(envelope)) } def dequeue: Envelope = try { - val messageInvocation = deserialize(queue.dequeue.asInstanceOf[Array[Byte]]) - log.debug("DEQUEUING message in zookeeper-based mailbox [%s]".format(messageInvocation)) - messageInvocation + deserialize(queue.dequeue.asInstanceOf[Array[Byte]]) } catch { case e: java.util.NoSuchElementException ⇒ null case e: InterruptedException ⇒ null From b532211e713fc946443837180491b6b77a3d3dd3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 19 Feb 2012 13:03:46 +0100 Subject: [PATCH 2/2] Removing logging from MongoBasedMailbox and BSON serialization --- .../main/scala/akka/actor/mailbox/BSONSerialization.scala | 5 +---- .../main/scala/akka/actor/mailbox/MongoBasedMailbox.scala | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 5aa314eb55..86e190a5f7 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -8,7 +8,6 @@ import java.io.InputStream import org.bson.collection.BSONDocument import org.bson.io.BasicOutputBuffer import org.bson.io.OutputBuffer -import org.bson.util.Logging import org.bson.SerializableBSONObject import org.bson.BSONSerializer import org.bson.DefaultBSONDeserializer @@ -18,7 +17,7 @@ import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.MessageSerializer import akka.actor.ExtendedActorSystem -class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging { +class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] { protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = { @@ -35,7 +34,6 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB val msgData = MessageSerializer.serialize(system, msg.message.asInstanceOf[AnyRef]) b += "message" -> new org.bson.types.Binary(0, msgData.toByteArray) val doc = b.result - system.log.debug("Serialized Document: {}", doc) serializer.putObject(doc) } @@ -63,7 +61,6 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB val deserializer = new DefaultBSONDeserializer // TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser) val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument] - system.log.debug("Deserializing a durable message from MongoDB: {}", doc) val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData) val msg = MessageSerializer.deserialize(system, msgData) val ownerPath = doc.as[String]("ownerPath") diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index e402a8ae61..f2ca76df77 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -45,7 +45,6 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { private var mongo = connect() def enqueue(receiver: ActorRef, envelope: Envelope) { - log.debug("ENQUEUING message in mongodb-based mailbox [{}]", envelope) /* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */ val durableMessage = MongoDurableMessage(ownerPathString, envelope.message, envelope.sender) // todo - do we need to filter the actor name at all for safe collection naming?