Merge pull request #20310 from akka/wip-artery-aeron-patriknw

add Aeron to Artery
This commit is contained in:
drewhk 2016-04-14 13:12:27 +02:00
commit 85ed7b171e
9 changed files with 77 additions and 41 deletions

View file

@ -17,6 +17,8 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicLongArray
import akka.stream.ThrottleMode
import akka.remote.artery.AeronSink
import akka.remote.artery.AeronSource
object AeronStreams {

View file

@ -75,6 +75,7 @@ akka {
artery {
enabled = off
port = 20200
hostname = localhost
}
### General settings

View file

@ -22,6 +22,7 @@ final class RemoteSettings(val config: Config) {
val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled")
val ArteryPort: Int = getInt("akka.remote.artery.port")
val ArteryHostname: String = getString("akka.remote.artery.hostname")
val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages")

View file

@ -1,4 +1,4 @@
package akka.aeron
package akka.remote.artery
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
@ -25,7 +25,7 @@ object AeronSink {
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] {
class AeronSink(channel: String, aeron: () Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] {
import AeronSink._
val in: Inlet[Bytes] = Inlet("AeronSink")
@ -90,8 +90,8 @@ class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShap
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case Backoff => publish()
case msg => super.onTimer(msg)
case Backoff publish()
case msg super.onTimer(msg)
}
}

View file

@ -1,4 +1,4 @@
package akka.aeron
package akka.remote.artery
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
@ -29,7 +29,7 @@ object AeronSource {
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] {
class AeronSource(channel: String, aeron: () Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] {
import AeronSource._
val out: Outlet[Bytes] = Outlet("AeronSource")
@ -47,7 +47,7 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source
private val retries = 115
private var backoffCount = retries
val receiveMessage = getAsyncCallback[Bytes] { data =>
val receiveMessage = getAsyncCallback[Bytes] { data
push(out, data)
}
@ -96,8 +96,8 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case Backoff => subscriberLoop()
case msg => super.onTimer(msg)
case Backoff subscriberLoop()
case msg super.onTimer(msg)
}
}

View file

@ -22,9 +22,11 @@ import scala.concurrent.{ Await, Future }
* INTERNAL API
*/
private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
import provider.remoteSettings
@volatile private[this] var address: Address = _
@volatile private[this] var transport: Transport = _
@volatile private[this] var binding: Tcp.ServerBinding = _
@volatile private[this] var tcpBinding: Option[Tcp.ServerBinding] = None
@volatile private[this] var materializer: Materializer = _
override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
@ -38,27 +40,23 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R
override def start(): Unit = {
// TODO: Configure materializer properly
// TODO: Have a supervisor actor
address = Address("akka.artery", system.name, "localhost", provider.remoteSettings.ArteryPort)
address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort)
materializer = ActorMaterializer()(system)
transport = new Transport(
address,
system,
provider,
AkkaPduProtobufCodec,
new DefaultMessageDispatcher(system, provider, log))
binding = Await.result(
Tcp(system).bindAndHandle(transport.inboundFlow, address.host.get, address.port.get)(materializer),
3.seconds)
log.info("Artery started up with address {}", binding.localAddress)
transport =
new Transport(
address,
system,
materializer,
provider,
AkkaPduProtobufCodec,
new DefaultMessageDispatcher(system, provider, log))
transport.start()
}
override def shutdown(): Future[Done] = {
import system.dispatcher
binding.unbind().map(_ Done).andThen {
case _ transport.killSwitch.abort(new Exception("System shut down"))
}
if (transport != null) transport.shutdown()
else Future.successful(Done)
}
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {

View file

@ -4,9 +4,9 @@
package akka.remote.artery
import scala.concurrent.duration._
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.NotUsed
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.EndpointManager.Send
@ -16,28 +16,62 @@ import akka.serialization.Serialization
import akka.stream.{ KillSwitches, SharedKillSwitch }
import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp }
import akka.util.{ ByteString, ByteStringBuilder }
import scala.concurrent.Future
import akka.Done
import akka.stream.Materializer
import scala.concurrent.Await
import akka.event.LoggingAdapter
import akka.event.Logging
import io.aeron.driver.MediaDriver
import io.aeron.Aeron
/**
* INTERNAL API
*/
// FIXME: Replace the codec with a custom made, hi-perf one
private[remote] class Transport(val localAddress: Address,
val system: ExtendedActorSystem,
val provider: RemoteActorRefProvider,
val codec: AkkaPduCodec,
val inboundDispatcher: InboundMessageDispatcher) {
private[remote] class Transport(
val localAddress: Address,
val system: ExtendedActorSystem,
val materializer: Materializer,
val provider: RemoteActorRefProvider,
val codec: AkkaPduCodec,
val inboundDispatcher: InboundMessageDispatcher) {
private val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
private implicit val mat = materializer
// TODO support port 0
private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}"
private val aeron = {
val ctx = new Aeron.Context
// TODO also support external media driver
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
def start(): Unit = {
Source.fromGraph(new AeronSource(inboundChannel, () aeron))
.map(ByteString.apply) // TODO we should use ByteString all the way
.via(inboundFlow)
.runWith(Sink.ignore)
}
def shutdown(): Future[Done] = {
// FIXME stop the AeronSource first?
aeron.close()
Future.successful(Done)
}
val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
def outbound(remoteAddress: Address): Sink[Send, Any] = {
val remoteInetSocketAddress = new InetSocketAddress(
remoteAddress.host.get,
remoteAddress.port.get)
val outboundChannel = s"aeron:udp?endpoint=${remoteAddress.host.get}:${remoteAddress.port.get}"
Flow.fromGraph(killSwitch.flow[Send])
.via(encoder)
.via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false))
.to(Sink.ignore)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel, () aeron))
}
// TODO: Try out parallelized serialization (mapAsync) for performance

View file

@ -107,7 +107,7 @@ object AkkaBuild extends Build {
dependencies = Seq(
actor,
http, stream, streamTests,
persistence, distributedData,
remote, persistence, distributedData,
testkit
).map(_ % "compile;compile->test;provided->provided")
).disablePlugins(ValidatePullRequest)

View file

@ -129,7 +129,7 @@ object Dependencies {
val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value, Test.junitIntf)
val remote = l ++= Seq(netty, uncommonsMath, Test.junit, Test.scalatest.value)
val remote = l ++= Seq(netty, uncommonsMath, aeronDriver, aeronClient, Test.junit, Test.scalatest.value)
val remoteTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.scalaXml)
@ -165,7 +165,7 @@ object Dependencies {
val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, aeronDriver, aeronClient, hdrHistogram)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram)
// akka stream & http