diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala index e73d87c994..9857db6e69 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala +++ b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala @@ -17,6 +17,8 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.atomic.AtomicLongArray import akka.stream.ThrottleMode +import akka.remote.artery.AeronSink +import akka.remote.artery.AeronSource object AeronStreams { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index f23382ba44..e2aa5d8538 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -75,6 +75,7 @@ akka { artery { enabled = off port = 20200 + hostname = localhost } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 406ced337e..97806381ff 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -22,6 +22,7 @@ final class RemoteSettings(val config: Config) { val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") val ArteryPort: Int = getInt("akka.remote.artery.port") + val ArteryHostname: String = getString("akka.remote.artery.hostname") val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala similarity index 93% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala rename to akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index c595075a43..637bd8e7e6 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.TimeUnit @@ -25,7 +25,7 @@ object AeronSink { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { +class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { import AeronSink._ val in: Inlet[Bytes] = Inlet("AeronSink") @@ -90,8 +90,8 @@ class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShap override protected def onTimer(timerKey: Any): Unit = { timerKey match { - case Backoff => publish() - case msg => super.onTimer(msg) + case Backoff ⇒ publish() + case msg ⇒ super.onTimer(msg) } } diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala similarity index 91% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala rename to akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 0a7ff3f6e1..cdc2333cfa 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.TimeUnit @@ -29,7 +29,7 @@ object AeronSource { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { +class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { import AeronSource._ val out: Outlet[Bytes] = Outlet("AeronSource") @@ -47,7 +47,7 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source private val retries = 115 private var backoffCount = retries - val receiveMessage = getAsyncCallback[Bytes] { data => + val receiveMessage = getAsyncCallback[Bytes] { data ⇒ push(out, data) } @@ -96,8 +96,8 @@ class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[Source override protected def onTimer(timerKey: Any): Unit = { timerKey match { - case Backoff => subscriberLoop() - case msg => super.onTimer(msg) + case Backoff ⇒ subscriberLoop() + case msg ⇒ super.onTimer(msg) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala index 0b007d6415..d9bf776709 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala @@ -22,9 +22,11 @@ import scala.concurrent.{ Await, Future } * INTERNAL API */ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { + import provider.remoteSettings + @volatile private[this] var address: Address = _ @volatile private[this] var transport: Transport = _ - @volatile private[this] var binding: Tcp.ServerBinding = _ + @volatile private[this] var tcpBinding: Option[Tcp.ServerBinding] = None @volatile private[this] var materializer: Materializer = _ override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) @@ -38,27 +40,23 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R override def start(): Unit = { // TODO: Configure materializer properly // TODO: Have a supervisor actor - address = Address("akka.artery", system.name, "localhost", provider.remoteSettings.ArteryPort) + address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort) materializer = ActorMaterializer()(system) - transport = new Transport( - address, - system, - provider, - AkkaPduProtobufCodec, - new DefaultMessageDispatcher(system, provider, log)) - binding = Await.result( - Tcp(system).bindAndHandle(transport.inboundFlow, address.host.get, address.port.get)(materializer), - 3.seconds) - - log.info("Artery started up with address {}", binding.localAddress) + transport = + new Transport( + address, + system, + materializer, + provider, + AkkaPduProtobufCodec, + new DefaultMessageDispatcher(system, provider, log)) + transport.start() } override def shutdown(): Future[Done] = { - import system.dispatcher - binding.unbind().map(_ ⇒ Done).andThen { - case _ ⇒ transport.killSwitch.abort(new Exception("System shut down")) - } + if (transport != null) transport.shutdown() + else Future.successful(Done) } override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala index 9957ce5d0d..88c55b3ff4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala @@ -4,9 +4,9 @@ package akka.remote.artery +import scala.concurrent.duration._ import java.net.InetSocketAddress import java.nio.ByteOrder - import akka.NotUsed import akka.actor.{ Address, ExtendedActorSystem } import akka.remote.EndpointManager.Send @@ -16,28 +16,62 @@ import akka.serialization.Serialization import akka.stream.{ KillSwitches, SharedKillSwitch } import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp } import akka.util.{ ByteString, ByteStringBuilder } +import scala.concurrent.Future +import akka.Done +import akka.stream.Materializer +import scala.concurrent.Await +import akka.event.LoggingAdapter +import akka.event.Logging +import io.aeron.driver.MediaDriver +import io.aeron.Aeron /** * INTERNAL API */ // FIXME: Replace the codec with a custom made, hi-perf one -private[remote] class Transport(val localAddress: Address, - val system: ExtendedActorSystem, - val provider: RemoteActorRefProvider, - val codec: AkkaPduCodec, - val inboundDispatcher: InboundMessageDispatcher) { +private[remote] class Transport( + val localAddress: Address, + val system: ExtendedActorSystem, + val materializer: Materializer, + val provider: RemoteActorRefProvider, + val codec: AkkaPduCodec, + val inboundDispatcher: InboundMessageDispatcher) { + + private val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + + private implicit val mat = materializer + // TODO support port 0 + private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}" + + private val aeron = { + val ctx = new Aeron.Context + // TODO also support external media driver + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + def start(): Unit = { + Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron)) + .map(ByteString.apply) // TODO we should use ByteString all the way + .via(inboundFlow) + .runWith(Sink.ignore) + } + + def shutdown(): Future[Done] = { + // FIXME stop the AeronSource first? + aeron.close() + Future.successful(Done) + } val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") def outbound(remoteAddress: Address): Sink[Send, Any] = { - val remoteInetSocketAddress = new InetSocketAddress( - remoteAddress.host.get, - remoteAddress.port.get) - + val outboundChannel = s"aeron:udp?endpoint=${remoteAddress.host.get}:${remoteAddress.port.get}" Flow.fromGraph(killSwitch.flow[Send]) .via(encoder) - .via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false)) - .to(Sink.ignore) + .map(_.toArray) // TODO we should use ByteString all the way + .to(new AeronSink(outboundChannel, () ⇒ aeron)) } // TODO: Try out parallelized serialization (mapAsync) for performance diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a2a08c475a..f0594b982e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -107,7 +107,7 @@ object AkkaBuild extends Build { dependencies = Seq( actor, http, stream, streamTests, - persistence, distributedData, + remote, persistence, distributedData, testkit ).map(_ % "compile;compile->test;provided->provided") ).disablePlugins(ValidatePullRequest) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 65438b552c..b55dc7d954 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -129,7 +129,7 @@ object Dependencies { val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value, Test.junitIntf) - val remote = l ++= Seq(netty, uncommonsMath, Test.junit, Test.scalatest.value) + val remote = l ++= Seq(netty, uncommonsMath, aeronDriver, aeronClient, Test.junit, Test.scalatest.value) val remoteTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.scalaXml) @@ -165,7 +165,7 @@ object Dependencies { val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, aeronDriver, aeronClient, hdrHistogram) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram) // akka stream & http