#2673 + #2674 - Seq[Byte] becomes ByteString and Frame gets removed, lots of cleanup in ZMQ API

This commit is contained in:
Viktor Klang 2012-12-14 18:25:04 +01:00
parent 5b477e99e7
commit 24566c167c
9 changed files with 124 additions and 92 deletions

View file

@ -58,16 +58,16 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_]]): Try[AnyRef] =
Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] =
Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz).asInstanceOf[T])
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] =
Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] =
Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T])
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.

View file

@ -45,6 +45,11 @@ object ByteString {
*/
def apply(string: String, charset: String): ByteString = CompactByteString(string, charset)
/**
* Creates a new ByteString by copying a byte array.
*/
def fromArray(array: Array[Byte]): ByteString = apply(array)
/**
* Creates a new ByteString by copying length bytes starting at offset from
* an Array.
@ -52,6 +57,16 @@ object ByteString {
def fromArray(array: Array[Byte], offset: Int, length: Int): ByteString =
CompactByteString.fromArray(array, offset, length)
/**
* Creates a new ByteString which will contain the UTF-8 representation of the given String
*/
def fromString(string: String): ByteString = apply(string)
/**
* Creates a new ByteString which will contain the representation of the given String in the given charset
*/
def fromString(string: String, charset: String): ByteString = apply(string, charset)
val empty: ByteString = CompactByteString(Array.empty[Byte])
def newBuilder: ByteStringBuilder = new ByteStringBuilder
@ -282,6 +297,12 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
override def indexWhere(p: Byte Boolean): Int = iterator.indexWhere(p)
override def indexOf[B >: Byte](elem: B): Int = iterator.indexOf(elem)
/**
* JAVA API
* @return this ByteString copied into a byte array
*/
protected[ByteString] def toArray: Array[Byte] = toArray[Byte] // protected[ByteString] == public to Java but hidden to Scala * fnizz *
override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = iterator.toArray
override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit =
iterator.copyToArray(xs, start, len)

View file

@ -16,7 +16,7 @@ import akka.zeromq.Subscribe;
import akka.zeromq.Unsubscribe;
//#import-unsub-topic-socket
//#import-pub-topic
import akka.zeromq.Frame;
import akka.util.ByteString;
import akka.zeromq.ZMQMessage;
//#import-pub-topic
@ -96,7 +96,7 @@ public class ZeromqDocTestBase {
byte[] payload = new byte[0];
//#pub-topic
pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)), null);
pubSocket.tell(ZMQMessage.withFrames(ByteString.fromString("foo.bar"), ByteString.fromArray(payload)), null);
//#pub-topic
system.stop(subSocket);
@ -136,7 +136,7 @@ public class ZeromqDocTestBase {
private boolean checkZeroMQInstallation() {
try {
ZeroMQVersion v = ZeroMQExtension.get(system).version();
return (v.major() == 2 && v.minor() == 1);
return (v.major() >= 3 || (v.major() >= 2 && v.minor() >= 1));
} catch (LinkageError e) {
return false;
}
@ -213,18 +213,23 @@ public class ZeromqDocTestBase {
long timestamp = System.currentTimeMillis();
// use akka SerializationExtension to convert to bytes
byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
ByteString heapTopic = ByteString.fromString("health.heap", "UTF-8");
ByteString heapPayload = ByteString.fromArray(
ser.serialize(
new Heap(timestamp,
currentHeap.getUsed(),
currentHeap.getMax())
).get());
// the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.heap"),
new Frame(heapPayload)), getSelf());
pubSocket.tell(ZMQMessage.withFrames(heapTopic, heapPayload), getSelf());
// use akka SerializationExtension to convert to bytes
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(
new Load(timestamp, os.getSystemLoadAverage()));
ByteString loadTopic = ByteString.fromString("health.load", "UTF-8");
ByteString loadPayload = ByteString.fromArray(
ser.serialize(new Load(timestamp, os.getSystemLoadAverage())).get()
);
// the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.load"),
new Frame(loadPayload)), getSelf());
pubSocket.tell(ZMQMessage.withFrames(loadTopic, loadPayload), getSelf());
} else {
unhandled(message);
}
@ -248,13 +253,14 @@ public class ZeromqDocTestBase {
public void onReceive(Object message) {
if (message instanceof ZMQMessage) {
ZMQMessage m = (ZMQMessage) message;
String topic = m.frame(0).utf8String();
// the first frame is the topic, second is the message
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
if ("health.heap".equals(topic)) {
Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get();
log.info("Used heap {} bytes, at {}", heap.used,
timestampFormat.format(new Date(heap.timestamp)));
} else if (m.firstFrameAsString().equals("health.load")) {
Load load = (Load) ser.serializerFor(Load.class).fromBinary(m.payload(1));
} else if ("health.load".equals(topic)) {
Load load = ser.deserialize(m.frame(1).toArray(), Load.class).get();
log.info("Load average {}, at {}", load.loadAverage,
timestampFormat.format(new Date(load.timestamp)));
}
@ -282,9 +288,10 @@ public class ZeromqDocTestBase {
public void onReceive(Object message) {
if (message instanceof ZMQMessage) {
ZMQMessage m = (ZMQMessage) message;
String topic = m.frame(0).utf8String();
// the first frame is the topic, second is the message
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
if ("health.heap".equals(topic)) {
Heap heap = ser.<Heap>deserialize(m.frame(1).toArray(), Heap.class).get();
if (((double) heap.used / heap.max) > 0.9) {
count += 1;
} else {

View file

@ -32,6 +32,7 @@ The ``timerActive_?`` method has been deprecated in both the ``FSM`` trait and t
class. You should now use the ``isTimerActive`` method instead. The old method will remain
throughout 2.2.x. It will be removed in Akka 2.3.
ThreadPoolConfigBuilder
=======================
@ -39,3 +40,11 @@ ThreadPoolConfigBuilder
and with it the ``conf_?`` method that was essentially only a type-inferencer aid for creation
of optional transformations on ``ThreadPoolConfigBuilder``.
Instead use: ``option.map(o => (t: ThreadPoolConfigBuilder) => t.op(o))``.
ZeroMQ ByteString
=================
``akka.zeromq.Frame`` and the use of ``Seq[Byte]`` in the API has been removed and is replaced by ``akka.util.ByteString``.
``ZMQMessage.firstFrameAsString`` has been removed, please use ``ZMQMessage.frames`` or ``ZMQMessage.frame(int)`` to access the frames.

View file

@ -6,8 +6,8 @@ package docs.zeromq
import language.postfixOps
import scala.concurrent.duration._
import scala.collection.immutable
import akka.actor.{ Actor, Props }
import akka.util.ByteString
import akka.testkit._
import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind }
import java.text.SimpleDateFormat
@ -53,12 +53,12 @@ object ZeromqDocSpec {
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed,
currentHeap.getMax)).get
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(immutable.Seq(Frame("health.heap"), Frame(heapPayload)))
pubSocket ! ZMQMessage(ByteString("health.heap"), ByteString(heapPayload))
// use akka SerializationExtension to convert to bytes
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(immutable.Seq(Frame("health.load"), Frame(loadPayload)))
pubSocket ! ZMQMessage(ByteString("health.load"), ByteString(loadPayload))
}
}
//#health
@ -73,14 +73,14 @@ object ZeromqDocSpec {
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1),
case m: ZMQMessage if m.frames(0).utf8String == "health.heap"
val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray,
classOf[Heap]).get
log.info("Used heap {} bytes, at {}", used,
timestampFormat.format(new Date(timestamp)))
case m: ZMQMessage if m.firstFrameAsString == "health.load"
val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1),
case m: ZMQMessage if m.frames(0).utf8String == "health.load"
val Load(timestamp, loadAverage) = ser.deserialize(m.frames(1).toArray,
classOf[Load]).get
log.info("Load average {}, at {}", loadAverage,
timestampFormat.format(new Date(timestamp)))
@ -98,9 +98,8 @@ object ZeromqDocSpec {
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1),
classOf[Heap]).get
case m: ZMQMessage if m.frames(0).utf8String == "health.heap"
val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray, classOf[Heap]).get
if ((used.toDouble / max) > 0.9) count += 1
else count = 0
if (count > 10) log.warning("Need more memory, using {} %",
@ -147,7 +146,7 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
val payload = Array.empty[Byte]
//#pub-topic
pubSocket ! ZMQMessage(Frame("foo.bar"), Frame(payload))
pubSocket ! ZMQMessage(ByteString("foo.bar"), ByteString(payload))
//#pub-topic
system.stop(subSocket)
@ -188,7 +187,8 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
def checkZeroMQInstallation() = try {
ZeroMQExtension(system).version match {
case ZeroMQVersion(2, 1, _) Unit
case ZeroMQVersion(2, x, _) if x >= 1 Unit
case ZeroMQVersion(y, _, _) if y >= 3 Unit
case version pending
}
} catch {

View file

@ -14,6 +14,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import akka.event.Logging
import java.util.concurrent.TimeUnit
import akka.util.ByteString
private[zeromq] object ConcurrentSocketActor {
private sealed trait PollMsg
@ -29,7 +30,6 @@ private[zeromq] object ConcurrentSocketActor {
private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) extends Actor {
import ConcurrentSocketActor._
private val noBytes = Array[Byte]()
private val zmqContext = params collectFirst { case c: Context c } getOrElse DefaultContext
private var deserializer = params collectFirst { case d: Deserializer d } getOrElse new ZMQMessageDeserializer
@ -41,7 +41,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption])
private val socket: Socket = zmqContext.socket(socketType)
private val poller: Poller = zmqContext.poller
private val pendingSends = new ListBuffer[immutable.Seq[Frame]]
private val pendingSends = new ListBuffer[immutable.Seq[ByteString]]
def receive = {
case m: PollMsg doPoll(m)
@ -152,13 +152,13 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption])
}
} finally notifyListener(Closed)
@tailrec private def flushMessage(i: immutable.Seq[Frame]): Boolean =
@tailrec private def flushMessage(i: immutable.Seq[ByteString]): Boolean =
if (i.isEmpty)
true
else {
val head = i.head
val tail = i.tail
if (socket.send(head.payload.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail)
if (socket.send(head.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail)
else {
pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it
self ! Flush
@ -199,7 +199,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption])
case frames notifyListener(deserializer(frames)); doPoll(mode, togo - 1)
}
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): immutable.Seq[Frame] =
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[ByteString] = Vector.empty): immutable.Seq[ByteString] =
if (mode == PollCareful && (poller.poll(0) <= 0)) {
if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!")
} else {
@ -207,7 +207,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption])
case null /*EAGAIN*/
if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames)
case bytes
val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes)
val frames = currentFrames :+ ByteString(bytes)
if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames
}
}

