From f5b88f76205bd04e9a8910b0e84ca2acc172d2e3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 14 Apr 2016 07:30:18 +0200 Subject: [PATCH 1/3] move AeronSource, AeronSink to remote --- akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala | 2 ++ .../src/main/scala/akka/remote/artery}/AeronSink.scala | 2 +- .../src/main/scala/akka/remote/artery}/AeronSource.scala | 2 +- project/AkkaBuild.scala | 2 +- project/Dependencies.scala | 4 ++-- 5 files changed, 7 insertions(+), 5 deletions(-) rename {akka-bench-jmh/src/main/scala/akka/aeron => akka-remote/src/main/scala/akka/remote/artery}/AeronSink.scala (99%) rename {akka-bench-jmh/src/main/scala/akka/aeron => akka-remote/src/main/scala/akka/remote/artery}/AeronSource.scala (99%) diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala index e73d87c994..9857db6e69 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala +++ b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala @@ -17,6 +17,8 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.atomic.AtomicLongArray import akka.stream.ThrottleMode +import akka.remote.artery.AeronSink +import akka.remote.artery.AeronSource object AeronStreams { diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala similarity index 99% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala rename to akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index c595075a43..c002695696 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.TimeUnit diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala similarity index 99% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala rename to akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 0a7ff3f6e1..a2bdaee683 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.TimeUnit diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a2a08c475a..f0594b982e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -107,7 +107,7 @@ object AkkaBuild extends Build { dependencies = Seq( actor, http, stream, streamTests, - persistence, distributedData, + remote, persistence, distributedData, testkit ).map(_ % "compile;compile->test;provided->provided") ).disablePlugins(ValidatePullRequest) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 65438b552c..b55dc7d954 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -129,7 +129,7 @@ object Dependencies { val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value, Test.junitIntf) - val remote = l ++= Seq(netty, uncommonsMath, Test.junit, Test.scalatest.value) + val remote = l ++= Seq(netty, uncommonsMath, aeronDriver, aeronClient, Test.junit, Test.scalatest.value) val remoteTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.scalaXml) @@ -165,7 +165,7 @@ object Dependencies { val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, aeronDriver, aeronClient, hdrHistogram) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram) // akka stream & http From 5088142307118768493f57a12cbf27e87aa30fac Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 14 Apr 2016 10:32:01 +0200 Subject: [PATCH 2/3] add Aeron to Artery --- akka-remote/src/main/resources/reference.conf | 3 + .../scala/akka/remote/RemoteSettings.scala | 2 + .../scala/akka/remote/artery/AeronSink.scala | 6 +- .../akka/remote/artery/AeronSource.scala | 8 +- .../akka/remote/artery/ArterySubsystem.scala | 42 +++--- .../scala/akka/remote/artery/Transport.scala | 129 +++++++++++++++--- .../akka/remote/artery/ArterySmokeTest.scala | 6 +- 7 files changed, 155 insertions(+), 41 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index f23382ba44..6765b5f9ec 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -75,6 +75,9 @@ akka { artery { enabled = off port = 20200 + hostname = localhost + # tcp, aeron-udp + transport = tcp } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 406ced337e..7cf64af82d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -22,6 +22,8 @@ final class RemoteSettings(val config: Config) { val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") val ArteryPort: Int = getInt("akka.remote.artery.port") + val ArteryHostname: String = getString("akka.remote.artery.hostname") + val ArteryTransport: String = getString("akka.remote.artery.transport") val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index c002695696..637bd8e7e6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -25,7 +25,7 @@ object AeronSink { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { +class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { import AeronSink._ val in: Inlet[Bytes] = Inlet("AeronSink") @@ -90,8 +90,8 @@ class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShap override protected def onTimer(timerKey: Any): Unit = { timerKey match { - case Backoff => publish() - case msg => super.onTimer(msg) + case Backoff ⇒ publish() + case msg ⇒ super.onTimer(msg) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index a2bdaee683..cdc2333cfa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -29,7 +29,7 @@ object AeronSource { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { +class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { import AeronSource._ val out: Outlet[Bytes] = Outlet("AeronSource") @@ -47,7 +47,7 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source private val retries = 115 private var backoffCount = retries - val receiveMessage = getAsyncCallback[Bytes] { data => + val receiveMessage = getAsyncCallback[Bytes] { data ⇒ push(out, data) } @@ -96,8 +96,8 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source override protected def onTimer(timerKey: Any): Unit = { timerKey match { - case Backoff => subscriberLoop() - case msg => super.onTimer(msg) + case Backoff ⇒ subscriberLoop() + case msg ⇒ super.onTimer(msg) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala index 0b007d6415..77f85bf093 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala @@ -22,9 +22,11 @@ import scala.concurrent.{ Await, Future } * INTERNAL API */ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { + import provider.remoteSettings + @volatile private[this] var address: Address = _ @volatile private[this] var transport: Transport = _ - @volatile private[this] var binding: Tcp.ServerBinding = _ + @volatile private[this] var tcpBinding: Option[Tcp.ServerBinding] = None @volatile private[this] var materializer: Materializer = _ override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) @@ -38,27 +40,35 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R override def start(): Unit = { // TODO: Configure materializer properly // TODO: Have a supervisor actor - address = Address("akka.artery", system.name, "localhost", provider.remoteSettings.ArteryPort) + address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort) materializer = ActorMaterializer()(system) - transport = new Transport( - address, - system, - provider, - AkkaPduProtobufCodec, - new DefaultMessageDispatcher(system, provider, log)) - binding = Await.result( - Tcp(system).bindAndHandle(transport.inboundFlow, address.host.get, address.port.get)(materializer), - 3.seconds) + transport = remoteSettings.ArteryTransport match { + case "tcp" ⇒ + new TcpTransport( + address, + system, + materializer, + provider, + AkkaPduProtobufCodec, + new DefaultMessageDispatcher(system, provider, log)) + case "aeron-udp" ⇒ + new AeronTransport( + address, + system, + materializer, + provider, + AkkaPduProtobufCodec, + new DefaultMessageDispatcher(system, provider, log)) + case unknown ⇒ throw new IllegalArgumentException(s"Unknown transport $unknown") + } - log.info("Artery started up with address {}", binding.localAddress) + transport.start() } override def shutdown(): Future[Done] = { - import system.dispatcher - binding.unbind().map(_ ⇒ Done).andThen { - case _ ⇒ transport.killSwitch.abort(new Exception("System shut down")) - } + if (transport != null) transport.shutdown() + else Future.successful(Done) } override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala index 9957ce5d0d..9c52f4901f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala @@ -4,9 +4,9 @@ package akka.remote.artery +import scala.concurrent.duration._ import java.net.InetSocketAddress import java.nio.ByteOrder - import akka.NotUsed import akka.actor.{ Address, ExtendedActorSystem } import akka.remote.EndpointManager.Send @@ -16,30 +16,29 @@ import akka.serialization.Serialization import akka.stream.{ KillSwitches, SharedKillSwitch } import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp } import akka.util.{ ByteString, ByteStringBuilder } +import scala.concurrent.Future +import akka.Done +import akka.stream.Materializer +import scala.concurrent.Await +import akka.event.LoggingAdapter +import akka.event.Logging +import io.aeron.driver.MediaDriver +import io.aeron.Aeron /** * INTERNAL API */ // FIXME: Replace the codec with a custom made, hi-perf one -private[remote] class Transport(val localAddress: Address, - val system: ExtendedActorSystem, - val provider: RemoteActorRefProvider, - val codec: AkkaPduCodec, - val inboundDispatcher: InboundMessageDispatcher) { +private[remote] abstract class Transport(val localAddress: Address, + val system: ExtendedActorSystem, + val provider: RemoteActorRefProvider, + val codec: AkkaPduCodec, + val inboundDispatcher: InboundMessageDispatcher) { + + val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") - def outbound(remoteAddress: Address): Sink[Send, Any] = { - val remoteInetSocketAddress = new InetSocketAddress( - remoteAddress.host.get, - remoteAddress.port.get) - - Flow.fromGraph(killSwitch.flow[Send]) - .via(encoder) - .via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false)) - .to(Sink.ignore) - } - // TODO: Try out parallelized serialization (mapAsync) for performance val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ val pdu: ByteString = codec.constructMessage( @@ -75,4 +74,100 @@ private[remote] class Transport(val localAddress: Address, Source.maybe[ByteString].via(killSwitch.flow)) } + def start(): Unit + + def shutdown(): Future[Done] + + def outbound(remoteAddress: Address): Sink[Send, Any] +} + +/** + * INTERNAL API + */ +private[remote] class TcpTransport( + localAddress: Address, + system: ExtendedActorSystem, + materializer: Materializer, + provider: RemoteActorRefProvider, + codec: AkkaPduCodec, + inboundDispatcher: InboundMessageDispatcher) + extends Transport(localAddress, system, provider, codec, inboundDispatcher) { + + @volatile private[this] var binding: Tcp.ServerBinding = _ + + override def start(): Unit = { + binding = Await.result( + Tcp(system).bindAndHandle(inboundFlow, localAddress.host.get, localAddress.port.get)(materializer), + 3.seconds) + log.info("Artery TCP started up with address {}", binding.localAddress) + } + + override def shutdown(): Future[Done] = { + import system.dispatcher + if (binding != null) { + binding.unbind().map(_ ⇒ Done).andThen { + case _ ⇒ killSwitch.abort(new Exception("System shut down")) + } + } else + Future.successful(Done) + } + + override def outbound(remoteAddress: Address): Sink[Send, Any] = { + val remoteInetSocketAddress = new InetSocketAddress( + remoteAddress.host.get, + remoteAddress.port.get) + + Flow.fromGraph(killSwitch.flow[Send]) + .via(encoder) + .via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false)) + .to(Sink.ignore) + } +} + +/** + * INTERNAL API + */ +private[remote] class AeronTransport( + localAddress: Address, + system: ExtendedActorSystem, + materializer: Materializer, + provider: RemoteActorRefProvider, + codec: AkkaPduCodec, + inboundDispatcher: InboundMessageDispatcher) + extends Transport(localAddress, system, provider, codec, inboundDispatcher) { + + private implicit val mat = materializer + // TODO support port 0 + private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}" + + private val aeron = { + val ctx = new Aeron.Context + // TODO also support external media driver + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + override def start(): Unit = { + Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron)) + .map(ByteString.apply) // TODO we should use ByteString all the way + .via(inboundFlow) + .runWith(Sink.ignore) + } + + override def shutdown(): Future[Done] = { + // FIXME stop the AeronSource first? + aeron.close() + Future.successful(Done) + } + + override def outbound(remoteAddress: Address): Sink[Send, Any] = { + val outboundChannel = s"aeron:udp?endpoint=${remoteAddress.host.get}:${remoteAddress.port.get}" + Flow.fromGraph(killSwitch.flow[Send]) + .via(encoder) + .map(_.toArray) // TODO we should use ByteString all the way + .to(new AeronSink(outboundChannel, () ⇒ aeron)) + } + + // FIXME we don't need Framing for Aeron, since it has fragmentation } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala index 9c78971bdd..5c1bed24f5 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala @@ -11,7 +11,11 @@ object ArterySmokeTest { val commonConfig = """ akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remote.artery.enabled = on + remote.artery { + enabled = on + #transport = tcp + transport = aeron-udp + } } """ From 49168f6111485096c7af8cc5ea6cb1d01e920880 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 14 Apr 2016 11:01:53 +0200 Subject: [PATCH 3/3] remove TcpTransport --- akka-remote/src/main/resources/reference.conf | 2 - .../scala/akka/remote/RemoteSettings.scala | 1 - .../akka/remote/artery/ArterySubsystem.scala | 28 +--- .../scala/akka/remote/artery/Transport.scala | 143 +++++------------- .../akka/remote/artery/ArterySmokeTest.scala | 6 +- 5 files changed, 50 insertions(+), 130 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 6765b5f9ec..e2aa5d8538 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -76,8 +76,6 @@ akka { enabled = off port = 20200 hostname = localhost - # tcp, aeron-udp - transport = tcp } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 7cf64af82d..97806381ff 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -23,7 +23,6 @@ final class RemoteSettings(val config: Config) { val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") val ArteryPort: Int = getInt("akka.remote.artery.port") val ArteryHostname: String = getString("akka.remote.artery.hostname") - val ArteryTransport: String = getString("akka.remote.artery.transport") val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala index 77f85bf093..d9bf776709 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala @@ -43,26 +43,14 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort) materializer = ActorMaterializer()(system) - transport = remoteSettings.ArteryTransport match { - case "tcp" ⇒ - new TcpTransport( - address, - system, - materializer, - provider, - AkkaPduProtobufCodec, - new DefaultMessageDispatcher(system, provider, log)) - case "aeron-udp" ⇒ - new AeronTransport( - address, - system, - materializer, - provider, - AkkaPduProtobufCodec, - new DefaultMessageDispatcher(system, provider, log)) - case unknown ⇒ throw new IllegalArgumentException(s"Unknown transport $unknown") - } - + transport = + new Transport( + address, + system, + materializer, + provider, + AkkaPduProtobufCodec, + new DefaultMessageDispatcher(system, provider, log)) transport.start() } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala index 9c52f4901f..88c55b3ff4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala @@ -29,16 +29,51 @@ import io.aeron.Aeron * INTERNAL API */ // FIXME: Replace the codec with a custom made, hi-perf one -private[remote] abstract class Transport(val localAddress: Address, - val system: ExtendedActorSystem, - val provider: RemoteActorRefProvider, - val codec: AkkaPduCodec, - val inboundDispatcher: InboundMessageDispatcher) { +private[remote] class Transport( + val localAddress: Address, + val system: ExtendedActorSystem, + val materializer: Materializer, + val provider: RemoteActorRefProvider, + val codec: AkkaPduCodec, + val inboundDispatcher: InboundMessageDispatcher) { - val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + private val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + + private implicit val mat = materializer + // TODO support port 0 + private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}" + + private val aeron = { + val ctx = new Aeron.Context + // TODO also support external media driver + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + def start(): Unit = { + Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron)) + .map(ByteString.apply) // TODO we should use ByteString all the way + .via(inboundFlow) + .runWith(Sink.ignore) + } + + def shutdown(): Future[Done] = { + // FIXME stop the AeronSource first? + aeron.close() + Future.successful(Done) + } val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") + def outbound(remoteAddress: Address): Sink[Send, Any] = { + val outboundChannel = s"aeron:udp?endpoint=${remoteAddress.host.get}:${remoteAddress.port.get}" + Flow.fromGraph(killSwitch.flow[Send]) + .via(encoder) + .map(_.toArray) // TODO we should use ByteString all the way + .to(new AeronSink(outboundChannel, () ⇒ aeron)) + } + // TODO: Try out parallelized serialization (mapAsync) for performance val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ val pdu: ByteString = codec.constructMessage( @@ -74,100 +109,4 @@ private[remote] abstract class Transport(val localAddress: Address, Source.maybe[ByteString].via(killSwitch.flow)) } - def start(): Unit - - def shutdown(): Future[Done] - - def outbound(remoteAddress: Address): Sink[Send, Any] -} - -/** - * INTERNAL API - */ -private[remote] class TcpTransport( - localAddress: Address, - system: ExtendedActorSystem, - materializer: Materializer, - provider: RemoteActorRefProvider, - codec: AkkaPduCodec, - inboundDispatcher: InboundMessageDispatcher) - extends Transport(localAddress, system, provider, codec, inboundDispatcher) { - - @volatile private[this] var binding: Tcp.ServerBinding = _ - - override def start(): Unit = { - binding = Await.result( - Tcp(system).bindAndHandle(inboundFlow, localAddress.host.get, localAddress.port.get)(materializer), - 3.seconds) - log.info("Artery TCP started up with address {}", binding.localAddress) - } - - override def shutdown(): Future[Done] = { - import system.dispatcher - if (binding != null) { - binding.unbind().map(_ ⇒ Done).andThen { - case _ ⇒ killSwitch.abort(new Exception("System shut down")) - } - } else - Future.successful(Done) - } - - override def outbound(remoteAddress: Address): Sink[Send, Any] = { - val remoteInetSocketAddress = new InetSocketAddress( - remoteAddress.host.get, - remoteAddress.port.get) - - Flow.fromGraph(killSwitch.flow[Send]) - .via(encoder) - .via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false)) - .to(Sink.ignore) - } -} - -/** - * INTERNAL API - */ -private[remote] class AeronTransport( - localAddress: Address, - system: ExtendedActorSystem, - materializer: Materializer, - provider: RemoteActorRefProvider, - codec: AkkaPduCodec, - inboundDispatcher: InboundMessageDispatcher) - extends Transport(localAddress, system, provider, codec, inboundDispatcher) { - - private implicit val mat = materializer - // TODO support port 0 - private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}" - - private val aeron = { - val ctx = new Aeron.Context - // TODO also support external media driver - val driver = MediaDriver.launchEmbedded() - ctx.aeronDirectoryName(driver.aeronDirectoryName) - Aeron.connect(ctx) - } - - override def start(): Unit = { - Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron)) - .map(ByteString.apply) // TODO we should use ByteString all the way - .via(inboundFlow) - .runWith(Sink.ignore) - } - - override def shutdown(): Future[Done] = { - // FIXME stop the AeronSource first? - aeron.close() - Future.successful(Done) - } - - override def outbound(remoteAddress: Address): Sink[Send, Any] = { - val outboundChannel = s"aeron:udp?endpoint=${remoteAddress.host.get}:${remoteAddress.port.get}" - Flow.fromGraph(killSwitch.flow[Send]) - .via(encoder) - .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel, () ⇒ aeron)) - } - - // FIXME we don't need Framing for Aeron, since it has fragmentation } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala index 5c1bed24f5..9c78971bdd 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala @@ -11,11 +11,7 @@ object ArterySmokeTest { val commonConfig = """ akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remote.artery { - enabled = on - #transport = tcp - transport = aeron-udp - } + remote.artery.enabled = on } """