From 24566c167c4958fad82ca6c4ca1e470705abb618 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 14 Dec 2012 18:25:04 +0100 Subject: [PATCH] #2673 + #2674 - Seq[Byte] becomes ByteString and Frame gets removed, lots of cleanup in ZMQ API --- .../akka/serialization/Serialization.scala | 8 +-- .../src/main/scala/akka/util/ByteString.scala | 21 ++++++ .../code/docs/zeromq/ZeromqDocTestBase.java | 41 +++++++----- .../project/migration-guide-2.1.x-2.2.x.rst | 11 +++- .../code/docs/zeromq/ZeromqDocSpec.scala | 26 ++++---- .../akka/zeromq/ConcurrentSocketActor.scala | 12 ++-- .../main/scala/akka/zeromq/SocketOption.scala | 64 +++++++++++-------- .../akka/zeromq/ZMQMessageDeserializer.scala | 17 +---- .../zeromq/ConcurrentSocketActorSpec.scala | 16 ++--- 9 files changed, 124 insertions(+), 92 deletions(-) diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 2fb6a37469..e0e52e8189 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -58,16 +58,16 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * using the optional type hint to the Serializer and the optional ClassLoader ot load it into. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize(bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_]]): Try[AnyRef] = - Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) + def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] = + Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz).asInstanceOf[T]) /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * You can specify an optional ClassLoader to load the object into. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = - Try(serializerFor(clazz).fromBinary(bytes, Some(clazz))) + def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = + Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T]) /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index afffa75ac4..70f929c11a 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -45,6 +45,11 @@ object ByteString { */ def apply(string: String, charset: String): ByteString = CompactByteString(string, charset) + /** + * Creates a new ByteString by copying a byte array. + */ + def fromArray(array: Array[Byte]): ByteString = apply(array) + /** * Creates a new ByteString by copying length bytes starting at offset from * an Array. @@ -52,6 +57,16 @@ object ByteString { def fromArray(array: Array[Byte], offset: Int, length: Int): ByteString = CompactByteString.fromArray(array, offset, length) + /** + * Creates a new ByteString which will contain the UTF-8 representation of the given String + */ + def fromString(string: String): ByteString = apply(string) + + /** + * Creates a new ByteString which will contain the representation of the given String in the given charset + */ + def fromString(string: String, charset: String): ByteString = apply(string, charset) + val empty: ByteString = CompactByteString(Array.empty[Byte]) def newBuilder: ByteStringBuilder = new ByteStringBuilder @@ -282,6 +297,12 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz override def indexWhere(p: Byte ⇒ Boolean): Int = iterator.indexWhere(p) override def indexOf[B >: Byte](elem: B): Int = iterator.indexOf(elem) + /** + * JAVA API + * @return this ByteString copied into a byte array + */ + protected[ByteString] def toArray: Array[Byte] = toArray[Byte] // protected[ByteString] == public to Java but hidden to Scala * fnizz * + override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = iterator.toArray override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = iterator.copyToArray(xs, start, len) diff --git a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java index 9ec3bc49f9..8c54ed1058 100644 --- a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java +++ b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java @@ -16,7 +16,7 @@ import akka.zeromq.Subscribe; import akka.zeromq.Unsubscribe; //#import-unsub-topic-socket //#import-pub-topic -import akka.zeromq.Frame; +import akka.util.ByteString; import akka.zeromq.ZMQMessage; //#import-pub-topic @@ -96,7 +96,7 @@ public class ZeromqDocTestBase { byte[] payload = new byte[0]; //#pub-topic - pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)), null); + pubSocket.tell(ZMQMessage.withFrames(ByteString.fromString("foo.bar"), ByteString.fromArray(payload)), null); //#pub-topic system.stop(subSocket); @@ -136,7 +136,7 @@ public class ZeromqDocTestBase { private boolean checkZeroMQInstallation() { try { ZeroMQVersion v = ZeroMQExtension.get(system).version(); - return (v.major() == 2 && v.minor() == 1); + return (v.major() >= 3 || (v.major() >= 2 && v.minor() >= 1)); } catch (LinkageError e) { return false; } @@ -213,18 +213,23 @@ public class ZeromqDocTestBase { long timestamp = System.currentTimeMillis(); // use akka SerializationExtension to convert to bytes - byte[] heapPayload = ser.serializerFor(Heap.class).toBinary( - new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax())); + ByteString heapTopic = ByteString.fromString("health.heap", "UTF-8"); + ByteString heapPayload = ByteString.fromArray( + ser.serialize( + new Heap(timestamp, + currentHeap.getUsed(), + currentHeap.getMax()) + ).get()); // the first frame is the topic, second is the message - pubSocket.tell(new ZMQMessage(new Frame("health.heap"), - new Frame(heapPayload)), getSelf()); + pubSocket.tell(ZMQMessage.withFrames(heapTopic, heapPayload), getSelf()); // use akka SerializationExtension to convert to bytes - byte[] loadPayload = ser.serializerFor(Load.class).toBinary( - new Load(timestamp, os.getSystemLoadAverage())); + ByteString loadTopic = ByteString.fromString("health.load", "UTF-8"); + ByteString loadPayload = ByteString.fromArray( + ser.serialize(new Load(timestamp, os.getSystemLoadAverage())).get() + ); // the first frame is the topic, second is the message - pubSocket.tell(new ZMQMessage(new Frame("health.load"), - new Frame(loadPayload)), getSelf()); + pubSocket.tell(ZMQMessage.withFrames(loadTopic, loadPayload), getSelf()); } else { unhandled(message); } @@ -248,13 +253,14 @@ public class ZeromqDocTestBase { public void onReceive(Object message) { if (message instanceof ZMQMessage) { ZMQMessage m = (ZMQMessage) message; + String topic = m.frame(0).utf8String(); // the first frame is the topic, second is the message - if (m.firstFrameAsString().equals("health.heap")) { - Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1)); + if ("health.heap".equals(topic)) { + Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get(); log.info("Used heap {} bytes, at {}", heap.used, timestampFormat.format(new Date(heap.timestamp))); - } else if (m.firstFrameAsString().equals("health.load")) { - Load load = (Load) ser.serializerFor(Load.class).fromBinary(m.payload(1)); + } else if ("health.load".equals(topic)) { + Load load = ser.deserialize(m.frame(1).toArray(), Load.class).get(); log.info("Load average {}, at {}", load.loadAverage, timestampFormat.format(new Date(load.timestamp))); } @@ -282,9 +288,10 @@ public class ZeromqDocTestBase { public void onReceive(Object message) { if (message instanceof ZMQMessage) { ZMQMessage m = (ZMQMessage) message; + String topic = m.frame(0).utf8String(); // the first frame is the topic, second is the message - if (m.firstFrameAsString().equals("health.heap")) { - Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1)); + if ("health.heap".equals(topic)) { + Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get(); if (((double) heap.used / heap.max) > 0.9) { count += 1; } else { diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index 155d1dd1ab..1431c954f8 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -32,10 +32,19 @@ The ``timerActive_?`` method has been deprecated in both the ``FSM`` trait and t class. You should now use the ``isTimerActive`` method instead. The old method will remain throughout 2.2.x. It will be removed in Akka 2.3. + ThreadPoolConfigBuilder ======================= ``akka.dispatch.ThreadPoolConfigBuilder`` companion object has been removed, and with it the ``conf_?`` method that was essentially only a type-inferencer aid for creation of optional transformations on ``ThreadPoolConfigBuilder``. -Instead use: ``option.map(o => (t: ThreadPoolConfigBuilder) => t.op(o))``. \ No newline at end of file +Instead use: ``option.map(o => (t: ThreadPoolConfigBuilder) => t.op(o))``. + + +ZeroMQ ByteString +================= + +``akka.zeromq.Frame`` and the use of ``Seq[Byte]`` in the API has been removed and is replaced by ``akka.util.ByteString``. + +``ZMQMessage.firstFrameAsString`` has been removed, please use ``ZMQMessage.frames`` or ``ZMQMessage.frame(int)`` to access the frames. diff --git a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala index ab2e4f4d27..ca2db8f675 100644 --- a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -6,8 +6,8 @@ package docs.zeromq import language.postfixOps import scala.concurrent.duration._ -import scala.collection.immutable import akka.actor.{ Actor, Props } +import akka.util.ByteString import akka.testkit._ import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind } import java.text.SimpleDateFormat @@ -53,12 +53,12 @@ object ZeromqDocSpec { val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(immutable.Seq(Frame("health.heap"), Frame(heapPayload))) + pubSocket ! ZMQMessage(ByteString("health.heap"), ByteString(heapPayload)) // use akka SerializationExtension to convert to bytes val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(immutable.Seq(Frame("health.load"), Frame(loadPayload))) + pubSocket ! ZMQMessage(ByteString("health.load"), ByteString(loadPayload)) } } //#health @@ -73,14 +73,14 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message - case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), + case m: ZMQMessage if m.frames(0).utf8String == "health.heap" ⇒ + val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray, classOf[Heap]).get log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp))) - case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒ - val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), + case m: ZMQMessage if m.frames(0).utf8String == "health.load" ⇒ + val Load(timestamp, loadAverage) = ser.deserialize(m.frames(1).toArray, classOf[Load]).get log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp))) @@ -98,9 +98,8 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message - case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), - classOf[Heap]).get + case m: ZMQMessage if m.frames(0).utf8String == "health.heap" ⇒ + val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray, classOf[Heap]).get if ((used.toDouble / max) > 0.9) count += 1 else count = 0 if (count > 10) log.warning("Need more memory, using {} %", @@ -147,7 +146,7 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") { val payload = Array.empty[Byte] //#pub-topic - pubSocket ! ZMQMessage(Frame("foo.bar"), Frame(payload)) + pubSocket ! ZMQMessage(ByteString("foo.bar"), ByteString(payload)) //#pub-topic system.stop(subSocket) @@ -188,8 +187,9 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") { def checkZeroMQInstallation() = try { ZeroMQExtension(system).version match { - case ZeroMQVersion(2, 1, _) ⇒ Unit - case version ⇒ pending + case ZeroMQVersion(2, x, _) if x >= 1 ⇒ Unit + case ZeroMQVersion(y, _, _) if y >= 3 ⇒ Unit + case version ⇒ pending } } catch { case e: LinkageError ⇒ pending diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index a9efa56c1e..f91ea2e318 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -14,6 +14,7 @@ import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal import akka.event.Logging import java.util.concurrent.TimeUnit +import akka.util.ByteString private[zeromq] object ConcurrentSocketActor { private sealed trait PollMsg @@ -29,7 +30,6 @@ private[zeromq] object ConcurrentSocketActor { private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) extends Actor { import ConcurrentSocketActor._ - private val noBytes = Array[Byte]() private val zmqContext = params collectFirst { case c: Context ⇒ c } getOrElse DefaultContext private var deserializer = params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer @@ -41,7 +41,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) private val socket: Socket = zmqContext.socket(socketType) private val poller: Poller = zmqContext.poller - private val pendingSends = new ListBuffer[immutable.Seq[Frame]] + private val pendingSends = new ListBuffer[immutable.Seq[ByteString]] def receive = { case m: PollMsg ⇒ doPoll(m) @@ -152,13 +152,13 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) } } finally notifyListener(Closed) - @tailrec private def flushMessage(i: immutable.Seq[Frame]): Boolean = + @tailrec private def flushMessage(i: immutable.Seq[ByteString]): Boolean = if (i.isEmpty) true else { val head = i.head val tail = i.tail - if (socket.send(head.payload.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail) + if (socket.send(head.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail) else { pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it self ! Flush @@ -199,7 +199,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) case frames ⇒ notifyListener(deserializer(frames)); doPoll(mode, togo - 1) } - @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): immutable.Seq[Frame] = + @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[ByteString] = Vector.empty): immutable.Seq[ByteString] = if (mode == PollCareful && (poller.poll(0) <= 0)) { if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!") } else { @@ -207,7 +207,7 @@ private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) case null ⇒ /*EAGAIN*/ if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames) case bytes ⇒ - val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes) + val frames = currentFrames :+ ByteString(bytes) if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames } } diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala index b70c245327..b74760b5c3 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala @@ -9,6 +9,10 @@ import scala.concurrent.duration._ import scala.collection.immutable import org.zeromq.{ ZMQ ⇒ JZMQ } import org.zeromq.ZMQ.{ Poller, Socket } +import akka.japi.Util.immutableSeq +import akka.util.ByteString +import akka.util.Collections.EmptyImmutableSeq +import annotation.varargs /** * Marker trait representing request messages for zeromq @@ -37,7 +41,7 @@ sealed trait SocketConnectOption extends SocketOption { * A base trait for pubsub options for the ZeroMQ socket */ sealed trait PubSubOption extends SocketOption { - def payload: immutable.Seq[Byte] + def payload: ByteString } /** @@ -80,7 +84,7 @@ class Context(numIoThreads: Int) extends SocketMeta { * A base trait for message deserializers */ trait Deserializer extends SocketOption { - def apply(frames: immutable.Seq[Frame]): Any + def apply(frames: immutable.Seq[ByteString]): Any } /** @@ -173,12 +177,15 @@ case class Bind(endpoint: String) extends SocketConnectOption * * @param payload the topic to subscribe to */ -case class Subscribe(payload: immutable.Seq[Byte]) extends PubSubOption { - def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq]) +case class Subscribe(payload: ByteString) extends PubSubOption { + def this(topic: String) = this(ByteString(topic)) } object Subscribe { - def apply(topic: String): Subscribe = new Subscribe(topic) - val all = Subscribe("") + val all = Subscribe(ByteString.empty) + def apply(topic: String): Subscribe = topic match { + case null | "" ⇒ all + case t ⇒ new Subscribe(t) + } } /** @@ -190,8 +197,8 @@ object Subscribe { * * @param payload */ -case class Unsubscribe(payload: immutable.Seq[Byte]) extends PubSubOption { - def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq]) +case class Unsubscribe(payload: ByteString) extends PubSubOption { + def this(topic: String) = this(ByteString(topic)) } object Unsubscribe { def apply(topic: String): Unsubscribe = new Unsubscribe(topic) @@ -201,33 +208,34 @@ object Unsubscribe { * Send a message over the zeromq socket * @param frames */ -case class Send(frames: immutable.Seq[Frame]) extends Request +case class Send(frames: immutable.Seq[ByteString]) extends Request /** * A message received over the zeromq socket * @param frames */ -case class ZMQMessage(frames: immutable.Seq[Frame]) { - - def this(frame: Frame) = this(List(frame)) - def this(frame1: Frame, frame2: Frame) = this(List(frame1, frame2)) - def this(frameArray: Array[Frame]) = this(frameArray.to[immutable.Seq]) - - /** - * Convert the bytes in the first frame to a String, using specified charset. - */ - def firstFrameAsString(charsetName: String): String = new String(frames.head.payload.toArray, charsetName) - /** - * Convert the bytes in the first frame to a String, using "UTF-8" charset. - */ - def firstFrameAsString: String = firstFrameAsString("UTF-8") - - def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray +case class ZMQMessage(frames: immutable.Seq[ByteString]) { + def frame(frameIndex: Int): ByteString = frames(frameIndex) } object ZMQMessage { - def apply(bytes: Array[Byte]): ZMQMessage = new ZMQMessage(List(Frame(bytes))) - def apply(frames: Frame*): ZMQMessage = new ZMQMessage(frames.to[immutable.Seq]) - def apply(message: Message): ZMQMessage = apply(message.toByteArray) + val empty = new ZMQMessage(EmptyImmutableSeq) + + /** + * Scala API + * @param frames the frames of the returned ZMQMessage + * @return a ZMQMessage with the given frames + */ + def apply(frames: ByteString*): ZMQMessage = + if ((frames eq null) || frames.length == 0) empty else new ZMQMessage(frames.to[immutable.Seq]) + + /** + * Java API + * @param frames the frames of the returned ZMQMessage + * @return a ZMQMessage with the given frames + */ + @varargs def withFrames(frames: ByteString*): ZMQMessage = apply(frames: _*) + + def apply[T](frames: T*)(implicit converter: T ⇒ ByteString): ZMQMessage = apply(frames map converter: _*) } /** diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala index d0141bf515..3325fc2c4b 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala @@ -4,24 +4,11 @@ package akka.zeromq import scala.collection.immutable - -object Frame { - def apply(bytes: Array[Byte]): Frame = new Frame(bytes) - def apply(text: String): Frame = new Frame(text) -} - -/** - * A single message frame of a zeromq message - * @param payload - */ -case class Frame(payload: immutable.Seq[Byte]) { - def this(bytes: Array[Byte]) = this(bytes.to[immutable.Seq]) - def this(text: String) = this(text.getBytes("UTF-8")) -} +import akka.util.ByteString /** * Deserializes ZeroMQ messages into an immutable sequence of frames */ class ZMQMessageDeserializer extends Deserializer { - def apply(frames: immutable.Seq[Frame]): ZMQMessage = ZMQMessage(frames) + def apply(frames: immutable.Seq[ByteString]): ZMQMessage = ZMQMessage(frames) } diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 6feaffd6d6..e17ca6fb1f 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -9,7 +9,7 @@ import org.scalatest.matchers.MustMatchers import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec } import scala.concurrent.duration._ import akka.actor.{ Cancellable, Actor, Props, ActorRef } -import akka.util.Timeout +import akka.util.{ ByteString, Timeout } class ConcurrentSocketActorSpec extends AkkaSpec { @@ -51,7 +51,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable { var number = 0 def run() { - publisher ! ZMQMessage(Frame(number.toString), Frame(Nil)) + publisher ! ZMQMessage(ByteString(number.toString), ByteString.empty) number += 1 } }) @@ -60,9 +60,9 @@ class ConcurrentSocketActorSpec extends AkkaSpec { subscriberProbe.expectMsg(Connecting) val msgNumbers = subscriberProbe.receiveWhile(2 seconds) { case msg: ZMQMessage if msg.frames.size == 2 ⇒ - msg.payload(1).length must be(0) + msg.frames(1).length must be(0) msg - }.map(_.firstFrameAsString.toInt) + }.map(m ⇒ m.frames(0).utf8String.toInt) msgNumbers.length must be > 0 msgNumbers must equal(for (i ← msgNumbers.head to msgNumbers.last) yield i) } finally { @@ -88,8 +88,8 @@ class ConcurrentSocketActorSpec extends AkkaSpec { try { replierProbe.expectMsg(Connecting) - val request = ZMQMessage(Frame("Request")) - val reply = ZMQMessage(Frame("Reply")) + val request = ZMQMessage(ByteString("Request")) + val reply = ZMQMessage(ByteString("Reply")) requester ! request replierProbe.expectMsg(request) @@ -112,7 +112,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { try { pullerProbe.expectMsg(Connecting) - val message = ZMQMessage(Frame("Pushed message")) + val message = ZMQMessage(ByteString("Pushed message")) pusher ! message pullerProbe.expectMsg(message) @@ -146,7 +146,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { case _ ⇒ val payload = "%s".format(messageNumber) messageNumber += 1 - actorRef ! ZMQMessage(payload.getBytes) + actorRef ! ZMQMessage(ByteString(payload)) } } }