diff --git a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala index cf2f573fd3..e0cf257dd0 100644 --- a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala +++ b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala @@ -14,6 +14,8 @@ trait BufferPool { } /** + * INTERNAL API + * * A buffer pool which keeps a free list of direct buffers of a specified default * size in a simple fixed size stack. * diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala index 0126547bbf..3219b6bd78 100644 --- a/akka-actor/src/main/scala/akka/io/Pipelines.scala +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -15,6 +15,7 @@ import scala.concurrent.duration.FiniteDuration import scala.collection.mutable.WrappedArray import scala.concurrent.duration.Deadline import scala.beans.BeanProperty +import akka.event.LoggingAdapter /** * Scala API: A pair of pipes, one for commands and one for events, plus a @@ -34,21 +35,22 @@ import scala.beans.BeanProperty */ trait PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { - type Mgmt = PartialFunction[AnyRef, Iterable[Either[EvtAbove, CmdBelow]]] + type Result = Either[EvtAbove, CmdBelow] + type Mgmt = PartialFunction[AnyRef, Iterable[Result]] /** * The command pipeline transforms injected commands from the upper stage * into commands for the stage below, but it can also emit events for the * upper stage. Any number of each can be generated. */ - def commandPipeline: CmdAbove ⇒ Iterable[Either[EvtAbove, CmdBelow]] + def commandPipeline: CmdAbove ⇒ Iterable[Result] /** * The event pipeline transforms injected event from the lower stage * into event for the stage above, but it can also emit commands for the * stage below. Any number of each can be generated. */ - def eventPipeline: EvtBelow ⇒ Iterable[Either[EvtAbove, CmdBelow]] + def eventPipeline: EvtBelow ⇒ Iterable[Result] /** * The management port allows sending broadcast messages to all stages @@ -817,6 +819,17 @@ class LengthFieldFrame(maxSize: Int, } //#length-field-frame +/** + * This trait expresses that the pipeline’s context needs to provide a logging + * facility. + */ +trait HasLogging extends PipelineContext { + /** + * Retrieve the [[LoggingAdapter]] for this pipeline’s context. + */ + def getLogger: LoggingAdapter +} + //#tick-generator /** * This trait expresses that the pipeline’s context needs to live within an diff --git a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala new file mode 100644 index 0000000000..d90325a9fb --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala @@ -0,0 +1,233 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +// adapted from +// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io/src/main/scala/spray/io/SslTlsSupport.scala +// original copyright notice follows: + +/* + * Copyright (C) 2011-2013 spray.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.io + +import java.nio.ByteBuffer +import javax.net.ssl.{ SSLContext, SSLException, SSLEngineResult, SSLEngine } +import javax.net.ssl.SSLEngineResult.HandshakeStatus._ +import javax.net.ssl.SSLEngineResult.Status._ +import scala.collection.immutable +import scala.annotation.tailrec +import akka.util.ByteString +import Tcp.{ Command, Event } + +object SslTlsSupport { + + // we are using Nettys default values: + // 16665 + 1024 (room for compressed data) + 1024 (for OpenJDK compatibility) + private final val MaxPacketSize = 16665 + 2048 + + private final val EmptyByteArray = new Array[Byte](0) + +} + +/** + * This pipeline stage implements SSL / TLS support, using an externally + * configured [[SSLEngine]]. It operates on the level of [[Tcp.Event]] and + * [[Tcp.Command]] messages, which means that it will typically be one of + * the lowest stages in a protocol stack. + * + * Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB + * allocated which is used by the SSLEngine. + */ +class SslTlsSupport(engine: SSLEngine) extends PipelineStage[HasLogging, Command, Command, Event, Event] { + + override def apply(ctx: HasLogging) = + new PipePair[Command, Command, Event, Event] { + var pendingSends = immutable.Queue.empty[Send] + var inboundReceptacle: ByteBuffer = _ // holds incoming data that are too small to be decrypted yet + val log = ctx.getLogger + // TODO: should this be a ThreadLocal? + val tempBuf = ByteBuffer.allocate(SslTlsSupport.MaxPacketSize) + + override val commandPipeline = (cmd: Command) ⇒ cmd match { + case x: Tcp.Write ⇒ + if (pendingSends.isEmpty) encrypt(Send(x)) + else { + pendingSends = pendingSends enqueue Send(x) + Nil + } + + case x @ (Tcp.Close | Tcp.ConfirmedClose) ⇒ + log.debug("Closing SSLEngine due to reception of [{}]", x) + engine.closeOutbound() + closeEngine() :+ Right(x) + + case cmd ⇒ ctx.singleCommand(cmd) + } + + val eventPipeline = (evt: Event) ⇒ evt match { + case Tcp.Received(data) ⇒ + val buf = if (inboundReceptacle != null) { + try ByteBuffer.allocate(inboundReceptacle.remaining + data.length).put(inboundReceptacle) + finally inboundReceptacle = null + } else ByteBuffer allocate data.length + data copyToBuffer buf + buf.flip() + decrypt(buf) + + case x: Tcp.ConnectionClosed ⇒ + if (!engine.isOutboundDone) { + try engine.closeInbound() + catch { case e: SSLException ⇒ } // ignore warning about possible truncation attacks + } + ctx.singleEvent(x) + + case ev ⇒ ctx.singleEvent(ev) + } + + /** + * Encrypts the given buffers and dispatches the results as Tcp.Write commands. + */ + @tailrec + def encrypt(send: Send, fromQueue: Boolean = false, commands: Vector[Result] = Vector.empty): Vector[Result] = { + import send.{ ack, buffer } + + tempBuf.clear() + val ackDefinedAndPreContentLeft = ack != Tcp.NoAck && buffer.remaining > 0 + val result = engine.wrap(buffer, tempBuf) + val postContentLeft = buffer.remaining > 0 + tempBuf.flip() + + val nextCmds = + if (tempBuf.remaining > 0) { + val writeAck = if (ackDefinedAndPreContentLeft && !postContentLeft) ack else Tcp.NoAck + commands :+ Right(Tcp.Write(ByteString(tempBuf), writeAck)) + } else commands + + result.getStatus match { + case OK ⇒ result.getHandshakeStatus match { + case NOT_HANDSHAKING | FINISHED ⇒ + if (postContentLeft) encrypt(send, fromQueue, nextCmds) + else nextCmds + case NEED_WRAP ⇒ + encrypt(send, fromQueue, nextCmds) + case NEED_UNWRAP ⇒ + pendingSends = + if (fromQueue) send +: pendingSends // output coming from the queue needs to go to the front + else pendingSends enqueue send // "new" output to the back of the queue + nextCmds + case NEED_TASK ⇒ + runDelegatedTasks() + encrypt(send, fromQueue, nextCmds) + } + case CLOSED ⇒ + if (postContentLeft) { + log.warning("SSLEngine closed prematurely while sending") + nextCmds :+ Right(Tcp.Close) + } else nextCmds + case BUFFER_OVERFLOW ⇒ + throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small") + case BUFFER_UNDERFLOW ⇒ + throw new IllegalStateException("BUFFER_UNDERFLOW should never appear as a result of a wrap") + } + } + + /** + * Decrypts the given buffer and dispatches the results as Tcp.Received events. + */ + @tailrec + def decrypt(buffer: ByteBuffer, output: Vector[Result] = Vector.empty): Vector[Result] = { + tempBuf.clear() + val result = engine.unwrap(buffer, tempBuf) + tempBuf.flip() + + val nextOutput = + if (tempBuf.remaining > 0) output :+ Left(Tcp.Received(ByteString(tempBuf))) + else output + + result.getStatus match { + case OK ⇒ result.getHandshakeStatus match { + case NOT_HANDSHAKING | FINISHED ⇒ + if (buffer.remaining > 0) decrypt(buffer, nextOutput) + else nextOutput ++ processPendingSends(tempBuf) + case NEED_UNWRAP ⇒ + decrypt(buffer, nextOutput) + case NEED_WRAP ⇒ + val n = nextOutput ++ ( + if (pendingSends.isEmpty) encrypt(Send.Empty) + else processPendingSends(tempBuf)) + if (buffer.remaining > 0) decrypt(buffer, n) + else n + case NEED_TASK ⇒ + runDelegatedTasks() + decrypt(buffer, nextOutput) + } + case CLOSED ⇒ + if (!engine.isOutboundDone) { + log.warning("SSLEngine closed prematurely while receiving") + nextOutput :+ Right(Tcp.Close) + } else nextOutput + case BUFFER_UNDERFLOW ⇒ + inboundReceptacle = buffer // save buffer so we can append the next one to it + nextOutput + case BUFFER_OVERFLOW ⇒ + throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small") + } + } + + @tailrec + def runDelegatedTasks() { + val task = engine.getDelegatedTask + if (task != null) { + task.run() + runDelegatedTasks() + } + } + + @tailrec + def processPendingSends(tempBuf: ByteBuffer, commands: Vector[Result] = Vector.empty): Vector[Result] = { + if (pendingSends.nonEmpty) { + val next = pendingSends.head + pendingSends = pendingSends.tail + val nextCmds = commands ++ encrypt(next, fromQueue = true) + // it may be that the send we just passed to `encrypt` was put back into the queue because + // the SSLEngine demands a `NEED_UNWRAP`, in this case we want to stop looping + if (pendingSends.nonEmpty && pendingSends.head != next) + processPendingSends(tempBuf) + else nextCmds + } else commands + } + + @tailrec + def closeEngine(commands: Vector[Result] = Vector.empty): Vector[Result] = { + if (!engine.isOutboundDone) { + closeEngine(commands ++ encrypt(Send.Empty)) + } else commands + } + } + + private final class Send(val buffer: ByteBuffer, val ack: Any) + + private object Send { + val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck) + def apply(write: Tcp.Write) = { + val buffer = ByteBuffer allocate write.data.length + write.data copyToBuffer buffer + buffer.flip() + new Send(buffer, write.ack) + } + } +} diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index ba5bc51529..2bdda1daf3 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -82,7 +82,7 @@ object Tcp extends ExtensionKey[TcpExt] { localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command case class Bind(handler: ActorRef, - endpoint: InetSocketAddress, + localAddress: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 6c7f2e68fe..a78b11dc2e 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -44,19 +44,19 @@ private[io] class TcpListener(val selectorRouter: ActorRef, val socket = serverSocketChannel.socket options.foreach(_.beforeServerSocketBind(socket)) try { - socket.bind(endpoint, backlog) + socket.bind(localAddress, backlog) require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") } catch { case NonFatal(e) ⇒ bindCommander ! bind.failureMessage - log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) + log.error(e, "Bind failed for TCP channel on endpoint [{}]", localAddress) context.stop(self) } serverSocketChannel } context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) - log.debug("Successfully bound to {}", endpoint) + log.debug("Successfully bound to {}", localAddress) override def supervisorStrategy = IO.connectionSupervisorStrategy @@ -78,10 +78,10 @@ private[io] class TcpListener(val selectorRouter: ActorRef, } case Unbind ⇒ - log.debug("Unbinding endpoint {}", endpoint) + log.debug("Unbinding endpoint {}", localAddress) channel.close() sender ! Unbound - log.debug("Unbound endpoint {}, stopping listener", endpoint) + log.debug("Unbound endpoint {}, stopping listener", localAddress) context.stop(self) } diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala new file mode 100644 index 0000000000..5d2cb7cedc --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import akka.actor.Actor +import akka.actor.ActorContext +import scala.beans.BeanProperty +import akka.actor.ActorRef +import scala.util.Success +import scala.util.Failure +import akka.actor.Terminated +import akka.actor.Props + +object TcpPipelineHandler { + + /** + * This class wraps up a pipeline with its external (i.e. “top”) command and + * event types and providing unique wrappers for sending commands and + * receiving events (nested and non-static classes which are specific to each + * instance of [[Init]]). All events emitted by the pipeline will be sent to + * the registered handler wrapped in an Event. + */ + abstract class Init[Ctx <: PipelineContext, Cmd, Evt](val stages: PipelineStage[Ctx, Cmd, Tcp.Command, Evt, Tcp.Event]) { + def makeContext(actorContext: ActorContext): Ctx + + def command(cmd: Cmd): Command = Command(cmd) + def event(evt: AnyRef): Evt = evt match { + case Event(evt) ⇒ evt + } + + final case class Command(@BeanProperty cmd: Cmd) + final case class Event(@BeanProperty evt: Evt) + } + + /** + * Wrapper around acknowledgements: if a Tcp.Write is generated which + * request an ACK then it is wrapped such that the ACK can flow back up the + * pipeline later, allowing you to use arbitrary ACK messages (not just + * subtypes of Tcp.Event). + */ + case class Ack(ack: Any) extends Tcp.Event + + /** + * This is a new Tcp.Command which the pipeline can emit to effect the + * sending a message to another actor. Using this instead of doing the send + * directly has the advantage that other pipeline stages can also see and + * possibly transform the send. + */ + case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command + + /** + * Scala API: create [[Props]] for a pipeline handler + */ + def apply[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) = + Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler) + + /** + * Java API: create [[Props]] for a pipeline handler + */ + def create[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) = + Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler) + +} + +/** + * This actor wraps a pipeline and forwards commands and events between that + * one and a [[Tcp]] connection actor. In order to inject commands into the + * pipeline send an [[Init.Command]] message to this actor; events will be sent + * to the designated handler wrapped in [[Init.Event]] messages. + * + * When the designated handler terminates the TCP connection is aborted. When + * the connection actor terminates this actor terminates as well; the designated + * handler may want to watch this actor’s lifecycle. + * + * FIXME WARNING: + * + * This actor does currently not handle back-pressure from the TCP socket; it + * is meant only as a demonstration and will be fleshed out in full before the + * 2.2 release. + */ +class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt]( + init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], + connection: ActorRef, + handler: ActorRef) + extends Actor { + + import init._ + import TcpPipelineHandler._ + + // sign death pact + context watch connection + // watch so we can Close + context watch handler + + val ctx = init.makeContext(context) + + val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({ + case Success(cmd) ⇒ + cmd match { + case Tcp.Write(data, Tcp.NoAck) ⇒ connection ! cmd + case Tcp.Write(data, ack) ⇒ connection ! Tcp.Write(data, Ack(ack)) + case Tell(receiver, msg, sender) ⇒ receiver.tell(msg, sender) + case _ ⇒ connection ! cmd + } + case Failure(ex) ⇒ throw ex + }, { + case Success(evt) ⇒ handler ! Event(evt) + case Failure(ex) ⇒ throw ex + }) + + def receive = { + case Command(cmd) ⇒ pipes.injectCommand(cmd) + case evt: Tcp.Event ⇒ pipes.injectEvent(evt) + case Terminated(`handler`) ⇒ connection ! Tcp.Abort + } + +} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoManager.java b/akka-docs/rst/java/code/docs/io/japi/EchoManager.java index 5670723c90..6d13af7a74 100644 --- a/akka-docs/rst/java/code/docs/io/japi/EchoManager.java +++ b/akka-docs/rst/java/code/docs/io/japi/EchoManager.java @@ -58,7 +58,7 @@ public class EchoManager extends UntypedActor { } else if (msg instanceof Tcp.CommandFailed) { final CommandFailed failed = (CommandFailed) msg; if (failed.cmd() instanceof Bind) { - log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).endpoint()); + log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).localAddress()); getContext().stop(getSelf()); } else { log.warning("unknown command failed [{}]", failed.cmd()); diff --git a/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java new file mode 100644 index 0000000000..4d0cd6d79b --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.net.InetSocketAddress; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.io.AbstractPipelineContext; +import akka.io.HasLogging; +import akka.io.SslTlsSupport; +import akka.io.Tcp; +import akka.io.Tcp.Bound; +import akka.io.Tcp.Command; +import akka.io.Tcp.CommandFailed; +import akka.io.Tcp.Connected; +import akka.io.Tcp.Event; +import akka.io.Tcp.Received; +import akka.io.TcpMessage; +import akka.io.TcpPipelineHandler; +import akka.io.TcpPipelineHandler.Init; +import akka.io.ssl.SslTlsSupportSpec; +import akka.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; +import akka.util.ByteString; + +public class SslDocTest { + + static + //#client + public class SslClient extends UntypedActor { + final InetSocketAddress remote; + final SSLContext sslContext; + final ActorRef listener; + + final LoggingAdapter log = Logging + .getLogger(getContext().system(), getSelf()); + + public SslClient(InetSocketAddress remote, SSLContext sslContext, ActorRef listener) { + this.remote = remote; + this.sslContext = sslContext; + this.listener = listener; + + // open a connection to the remote TCP port + Tcp.get(getContext().system()).getManager() + .tell(TcpMessage.connect(remote), getSelf()); + } + + class Context extends AbstractPipelineContext implements HasLogging { + @Override + public LoggingAdapter getLogger() { + return log; + } + } + + Init init = null; + + @Override + public void onReceive(Object msg) { + if (msg instanceof CommandFailed) { + getContext().stop(getSelf()); + + } else if (msg instanceof Connected) { + // create a javax.net.ssl.SSLEngine for our peer in client mode + final SSLEngine engine = sslContext.createSSLEngine( + remote.getHostName(), remote.getPort()); + engine.setUseClientMode(true); + final SslTlsSupport ssl = new SslTlsSupport(engine); + + // set up the context for communicating with TcpPipelineHandler + init = new Init(ssl) { + @Override + public HasLogging makeContext(ActorContext ctx) { + return new Context(); + } + }; + // create handler for pipeline, setting ourselves as payload recipient + final ActorRef handler = getContext().actorOf( + TcpPipelineHandler.create(init, getSender(), getSelf())); + + // register the SSL handler with the connection + getSender().tell(TcpMessage.register(handler), getSelf()); + // and send a message across the SSL channel + handler.tell( + init.command(TcpMessage.write(ByteString.fromString("hello"))), + getSelf()); + + } else if (msg instanceof Init.Event) { + // unwrap TcpPipelineHandler’s event into a Tcp.Event + final Event recv = init.event(msg); + if (recv instanceof Received) { + // and inform someone of the received payload + listener.tell(((Received) recv).data().utf8String(), getSelf()); + } + } + } + } + //#client + + static + //#server + public class SslServer extends UntypedActor { + final SSLContext sslContext; + final ActorRef listener; + + final LoggingAdapter log = Logging + .getLogger(getContext().system(), getSelf()); + + public SslServer(SSLContext sslContext, ActorRef listener) { + this.sslContext = sslContext; + this.listener = listener; + + // bind to a socket, registering ourselves as incoming connection handler + Tcp.get(getContext().system()).getManager().tell( + TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100), + getSelf()); + } + + class Context extends AbstractPipelineContext implements HasLogging { + @Override + public LoggingAdapter getLogger() { + return log; + } + } + + Init init = null; + + @Override + public void onReceive(Object msg) { + if (msg instanceof CommandFailed) { + getContext().stop(getSelf()); + + } else if (msg instanceof Bound) { + listener.tell(msg, getSelf()); + + } else if (msg instanceof Connected) { + // create a javax.net.ssl.SSLEngine for our peer in server mode + final InetSocketAddress remote = ((Connected) msg).remoteAddress(); + final SSLEngine engine = sslContext.createSSLEngine( + remote.getHostName(), remote.getPort()); + engine.setUseClientMode(false); + final SslTlsSupport ssl = new SslTlsSupport(engine); + + // set up the context for communicating with TcpPipelineHandler + init = new Init(ssl) { + @Override + public HasLogging makeContext(ActorContext ctx) { + return new Context(); + } + }; + // create handler for pipeline, setting ourselves as payload recipient + final ActorRef handler = getContext().actorOf( + TcpPipelineHandler.create(init, getSender(), getSelf())); + + // register the SSL handler with the connection + getSender().tell(TcpMessage.register(handler), getSelf()); + + } else if (msg instanceof Init.Event) { + // unwrap TcpPipelineHandler’s event to get a Tcp.Event + final Event recv = init.event(msg); + if (recv instanceof Received) { + // inform someone of the received message + listener.tell(((Received) recv).data().utf8String(), getSelf()); + // and reply (sender is the SSL handler created above) + getSender().tell(init.command( + TcpMessage.write(ByteString.fromString("world"))), getSelf()); + } + } + } + } + //#server + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("IODocTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateSslClient() { + new JavaTestKit(system) { + { + final SSLContext ctx = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme"); + + final ActorRef server = system.actorOf(Props.create(SslServer.class, ctx, getRef())); + final Bound bound = expectMsgClass(Bound.class); + assert getLastSender() == server; + + final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef())); + expectMsgEquals("hello"); + assert getLastSender() == server; + expectMsgEquals("world"); + assert getLastSender() == client; + } + }; + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index a96758ebd2..cfe68bb19c 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -582,6 +582,53 @@ The helper functions are very similar to the ACK-based case: .. includecode:: code/docs/io/japi/EchoHandler.java#helpers +Usage Example: TcpPipelineHandler and SSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This example shows the different parts described above working together. Let us +first look at the SSL server: + +.. includecode:: code/docs/io/japi/SslDocTest.java#server + +The actor above binds to a local port and registers itself as the handler for +new connections. When a new connection comes in it will create a +:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly +for different setups, please refer to the JDK documentation) and wrap that in +an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``). +This single-stage pipeline will be driven by a :class:`TcpPipelineHandler` +actor which is also included in ``akka-actor``. In order to capture the generic +command and event types consumed and emitted by that actor we need to create a +wrapper—the nested :class:`Init` class—which also provides the +:meth:`makeContext` method for creating the pipeline context needed by the +supplied pipeline. With those things bundled up all that remains is creating a +:class:`TcpPipelineHandler` and registering that one as the recipient of +inbound traffic from the TCP connection. + +Since we instructed that handler actor to send any events which are emitted by +the SSL pipeline to ourselves, we can then just wait for the reception of the +decrypted payload messages, compute a response—just ``"world"`` in this +case—and reply by sending back a ``Tcp.Write``. It should be noted that +communication with the handler wraps commands and events in the inner types of +the ``init`` object in order to keep things well separated. To ease handling of +such path-dependent types there exist two helper methods, namely +:class:`Init.command` for creating a command and :class:`Init.event` for +unwrapping an event. + +.. warning:: + + The :class:`TcpPipelineHandler` does currently not handle back-pressure from + the TCP socket, i.e. it will just lose data when the kernel buffer + overflows. This will be fixed before Akka 2.2 final. + +Looking at the client side we see that not much needs to be changed: + +.. includecode:: code/docs/io/japi/SslDocTest.java#client + +Once the connection is established we again create a +:class:`TcpPipelineHandler` wrapping an :class:`SslTlsSupport` (in client mode) +and register that as the recipient of inbound traffic and ourselves as +recipient for the decrypted payload data. The we send a greeting to the server +and forward any replies to some ``listener`` actor. Using UDP --------- diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index c44fb4e9c1..bf965bfe54 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -601,6 +601,40 @@ The helper functions are very similar to the ACK-based case: .. includecode:: code/docs/io/EchoServer.scala#helpers +Usage Example: TcpPipelineHandler and SSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This example shows the different parts described above working together: + +.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#server + +The actor above is meant to be registered as the inbound connection handler for +a listen socket. When a new connection comes in it will create a +:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly +for different setups, please refer to the JDK documentation) and wrap that in +an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``). +This single-stage pipeline will be driven by a :class:`TcpPipelineHandler` +actor which is also included in ``akka-actor``. In order to capture the generic +command and event types consumed and emitted by that actor we need to create a +wrapper—the nested :class:`Init` class—which also provides the +:meth:`makeContext` method for creating the pipeline context needed by the +supplied pipeline. With those things bundled up all that remains is creating a +:class:`TcpPipelineHandler` and registering that one as the recipient of +inbound traffic from the TCP connection. + +Since we instructed that handler actor to send any events which are emitted by +the SSL pipeline to ourselves, we can then just switch behavior to receive the +decrypted payload message, compute a response and reply by sending back a +``Tcp.Write``. It should be noted that communication with the handler wraps +commands and events in the inner types of the ``init`` object in order to keep +things well separated. + +.. warning:: + + The :class:`TcpPipelineHandler` does currently not handle back-pressure from + the TCP socket, i.e. it will just lose data when the kernel buffer + overflows. This will be fixed before Akka 2.2 final. + Using UDP --------- diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala new file mode 100644 index 0000000000..c98bdba7cf --- /dev/null +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -0,0 +1,261 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +// adapted from +// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io-tests/src/test/scala/spray/io/SslTlsSupportSpec.scala +// original copyright notice follows: + +/* + * Copyright (C) 2011-2013 spray.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.io.ssl + +import java.io.{ BufferedWriter, OutputStreamWriter, InputStreamReader, BufferedReader } +import javax.net.ssl._ +import java.net.{ InetSocketAddress, SocketException } +import java.security.{ KeyStore, SecureRandom } +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.duration._ +import com.typesafe.config.{ ConfigFactory, Config } +import akka.actor._ +import akka.event.LoggingAdapter +import akka.testkit.{ TestProbe, AkkaSpec } +import akka.util.{ ByteString, Timeout } +import akka.io.{ IO, Tcp, PipelineContext } +import akka.TestUtils +import akka.event.Logging +import akka.io.TcpPipelineHandler +import akka.io.SslTlsSupport +import akka.io.HasLogging +import akka.remote.security.provider.AkkaProvider + +// TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies +class SslTlsSupportSpec extends AkkaSpec { + + implicit val timeOut: Timeout = 1.second + + val sslContext = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme") + + "The SslTlsSupport" should { + + "work between a Java client and a Java server" in { + val server = new JavaSslServer + val client = new JavaSslClient(server.address) + client.run() + client.close() + server.close() + } + + "work between a akka client and a Java server" in { + val server = new JavaSslServer + val client = new AkkaSslClient(server.address) + client.run() + client.close() + server.close() + } + + "work between a Java client and a akka server" in { + val serverAddress = TestUtils.temporaryServerAddress() + val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this)) + val probe = TestProbe() + probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) + probe.expectMsgType[Tcp.Bound] + + val client = new JavaSslClient(serverAddress) + client.run() + client.close() + } + + "work between a akka client and a akka server" in { + val serverAddress = TestUtils.temporaryServerAddress() + val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this)) + val probe = TestProbe() + probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) + probe.expectMsgType[Tcp.Bound] + + val client = new AkkaSslClient(serverAddress) + client.run() + client.close() + } + } + + val counter = new AtomicInteger + + class AkkaSslClient(address: InetSocketAddress) { + + val probe = TestProbe() + probe.send(IO(Tcp), Tcp.Connect(address)) + + val connected = probe.expectMsgType[Tcp.Connected] + val connection = probe.sender + + val init = new TcpPipelineHandler.Init(new SslTlsSupport(sslEngine(connected.remoteAddress, client = true))) { + override def makeContext(actorContext: ActorContext): HasLogging = new HasLogging { + override def getLogger = system.log + } + } + import init._ + val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref), + "client" + counter.incrementAndGet()) + probe.send(connection, Tcp.Register(handler)) + + def run() { + probe.send(handler, Command(Tcp.Write(ByteString("3+4\n")))) + probe.expectMsg(Event(Tcp.Received(ByteString("7\n")))) + probe.send(handler, Command(Tcp.Write(ByteString("20+22\n")))) + probe.expectMsg(Event(Tcp.Received(ByteString("42\n")))) + } + + def close() { + probe.send(handler, Command(Tcp.Close)) + probe.expectMsgType[Event].evt match { + case _: Tcp.ConnectionClosed ⇒ true + } + TestUtils.verifyActorTermination(handler) + } + + } + + //#server + class AkkaSslServer extends Actor with ActorLogging { + + import Tcp.{ Connected, Received } + + def receive: Receive = { + case Connected(remote, _) ⇒ + val init = + new TcpPipelineHandler.Init( + new SslTlsSupport(sslEngine(remote, client = false))) { + override def makeContext(actorContext: ActorContext): HasLogging = + new HasLogging { + override def getLogger = log + } + } + import init._ + + val connection = sender + val handler = system.actorOf( + TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet()) + + connection ! Tcp.Register(handler) + + context become { + case Event(Received(data)) ⇒ + val input = data.utf8String.dropRight(1) + log.debug("akka-io Server received {} from {}", input, sender) + val response = serverResponse(input) + sender ! Command(Tcp.Write(ByteString(response))) + log.debug("akka-io Server sent: {}", response.dropRight(1)) + } + } + } + //#server + + class JavaSslServer extends Thread { + val log: LoggingAdapter = Logging(system, getClass) + val address = TestUtils.temporaryServerAddress() + private val serverSocket = + sslContext.getServerSocketFactory.createServerSocket(address.getPort).asInstanceOf[SSLServerSocket] + @volatile private var socket: SSLSocket = _ + start() + + def close() { + serverSocket.close() + if (socket != null) socket.close() + } + + override def run() { + try { + socket = serverSocket.accept().asInstanceOf[SSLSocket] + val (reader, writer) = readerAndWriter(socket) + while (true) { + val line = reader.readLine() + log.debug("SSLServerSocket Server received: {}", line) + if (line == null) throw new SocketException("closed") + val result = serverResponse(line) + writer.write(result) + writer.flush() + log.debug("SSLServerSocket Server sent: {}", result.dropRight(1)) + } + } catch { + case _: SocketException ⇒ // expected during shutdown + } finally close() + } + } + + class JavaSslClient(address: InetSocketAddress) { + val socket = sslContext.getSocketFactory.createSocket(address.getHostName, address.getPort).asInstanceOf[SSLSocket] + val (reader, writer) = readerAndWriter(socket) + val log: LoggingAdapter = Logging(system, getClass) + + def run() { + write("1+2") + readLine() === "3" + write("12+24") + readLine() === "36" + } + + def write(string: String) { + writer.write(string + "\n") + writer.flush() + log.debug("SSLSocket Client sent: {}", string) + } + + def readLine() = { + val string = reader.readLine() + log.debug("SSLSocket Client received: {}", string) + string + } + + def close() { socket.close() } + } + + def readerAndWriter(socket: SSLSocket) = { + val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) + val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream)) + reader -> writer + } + + def serverResponse(input: String): String = input.split('+').map(_.toInt).reduceLeft(_ + _).toString + '\n' + + def sslEngine(address: InetSocketAddress, client: Boolean) = { + val engine = sslContext.createSSLEngine(address.getHostName, address.getPort) + engine.setUseClientMode(client) + engine + } + +} + +object SslTlsSupportSpec { + + def createSslContext(keyStoreResource: String, trustStoreResource: String, password: String): SSLContext = { + val keyStore = KeyStore.getInstance("jks") + keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray) + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(keyStore, password.toCharArray) + val trustStore = KeyStore.getInstance("jks") + trustStore.load(getClass.getResourceAsStream(trustStoreResource), password.toCharArray()) + val trustManagerFactory = TrustManagerFactory.getInstance("SunX509") + trustManagerFactory.init(trustStore) + val context = SSLContext.getInstance("SSL") + val rng = SecureRandom.getInstance("AES128CounterSecureRNG", AkkaProvider) + rng.nextInt() // if it stalls then it stalls here + context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, rng) + context + } + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 1d99d70894..965d891dee 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -472,7 +472,7 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels, - remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), + remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst", sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" },