diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 4121c83527..f23382ba44 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -71,6 +71,12 @@ akka { remote { + ### FIXME: Temporary switch for the PoC + artery { + enabled = off + port = 20200 + } + ### General settings # Timeout after which the startup of the remoting subsystem is considered diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 7546dc7ee1..0d6065da83 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -7,16 +7,19 @@ package akka.remote import akka.Done import akka.actor._ import akka.dispatch.sysmsg._ -import akka.event.{ Logging, LoggingAdapter, EventStream } +import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe + import scala.util.control.NonFatal -import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } +import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } + import scala.util.control.Exception.Catcher import scala.concurrent.Future import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.remote.artery.ArterySubsystem /** * INTERNAL API @@ -179,7 +182,7 @@ private[akka] class RemoteActorRefProvider( d }, serialization = SerializationExtension(system), - transport = new Remoting(system, this)) + transport = if (remoteSettings.EnableArtery) new ArterySubsystem(system, this) else new Remoting(system, this)) _internals = internals remotingTerminator ! internals @@ -422,6 +425,7 @@ private[akka] class RemoteActorRefProvider( /** * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. + * * @param address Address of the remote system to be quarantined * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but * the current endpoint writer will be stopped (dropping system messages) and the address will be gated @@ -448,6 +452,8 @@ private[akka] class RemoteActorRef private[akka] ( deploy: Option[Deploy]) extends InternalActorRef with RemoteRef { + @volatile var cachedAssociation: artery.Association = null + def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream s.headOption match { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index f6138bb5fb..406ced337e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -20,6 +20,9 @@ final class RemoteSettings(val config: Config) { import config._ import scala.collection.JavaConverters._ + val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") + val ArteryPort: Int = getInt("akka.remote.artery.port") + val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 55ce049ffe..0797e2d2cf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -91,9 +91,4 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va */ def quarantine(address: Address, uid: Option[Int]): Unit - /** - * When this method returns true, some functionality will be turned off for security purposes. - */ - protected def useUntrustedMode: Boolean - } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 8b77da60ae..04a5c0e966 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -227,9 +227,6 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null) } - // Not used anywhere only to keep compatibility with RemoteTransport interface - protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode - private[akka] def boundAddresses: Map[String, Set[Address]] = { transportMapping.map { case (scheme, transports) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala new file mode 100644 index 0000000000..0b007d6415 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.remote.artery + +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.{ ActorRef, Address, ExtendedActorSystem } +import akka.event.{ Logging, LoggingAdapter } +import akka.remote.EndpointManager.Send +import akka.remote.transport.AkkaPduProtobufCodec +import akka.remote.{ DefaultMessageDispatcher, RemoteActorRef, RemoteActorRefProvider, RemoteTransport } +import akka.stream.scaladsl.{ Sink, Source, SourceQueueWithComplete, Tcp } +import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy } +import akka.{ Done, NotUsed } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } + +/** + * INTERNAL API + */ +private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { + @volatile private[this] var address: Address = _ + @volatile private[this] var transport: Transport = _ + @volatile private[this] var binding: Tcp.ServerBinding = _ + @volatile private[this] var materializer: Materializer = _ + override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + + override def defaultAddress: Address = address + override def addresses: Set[Address] = Set(address) + override def localAddressForRemote(remote: Address): Address = defaultAddress + + // FIXME: This does locking on putIfAbsent, we need something smarter + private[this] val associations = new ConcurrentHashMap[Address, Association]() + + override def start(): Unit = { + // TODO: Configure materializer properly + // TODO: Have a supervisor actor + address = Address("akka.artery", system.name, "localhost", provider.remoteSettings.ArteryPort) + materializer = ActorMaterializer()(system) + transport = new Transport( + address, + system, + provider, + AkkaPduProtobufCodec, + new DefaultMessageDispatcher(system, provider, log)) + + binding = Await.result( + Tcp(system).bindAndHandle(transport.inboundFlow, address.host.get, address.port.get)(materializer), + 3.seconds) + + log.info("Artery started up with address {}", binding.localAddress) + } + + override def shutdown(): Future[Done] = { + import system.dispatcher + binding.unbind().map(_ ⇒ Done).andThen { + case _ ⇒ transport.killSwitch.abort(new Exception("System shut down")) + } + } + + override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + val cached = recipient.cachedAssociation + val remoteAddress = recipient.path.address + + val association = + if (cached ne null) cached + else { + val association = getAssociation(remoteAddress) + association.associate() + recipient.cachedAssociation = association + association + } + + association.send(message, senderOption, recipient) + } + + private def getAssociation(remoteAddress: Address): Association = { + val current = associations.get(remoteAddress) + if (current ne null) current + else { + val newAssociation = new Association(materializer, remoteAddress, transport) + val currentAssociation = associations.putIfAbsent(remoteAddress, newAssociation) + if (currentAssociation eq null) { + newAssociation + } else currentAssociation + } + } + + override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { + getAssociation(remoteAddress).quarantine(uid) + } + +} + +/** + * INTERNAL API + * + * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific + * remote address. + */ +private[remote] class Association( + val materializer: Materializer, + val remoteAddress: Address, + val transport: Transport) { + @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ + private[this] val sink: Sink[Send, Any] = transport.outbound(remoteAddress) + + def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + // TODO: lookup subchannel + // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly + queue.offer(Send(message, senderOption, recipient, None)) + } + + def quarantine(uid: Option[Int]): Unit = () + + // Idempotent + def associate(): Unit = { + queue = Source.queue(256, OverflowStrategy.dropBuffer).to(sink).run()(materializer) + } +} + diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala new file mode 100644 index 0000000000..9957ce5d0d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.remote.artery + +import java.net.InetSocketAddress +import java.nio.ByteOrder + +import akka.NotUsed +import akka.actor.{ Address, ExtendedActorSystem } +import akka.remote.EndpointManager.Send +import akka.remote.{ InboundMessageDispatcher, MessageSerializer, RemoteActorRefProvider } +import akka.remote.transport.AkkaPduCodec +import akka.serialization.Serialization +import akka.stream.{ KillSwitches, SharedKillSwitch } +import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp } +import akka.util.{ ByteString, ByteStringBuilder } + +/** + * INTERNAL API + */ +// FIXME: Replace the codec with a custom made, hi-perf one +private[remote] class Transport(val localAddress: Address, + val system: ExtendedActorSystem, + val provider: RemoteActorRefProvider, + val codec: AkkaPduCodec, + val inboundDispatcher: InboundMessageDispatcher) { + + val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") + + def outbound(remoteAddress: Address): Sink[Send, Any] = { + val remoteInetSocketAddress = new InetSocketAddress( + remoteAddress.host.get, + remoteAddress.port.get) + + Flow.fromGraph(killSwitch.flow[Send]) + .via(encoder) + .via(Tcp(system).outgoingConnection(remoteInetSocketAddress, halfClose = false)) + .to(Sink.ignore) + } + + // TODO: Try out parallelized serialization (mapAsync) for performance + val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ + val pdu: ByteString = codec.constructMessage( + sendEnvelope.recipient.localAddressToUse, + sendEnvelope.recipient, + Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, system)) { + MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef]) + }, + sendEnvelope.senderOption, + seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec + ackOption = None) + + // TODO: Drop unserializable messages + // TODO: Drop oversized messages + (new ByteStringBuilder).putInt(pdu.size)(ByteOrder.LITTLE_ENDIAN).result() ++ pdu + } + + val decoder: Flow[ByteString, AkkaPduCodec.Message, NotUsed] = + Framing.lengthField(4, maximumFrameLength = 256000) + .map { frame ⇒ + // TODO: Drop unserializable messages + val pdu = codec.decodeMessage(frame.drop(4), provider, localAddress)._2.get + pdu + } + + val messageDispatcher: Sink[AkkaPduCodec.Message, Any] = Sink.foreach[AkkaPduCodec.Message] { m ⇒ + inboundDispatcher.dispatch(m.recipient, m.recipientAddress, m.serializedMessage, m.senderOption) + } + + val inboundFlow: Flow[ByteString, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + decoder.to(messageDispatcher), + Source.maybe[ByteString].via(killSwitch.flow)) + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala new file mode 100644 index 0000000000..9c78971bdd --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySmokeTest.scala @@ -0,0 +1,57 @@ +package akka.remote.artery + +import akka.actor.{ Actor, ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, Props, RootActorPath } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import com.typesafe.config.ConfigFactory + +import ArterySmokeTest._ + +object ArterySmokeTest { + + val commonConfig = """ + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + } + """ + +} + +class ArterySmokeTest extends AkkaSpec(commonConfig) with ImplicitSender { + + val configB = ConfigFactory.parseString("akka.remote.artery.port = 20201") + val systemB = ActorSystem("systemB", configB.withFallback(system.settings.config)) + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + println(addressB) + val rootB = RootActorPath(addressB) + + "Artery" must { + + "be able to identify a remote actor and ping it" in { + val actorOnSystemB = systemB.actorOf(Props(new Actor { + def receive = { + case "ping" ⇒ sender() ! "pong" + } + }), "echo") + + val remoteRef = { + system.actorSelection(rootB / "user" / "echo") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + remoteRef ! "ping" + expectMsg("pong") + + remoteRef ! "ping" + expectMsg("pong") + + remoteRef ! "ping" + expectMsg("pong") + + } + + } + + override def afterTermination(): Unit = shutdown(systemB) + +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index c3368df85b..a2a08c475a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -120,7 +120,7 @@ object AkkaBuild extends Build { lazy val remote = Project( id = "akka-remote", base = file("akka-remote"), - dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test", protobuf) + dependencies = Seq(actor, stream, actorTests % "test->test", testkit % "test->test", protobuf) ) lazy val multiNodeTestkit = Project( diff --git a/project/MiMa.scala b/project/MiMa.scala index dcc2959cd7..8a76c58cf5 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -714,7 +714,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), // #19390 Add flow monitor - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor"), + + // Remove useUntrustedMode which is an internal API and not used anywhere anymore + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode") ) ) }