View file

@ -9,6 +9,10 @@ import scala.concurrent.duration._
import scala.collection.immutable
import org.zeromq.{ ZMQ JZMQ }
import org.zeromq.ZMQ.{ Poller, Socket }
import akka.japi.Util.immutableSeq
import akka.util.ByteString
import akka.util.Collections.EmptyImmutableSeq
import annotation.varargs
/**
* Marker trait representing request messages for zeromq
@ -37,7 +41,7 @@ sealed trait SocketConnectOption extends SocketOption {
* A base trait for pubsub options for the ZeroMQ socket
*/
sealed trait PubSubOption extends SocketOption {
def payload: immutable.Seq[Byte]
def payload: ByteString
}
/**
@ -80,7 +84,7 @@ class Context(numIoThreads: Int) extends SocketMeta {
* A base trait for message deserializers
*/
trait Deserializer extends SocketOption {
def apply(frames: immutable.Seq[Frame]): Any
def apply(frames: immutable.Seq[ByteString]): Any
}
/**
@ -173,12 +177,15 @@ case class Bind(endpoint: String) extends SocketConnectOption
*
* @param payload the topic to subscribe to
*/
case class Subscribe(payload: immutable.Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq])
case class Subscribe(payload: ByteString) extends PubSubOption {
def this(topic: String) = this(ByteString(topic))
}
object Subscribe {
def apply(topic: String): Subscribe = new Subscribe(topic)
val all = Subscribe("")
val all = Subscribe(ByteString.empty)
def apply(topic: String): Subscribe = topic match {
case null | "" all
case t new Subscribe(t)
}
}
/**
@ -190,8 +197,8 @@ object Subscribe {
*
* @param payload
*/
case class Unsubscribe(payload: immutable.Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq])
case class Unsubscribe(payload: ByteString) extends PubSubOption {
def this(topic: String) = this(ByteString(topic))
}
object Unsubscribe {
def apply(topic: String): Unsubscribe = new Unsubscribe(topic)
@ -201,33 +208,34 @@ object Unsubscribe {
* Send a message over the zeromq socket
* @param frames
*/
case class Send(frames: immutable.Seq[Frame]) extends Request
case class Send(frames: immutable.Seq[ByteString]) extends Request
/**
* A message received over the zeromq socket
* @param frames
*/
case class ZMQMessage(frames: immutable.Seq[Frame]) {
def this(frame: Frame) = this(List(frame))
def this(frame1: Frame, frame2: Frame) = this(List(frame1, frame2))
def this(frameArray: Array[Frame]) = this(frameArray.to[immutable.Seq])
/**
* Convert the bytes in the first frame to a String, using specified charset.
*/
def firstFrameAsString(charsetName: String): String = new String(frames.head.payload.toArray, charsetName)
/**
* Convert the bytes in the first frame to a String, using "UTF-8" charset.
*/
def firstFrameAsString: String = firstFrameAsString("UTF-8")
def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray
case class ZMQMessage(frames: immutable.Seq[ByteString]) {
def frame(frameIndex: Int): ByteString = frames(frameIndex)
}
object ZMQMessage {
def apply(bytes: Array[Byte]): ZMQMessage = new ZMQMessage(List(Frame(bytes)))
def apply(frames: Frame*): ZMQMessage = new ZMQMessage(frames.to[immutable.Seq])
def apply(message: Message): ZMQMessage = apply(message.toByteArray)
val empty = new ZMQMessage(EmptyImmutableSeq)
/**
* Scala API
* @param frames the frames of the returned ZMQMessage
* @return a ZMQMessage with the given frames
*/
def apply(frames: ByteString*): ZMQMessage =
if ((frames eq null) || frames.length == 0) empty else new ZMQMessage(frames.to[immutable.Seq])
/**
* Java API
* @param frames the frames of the returned ZMQMessage
* @return a ZMQMessage with the given frames
*/
@varargs def withFrames(frames: ByteString*): ZMQMessage = apply(frames: _*)
def apply[T](frames: T*)(implicit converter: T ByteString): ZMQMessage = apply(frames map converter: _*)
}
/**

View file

@ -4,24 +4,11 @@
package akka.zeromq
import scala.collection.immutable
object Frame {
def apply(bytes: Array[Byte]): Frame = new Frame(bytes)
def apply(text: String): Frame = new Frame(text)
}
/**
* A single message frame of a zeromq message
* @param payload
*/
case class Frame(payload: immutable.Seq[Byte]) {
def this(bytes: Array[Byte]) = this(bytes.to[immutable.Seq])
def this(text: String) = this(text.getBytes("UTF-8"))
}
import akka.util.ByteString
/**
* Deserializes ZeroMQ messages into an immutable sequence of frames
*/
class ZMQMessageDeserializer extends Deserializer {
def apply(frames: immutable.Seq[Frame]): ZMQMessage = ZMQMessage(frames)
def apply(frames: immutable.Seq[ByteString]): ZMQMessage = ZMQMessage(frames)
}

View file

@ -9,7 +9,7 @@ import org.scalatest.matchers.MustMatchers
import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec }
import scala.concurrent.duration._
import akka.actor.{ Cancellable, Actor, Props, ActorRef }
import akka.util.Timeout
import akka.util.{ ByteString, Timeout }
class ConcurrentSocketActorSpec extends AkkaSpec {
@ -51,7 +51,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable {
var number = 0
def run() {
publisher ! ZMQMessage(Frame(number.toString), Frame(Nil))
publisher ! ZMQMessage(ByteString(number.toString), ByteString.empty)
number += 1
}
})
@ -60,9 +60,9 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
subscriberProbe.expectMsg(Connecting)
val msgNumbers = subscriberProbe.receiveWhile(2 seconds) {
case msg: ZMQMessage if msg.frames.size == 2
msg.payload(1).length must be(0)
msg.frames(1).length must be(0)
msg
}.map(_.firstFrameAsString.toInt)
}.map(m m.frames(0).utf8String.toInt)
msgNumbers.length must be > 0
msgNumbers must equal(for (i msgNumbers.head to msgNumbers.last) yield i)
} finally {
@ -88,8 +88,8 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
try {
replierProbe.expectMsg(Connecting)
val request = ZMQMessage(Frame("Request"))
val reply = ZMQMessage(Frame("Reply"))
val request = ZMQMessage(ByteString("Request"))
val reply = ZMQMessage(ByteString("Reply"))
requester ! request
replierProbe.expectMsg(request)
@ -112,7 +112,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
try {
pullerProbe.expectMsg(Connecting)
val message = ZMQMessage(Frame("Pushed message"))
val message = ZMQMessage(ByteString("Pushed message"))
pusher ! message
pullerProbe.expectMsg(message)
@ -146,7 +146,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
case _
val payload = "%s".format(messageNumber)
messageNumber += 1
actorRef ! ZMQMessage(payload.getBytes)
actorRef ! ZMQMessage(ByteString(payload))
}
}
}