From a5c55fd017c972cdeaab659c106871cab4917d19 Mon Sep 17 00:00:00 2001 From: Ivan Porto Carrero Date: Sat, 14 Jan 2012 03:16:39 +0100 Subject: [PATCH] Adds initial version of zeromq support for akka 2.0 --- akka-actor/src/main/resources/reference.conf | 6 + .../akka/zeromq/ConcurrentSocketActor.scala | 207 ++++++++++++++++++ .../src/main/scala/akka/zeromq/Context.scala | 20 ++ .../main/scala/akka/zeromq/Deserializer.scala | 15 ++ .../src/main/scala/akka/zeromq/Requests.scala | 104 +++++++++ .../main/scala/akka/zeromq/Responses.scala | 8 + .../main/scala/akka/zeromq/SocketType.scala | 14 ++ .../scala/akka/zeromq/ZeroMQExtension.scala | 65 ++++++ .../zeromq/ConcurrentSocketActorSpec.scala | 149 +++++++++++++ project/AkkaBuild.scala | 14 ++ 10 files changed, 602 insertions(+) create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/Context.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/Deserializer.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/Requests.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/Responses.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/SocketType.scala create mode 100644 akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala create mode 100644 akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 94efc34176..68b614ed28 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -146,6 +146,12 @@ akka { } } + zeromq-dispatcher { + # A zeromq socket needs to be pinned to the thread that created it. + # Changing this value results in weird errors and race conditions within zeromq + type = "PinnedDispatcher" + } + default-dispatcher { # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala new file mode 100644 index 0000000000..a1a340e282 --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import org.zeromq.ZMQ.{ Socket, Poller } +import org.zeromq.{ ZMQ ⇒ JZMQ } +import akka.actor._ +import akka.dispatch.{ Promise, Dispatchers, Future } + +private[zeromq] sealed trait PollLifeCycle +private[zeromq] case object NoResults extends PollLifeCycle +private[zeromq] case object Results extends PollLifeCycle +private[zeromq] case object Closing extends PollLifeCycle + +private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Actor { + + private val noBytes = Array[Byte]() + private val socket: Socket = params.context.socket(params.socketType) + private val poller: Poller = params.context.poller + + override def receive: Receive = { + case Send(frames) ⇒ + sendFrames(frames) + pollAndReceiveFrames() + case ZMQMessage(frames) ⇒ + sendFrames(frames) + pollAndReceiveFrames() + case Connect(endpoint) ⇒ + socket.connect(endpoint) + notifyListener(Connecting) + pollAndReceiveFrames() + case Bind(endpoint) ⇒ + socket.bind(endpoint) + pollAndReceiveFrames() + case Subscribe(topic) ⇒ + socket.subscribe(topic.toArray) + pollAndReceiveFrames() + case Unsubscribe(topic) ⇒ + socket.unsubscribe(topic.toArray) + pollAndReceiveFrames() + case Linger(value) ⇒ + socket.setLinger(value) + case Linger ⇒ + sender ! socket.getLinger + case ReconnectIVL ⇒ + sender ! socket.getReconnectIVL + case ReconnectIVL(value) ⇒ + socket.setReconnectIVL(value) + case Backlog ⇒ + sender ! socket.getBacklog + case Backlog(value) ⇒ + socket.setBacklog(value) + case ReconnectIVLMax ⇒ + sender ! socket.getReconnectIVLMax + case ReconnectIVLMax(value) ⇒ + socket.setReconnectIVLMax(value) + case MaxMsgSize ⇒ + sender ! socket.getMaxMsgSize + case MaxMsgSize(value) ⇒ + socket.setMaxMsgSize(value) + case SndHWM ⇒ + sender ! socket.getSndHWM + case SndHWM(value) ⇒ + socket.setSndHWM(value) + case RcvHWM ⇒ + sender ! socket.getRcvHWM + case RcvHWM(value) ⇒ + socket.setRcvHWM(value) + case HWM(value) ⇒ + socket.setHWM(value) + case Swap ⇒ + sender ! socket.getSwap + case Swap(value) ⇒ + socket.setSwap(value) + case Affinity ⇒ + sender ! socket.getAffinity + case Affinity(value) ⇒ + socket.setAffinity(value) + case Identity ⇒ + sender ! socket.getIdentity + case Identity(value) ⇒ + socket.setIdentity(value) + case Rate ⇒ + sender ! socket.getRate + case Rate(value) ⇒ + socket.setRate(value) + case RecoveryInterval ⇒ + sender ! socket.getRecoveryInterval + case RecoveryInterval(value) ⇒ + socket.setRecoveryInterval(value) + case MulticastLoop ⇒ + sender ! socket.hasMulticastLoop + case MulticastLoop(value) ⇒ + socket.setMulticastLoop(value) + case MulticastHops ⇒ + sender ! socket.getMulticastHops + case MulticastHops(value) ⇒ + socket.setMulticastHops(value) + case ReceiveTimeOut ⇒ + sender ! socket.getReceiveTimeOut + case ReceiveTimeOut(value) ⇒ + socket.setReceiveTimeOut(value) + case SendTimeOut ⇒ + sender ! socket.getSendTimeOut + case SendTimeOut(value) ⇒ + socket.setSendTimeOut(value) + case SendBufferSize ⇒ + sender ! socket.getSendBufferSize + case SendBufferSize(value) ⇒ + socket.setSendBufferSize(value) + case ReceiveBufferSize ⇒ + sender ! socket.getReceiveBufferSize + case ReceiveBufferSize(value) ⇒ + socket.setReceiveBufferSize(value) + case ReceiveMore ⇒ + sender ! socket.hasReceiveMore + case FileDescriptor ⇒ + sender ! socket.getFD + case 'poll ⇒ { + currentPoll = None + pollAndReceiveFrames() + } + case 'receiveFrames ⇒ { + receiveFrames() match { + case Seq() ⇒ + case frames ⇒ notifyListener(params.deserializer(frames)) + } + self ! 'poll + } + } + + override def preStart { + poller.register(socket, Poller.POLLIN) + } + + override def postStop { + currentPoll foreach { _ complete Right(Closing) } + poller.unregister(socket) + socket.close + notifyListener(Closed) + } + + private def sendFrames(frames: Seq[Frame]) { + def sendBytes(bytes: Seq[Byte], flags: Int) { + socket.send(bytes.toArray, flags) + } + val iter = frames.iterator + while (iter.hasNext) { + val payload = iter.next.payload + val flags = if (iter.hasNext) JZMQ.SNDMORE else 0 + sendBytes(payload, flags) + } + } + + private var currentPoll: Option[Promise[PollLifeCycle]] = None + private def pollAndReceiveFrames() { + currentPoll = currentPoll orElse Some(newEventLoop) + } + + private def newEventLoop: Promise[PollLifeCycle] = { + implicit val executor = context.system.dispatchers.defaultGlobalDispatcher + (Future { + if (poller.poll(params.pollTimeoutDuration.toMillis) > 0 && poller.pollin(0)) Results else NoResults + }).asInstanceOf[Promise[PollLifeCycle]] onSuccess { + case Results ⇒ if (!self.isTerminated) self ! 'receiveFrames + case NoResults ⇒ if (!self.isTerminated) self ! 'poll + case _ ⇒ currentPoll = None + } onFailure { + case ex ⇒ { + if (context.system != null) { + context.system.log.error(ex, "There was an error receiving messages on the zeromq socket") + } + if (!self.isTerminated) self ! 'poll + } + } + } + + private def receiveFrames(): Seq[Frame] = { + + @inline def receiveBytes(): Array[Byte] = socket.recv(0) match { + case null ⇒ noBytes + case bytes: Array[Byte] if bytes.length > 0 ⇒ bytes + case _ ⇒ noBytes + } + + receiveBytes() match { + case `noBytes` ⇒ Vector.empty + case someBytes ⇒ + var frames = Vector(Frame(someBytes)) + while (socket.hasReceiveMore) receiveBytes() match { + case `noBytes` ⇒ + case someBytes ⇒ frames :+= Frame(someBytes) + } + frames + } + } + + private def notifyListener(message: Any) { + params.listener.foreach { listener ⇒ + if (listener.isTerminated) + context stop self + else + listener ! message + } + } +} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Context.scala b/akka-zeromq/src/main/scala/akka/zeromq/Context.scala new file mode 100644 index 0000000000..073270436f --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/Context.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import org.zeromq.{ ZMQ ⇒ JZMQ } +import akka.zeromq.SocketType._ + +class Context(numIoThreads: Int) { + private var context = JZMQ.context(numIoThreads) + def socket(socketType: SocketType) = { + context.socket(socketType.id) + } + def poller = { + context.poller + } + def term = { + context.term + } +} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Deserializer.scala b/akka-zeromq/src/main/scala/akka/zeromq/Deserializer.scala new file mode 100644 index 0000000000..6430a5a9c6 --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/Deserializer.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +case class Frame(payload: Seq[Byte]) +object Frame { def apply(s: String): Frame = Frame(s.getBytes) } + +trait Deserializer { + def apply(frames: Seq[Frame]): Any +} + +class ZMQMessageDeserializer extends Deserializer { + def apply(frames: Seq[Frame]) = ZMQMessage(frames) +} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Requests.scala b/akka-zeromq/src/main/scala/akka/zeromq/Requests.scala new file mode 100644 index 0000000000..a1ce8f6c5e --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/Requests.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import com.google.protobuf.Message + +sealed trait Request +sealed trait SocketOption extends Request +sealed trait SocketOptionQuery extends Request + +case class Connect(endpoint: String) extends Request +case class Bind(endpoint: String) extends Request +private[zeromq] case object Close extends Request + +case class Subscribe(payload: Seq[Byte]) extends Request +object Subscribe { + def apply(topic: String): Subscribe = { + Subscribe(topic.getBytes) + } +} + +case class Unsubscribe(payload: Seq[Byte]) extends Request +object Unsubscribe { + def apply(topic: String): Unsubscribe = { + Unsubscribe(topic.getBytes) + } +} + +case class Send(frames: Seq[Frame]) extends Request + +case class ZMQMessage(frames: Seq[Frame]) { + def firstFrameAsString = { + new String(frames.head.payload.toArray) + } +} +object ZMQMessage { + def apply(bytes: Array[Byte]): ZMQMessage = { + ZMQMessage(Seq(Frame(bytes))) + } + def apply(message: Message): ZMQMessage = { + ZMQMessage(message.toByteArray) + } +} + +case class Linger(value: Long) extends SocketOption +object Linger extends SocketOptionQuery + +case class ReconnectIVL(value: Long) extends SocketOption +object ReconnectIVL extends SocketOptionQuery + +case class Backlog(value: Long) extends SocketOption +object Backlog extends SocketOptionQuery + +case class ReconnectIVLMax(value: Long) extends SocketOption +object ReconnectIVLMax extends SocketOptionQuery + +case class MaxMsgSize(value: Long) extends SocketOption +object MaxMsgSize extends SocketOptionQuery + +case class SndHWM(value: Long) extends SocketOption +object SndHWM extends SocketOptionQuery + +case class RcvHWM(value: Long) extends SocketOption +object RcvHWM extends SocketOptionQuery + +case class HWM(value: Long) extends SocketOption +/* object HWM extends SocketOptionQuery */ + +case class Swap(value: Long) extends SocketOption +object Swap extends SocketOptionQuery + +case class Affinity(value: Long) extends SocketOption +object Affinity extends SocketOptionQuery + +case class Identity(value: Array[Byte]) extends SocketOption +object Identity extends SocketOptionQuery + +case class Rate(value: Long) extends SocketOption +object Rate extends SocketOptionQuery + +case class RecoveryInterval(value: Long) extends SocketOption +object RecoveryInterval extends SocketOptionQuery + +case class MulticastLoop(value: Boolean) extends SocketOption +object MulticastLoop extends SocketOptionQuery + +case class MulticastHops(value: Long) extends SocketOption +object MulticastHops extends SocketOptionQuery + +case class ReceiveTimeOut(value: Long) extends SocketOption +object ReceiveTimeOut extends SocketOptionQuery + +case class SendTimeOut(value: Long) extends SocketOption +object SendTimeOut extends SocketOptionQuery + +case class SendBufferSize(value: Long) extends SocketOption +object SendBufferSize extends SocketOptionQuery + +case class ReceiveBufferSize(value: Long) extends SocketOption +object ReceiveBufferSize extends SocketOptionQuery + +object ReceiveMore extends SocketOptionQuery +object FileDescriptor extends SocketOptionQuery diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Responses.scala b/akka-zeromq/src/main/scala/akka/zeromq/Responses.scala new file mode 100644 index 0000000000..43200a959c --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/Responses.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +sealed trait Response +case object Connecting extends Response +case object Closed extends Response diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketType.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketType.scala new file mode 100644 index 0000000000..aba0c1608a --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/SocketType.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import org.zeromq.{ ZMQ ⇒ JZMQ } + +object SocketType extends Enumeration { + type SocketType = Value + val Pub = Value(JZMQ.PUB) + val Sub = Value(JZMQ.SUB) + val Dealer = Value(JZMQ.DEALER) + val Router = Value(JZMQ.ROUTER) +} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala new file mode 100644 index 0000000000..2515647050 --- /dev/null +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import akka.util.Duration +import akka.util.duration._ +import akka.zeromq.SocketType._ +import org.zeromq.{ ZMQ ⇒ JZMQ } +import akka.actor._ +import akka.dispatch.Await + +case class SocketParameters( + socketType: SocketType, + context: Context, + listener: Option[ActorRef] = None, + deserializer: Deserializer = new ZMQMessageDeserializer, + pollTimeoutDuration: Duration = 100 millis) + +case class ZeroMQVersion(major: Int, minor: Int, patch: Int) { + override def toString = "%d.%d.%d".format(major, minor, patch) +} + +object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system) +} +class ZeroMQExtension(system: ActorSystem) extends Extension { + + def version = { + ZeroMQVersion(JZMQ.getMajorVersion, JZMQ.getMinorVersion, JZMQ.getPatchVersion) + } + + lazy val DefaultContext = newContext() + + def newContext(numIoThreads: Int = 1) = { + verifyZeroMQVersion + new Context(numIoThreads) + } + + def newSocket(socketType: SocketType, + listener: Option[ActorRef] = None, + context: Context = DefaultContext, // For most applications you want to use the default context + deserializer: Deserializer = new ZMQMessageDeserializer, + pollTimeoutDuration: Duration = 100 millis) = { + verifyZeroMQVersion + val params = SocketParameters(socketType, context, listener, deserializer, pollTimeoutDuration) + implicit val timeout = system.settings.ActorTimeout + val req = (zeromq ? Props(new ConcurrentSocketActor(params)).withDispatcher("zmqdispatcher")).mapTo[ActorRef] + Await.result(req, timeout.duration) + } + + val zeromq: ActorRef = { + verifyZeroMQVersion + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } + }), "zeromq") + } + + private def verifyZeroMQVersion = { + require( + JZMQ.getFullVersion > JZMQ.makeVersion(2, 1, 0), + "Unsupported ZeroMQ version: %s".format(JZMQ.getVersionString)) + } +} diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala new file mode 100644 index 0000000000..3b7af45196 --- /dev/null +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.zeromq + +import org.scalatest.matchers.MustMatchers +import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec } +import akka.actor.{ Actor, Props, ActorRef } +import akka.util.Timeout +import akka.util.duration._ +import java.net.{ SocketException, ConnectException, Socket } +import util.Random + +object ConcurrentSocketActorSpec { + val config = """ +akka { + extensions = ["akka.zeromq.ZeroMQExtension$"] + actor { + zmqdispatcher { + type = "PinnedDispatcher" + } + } +} +""" +} + +class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.config) with MustMatchers with DefaultTimeout { + + val endpoint = "tcp://127.0.0.1:%s" format FreePort.randomFreePort() + + def zmq = system.extension(ZeroMQExtension) + + "ConcurrentSocketActor" should { + "support pub-sub connections" in { + checkZeroMQInstallation + val (publisherProbe, subscriberProbe) = (TestProbe(), TestProbe()) + val context = zmq.newContext() + val publisher = newPublisher(context, publisherProbe.ref) + val subscriber = newSubscriber(context, subscriberProbe.ref) + val msgGenerator = newMessageGenerator(publisher) + + try { + subscriberProbe.expectMsg(Connecting) + val msgNumbers = subscriberProbe.receiveWhile(2 seconds) { + case msg: ZMQMessage ⇒ { + println("RECV: " + msg.firstFrameAsString) + msg + } + }.map(_.firstFrameAsString.toInt) + msgNumbers.length must be > 0 + msgNumbers must equal(for (i ← msgNumbers.head to msgNumbers.last) yield i) + } finally { + system stop msgGenerator + within(2 seconds) { awaitCond(msgGenerator.isTerminated) } + system stop subscriber + system stop publisher + subscriberProbe.receiveWhile(1 seconds) { + case msg ⇒ msg + }.last must equal(Closed) + context.term + } + } + "support zero-length message frames" in { + checkZeroMQInstallation + val publisherProbe = TestProbe() + val context = zmq.newContext() + val publisher = newPublisher(context, publisherProbe.ref) + + try { + publisher ! ZMQMessage(Seq[Frame]()) + } finally { + system stop publisher + publisherProbe.within(5 seconds) { + publisherProbe.expectMsg(Closed) + } + context.term + } + } + def newPublisher(context: Context, listener: ActorRef) = { + val publisher = zmq.newSocket(SocketType.Pub, context = context, listener = Some(listener)) + publisher ! Bind(endpoint) + publisher + } + def newSubscriber(context: Context, listener: ActorRef) = { + val subscriber = zmq.newSocket(SocketType.Sub, context = context, listener = Some(listener)) + subscriber ! Connect(endpoint) + subscriber ! Subscribe(Seq()) + subscriber + } + def newMessageGenerator(actorRef: ActorRef) = { + system.actorOf(Props(new MessageGeneratorActor(actorRef)).withTimeout(Timeout(10 millis))) + } + def checkZeroMQInstallation = try { + zmq.version match { + case ZeroMQVersion(2, 1, _) ⇒ Unit + case version ⇒ invalidZeroMQVersion(version) + } + } catch { + case e: LinkageError ⇒ zeroMQNotInstalled + } + def invalidZeroMQVersion(version: ZeroMQVersion) { + info("WARNING: The tests are not run because invalid ZeroMQ version: %s. Version >= 2.1.x required.".format(version)) + pending + } + def zeroMQNotInstalled { + info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.") + pending + } + } + class MessageGeneratorActor(actorRef: ActorRef) extends Actor { + var messageNumber: Int = 0 + + protected def receive = { + case _ ⇒ + val payload = "%s".format(messageNumber) + messageNumber = messageNumber + 1 + actorRef ! ZMQMessage(payload.getBytes) + } + } + + object FreePort { + + def isPortFree(port: Int) = { + try { + val socket = new Socket("127.0.0.1", port) + socket.close() + false + } catch { + case e: ConnectException ⇒ true + case e: SocketException if e.getMessage == "Connection reset by peer" ⇒ true + } + } + + private def newPort = Random.nextInt(55365) + 10000 + + def randomFreePort(maxRetries: Int = 50) = { + var count = 0 + var freePort = newPort + while (!isPortFree(freePort)) { + freePort = newPort + count += 1 + if (count >= maxRetries) { + throw new RuntimeException("Couldn't determine a free port") + } + } + freePort + } + } +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 091346de34..9d45b1d888 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -195,6 +195,17 @@ object AkkaBuild extends Build { ) ) + + lazy val zeroMQ = Project( + id = "akka-zeromq", + base = file("akka-zeromq"), + dependencies = Seq(actor, testkit % "test;test->test"), + settings = defaultSettings ++ Seq( + libraryDependencies ++= Dependencies.zeroMQ + ) + ) + + // lazy val spring = Project( // id = "akka-spring", // base = file("akka-spring"), @@ -434,6 +445,8 @@ object Dependencies { val tutorials = Seq(Test.scalatest, Test.junit) val docs = Seq(Test.scalatest, Test.junit) + + val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ) } object Dependency { @@ -489,6 +502,7 @@ object Dependency { val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2 val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 + val zeroMQ = "org.zeromq" %% "zeromq-scala-binding" % "0.0.3" // ApacheV2 // Provided