add SslTlsSupport (ported from spray-io), see #3236

- also added TcpPipelineHandler for wrapping a pipeline
- added Java & Scala docs with a complete example
- test verify interop with standard blocking java SSL client and server
- test is placed in akka-remote to benefit from AkkaProvider for
  SecureRandom; should be moved into akka-actor eventually
This commit is contained in:
Roland 2013-04-19 11:19:31 +02:00
parent 3569886bbe
commit 9ba8b115ec
12 changed files with 938 additions and 11 deletions

View file

@ -14,6 +14,8 @@ trait BufferPool {
} }
/** /**
* INTERNAL API
*
* A buffer pool which keeps a free list of direct buffers of a specified default * A buffer pool which keeps a free list of direct buffers of a specified default
* size in a simple fixed size stack. * size in a simple fixed size stack.
* *

View file

@ -15,6 +15,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.collection.mutable.WrappedArray import scala.collection.mutable.WrappedArray
import scala.concurrent.duration.Deadline import scala.concurrent.duration.Deadline
import scala.beans.BeanProperty import scala.beans.BeanProperty
import akka.event.LoggingAdapter
/** /**
* Scala API: A pair of pipes, one for commands and one for events, plus a * Scala API: A pair of pipes, one for commands and one for events, plus a
@ -34,21 +35,22 @@ import scala.beans.BeanProperty
*/ */
trait PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { trait PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] {
type Mgmt = PartialFunction[AnyRef, Iterable[Either[EvtAbove, CmdBelow]]] type Result = Either[EvtAbove, CmdBelow]
type Mgmt = PartialFunction[AnyRef, Iterable[Result]]
/** /**
* The command pipeline transforms injected commands from the upper stage * The command pipeline transforms injected commands from the upper stage
* into commands for the stage below, but it can also emit events for the * into commands for the stage below, but it can also emit events for the
* upper stage. Any number of each can be generated. * upper stage. Any number of each can be generated.
*/ */
def commandPipeline: CmdAbove Iterable[Either[EvtAbove, CmdBelow]] def commandPipeline: CmdAbove Iterable[Result]
/** /**
* The event pipeline transforms injected event from the lower stage * The event pipeline transforms injected event from the lower stage
* into event for the stage above, but it can also emit commands for the * into event for the stage above, but it can also emit commands for the
* stage below. Any number of each can be generated. * stage below. Any number of each can be generated.
*/ */
def eventPipeline: EvtBelow Iterable[Either[EvtAbove, CmdBelow]] def eventPipeline: EvtBelow Iterable[Result]
/** /**
* The management port allows sending broadcast messages to all stages * The management port allows sending broadcast messages to all stages
@ -817,6 +819,17 @@ class LengthFieldFrame(maxSize: Int,
} }
//#length-field-frame //#length-field-frame
/**
* This trait expresses that the pipelines context needs to provide a logging
* facility.
*/
trait HasLogging extends PipelineContext {
/**
* Retrieve the [[LoggingAdapter]] for this pipelines context.
*/
def getLogger: LoggingAdapter
}
//#tick-generator //#tick-generator
/** /**
* This trait expresses that the pipelines context needs to live within an * This trait expresses that the pipelines context needs to live within an

View file

@ -0,0 +1,233 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
// adapted from
// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io/src/main/scala/spray/io/SslTlsSupport.scala
// original copyright notice follows:
/*
* Copyright (C) 2011-2013 spray.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.io
import java.nio.ByteBuffer
import javax.net.ssl.{ SSLContext, SSLException, SSLEngineResult, SSLEngine }
import javax.net.ssl.SSLEngineResult.HandshakeStatus._
import javax.net.ssl.SSLEngineResult.Status._
import scala.collection.immutable
import scala.annotation.tailrec
import akka.util.ByteString
import Tcp.{ Command, Event }
object SslTlsSupport {
// we are using Nettys default values:
// 16665 + 1024 (room for compressed data) + 1024 (for OpenJDK compatibility)
private final val MaxPacketSize = 16665 + 2048
private final val EmptyByteArray = new Array[Byte](0)
}
/**
* This pipeline stage implements SSL / TLS support, using an externally
* configured [[SSLEngine]]. It operates on the level of [[Tcp.Event]] and
* [[Tcp.Command]] messages, which means that it will typically be one of
* the lowest stages in a protocol stack.
*
* Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB
* allocated which is used by the SSLEngine.
*/
class SslTlsSupport(engine: SSLEngine) extends PipelineStage[HasLogging, Command, Command, Event, Event] {
override def apply(ctx: HasLogging) =
new PipePair[Command, Command, Event, Event] {
var pendingSends = immutable.Queue.empty[Send]
var inboundReceptacle: ByteBuffer = _ // holds incoming data that are too small to be decrypted yet
val log = ctx.getLogger
// TODO: should this be a ThreadLocal?
val tempBuf = ByteBuffer.allocate(SslTlsSupport.MaxPacketSize)
override val commandPipeline = (cmd: Command) cmd match {
case x: Tcp.Write
if (pendingSends.isEmpty) encrypt(Send(x))
else {
pendingSends = pendingSends enqueue Send(x)
Nil
}
case x @ (Tcp.Close | Tcp.ConfirmedClose)
log.debug("Closing SSLEngine due to reception of [{}]", x)
engine.closeOutbound()
closeEngine() :+ Right(x)
case cmd ctx.singleCommand(cmd)
}
val eventPipeline = (evt: Event) evt match {
case Tcp.Received(data)
val buf = if (inboundReceptacle != null) {
try ByteBuffer.allocate(inboundReceptacle.remaining + data.length).put(inboundReceptacle)
finally inboundReceptacle = null
} else ByteBuffer allocate data.length
data copyToBuffer buf
buf.flip()
decrypt(buf)
case x: Tcp.ConnectionClosed
if (!engine.isOutboundDone) {
try engine.closeInbound()
catch { case e: SSLException } // ignore warning about possible truncation attacks
}
ctx.singleEvent(x)
case ev ctx.singleEvent(ev)
}
/**
* Encrypts the given buffers and dispatches the results as Tcp.Write commands.
*/
@tailrec
def encrypt(send: Send, fromQueue: Boolean = false, commands: Vector[Result] = Vector.empty): Vector[Result] = {
import send.{ ack, buffer }
tempBuf.clear()
val ackDefinedAndPreContentLeft = ack != Tcp.NoAck && buffer.remaining > 0
val result = engine.wrap(buffer, tempBuf)
val postContentLeft = buffer.remaining > 0
tempBuf.flip()
val nextCmds =
if (tempBuf.remaining > 0) {
val writeAck = if (ackDefinedAndPreContentLeft && !postContentLeft) ack else Tcp.NoAck
commands :+ Right(Tcp.Write(ByteString(tempBuf), writeAck))
} else commands
result.getStatus match {
case OK result.getHandshakeStatus match {
case NOT_HANDSHAKING | FINISHED
if (postContentLeft) encrypt(send, fromQueue, nextCmds)
else nextCmds
case NEED_WRAP
encrypt(send, fromQueue, nextCmds)
case NEED_UNWRAP
pendingSends =
if (fromQueue) send +: pendingSends // output coming from the queue needs to go to the front
else pendingSends enqueue send // "new" output to the back of the queue
nextCmds
case NEED_TASK
runDelegatedTasks()
encrypt(send, fromQueue, nextCmds)
}
case CLOSED
if (postContentLeft) {
log.warning("SSLEngine closed prematurely while sending")
nextCmds :+ Right(Tcp.Close)
} else nextCmds
case BUFFER_OVERFLOW
throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small")
case BUFFER_UNDERFLOW
throw new IllegalStateException("BUFFER_UNDERFLOW should never appear as a result of a wrap")
}
}
/**
* Decrypts the given buffer and dispatches the results as Tcp.Received events.
*/
@tailrec
def decrypt(buffer: ByteBuffer, output: Vector[Result] = Vector.empty): Vector[Result] = {
tempBuf.clear()
val result = engine.unwrap(buffer, tempBuf)
tempBuf.flip()
val nextOutput =
if (tempBuf.remaining > 0) output :+ Left(Tcp.Received(ByteString(tempBuf)))
else output
result.getStatus match {
case OK result.getHandshakeStatus match {
case NOT_HANDSHAKING | FINISHED
if (buffer.remaining > 0) decrypt(buffer, nextOutput)
else nextOutput ++ processPendingSends(tempBuf)
case NEED_UNWRAP
decrypt(buffer, nextOutput)
case NEED_WRAP
val n = nextOutput ++ (
if (pendingSends.isEmpty) encrypt(Send.Empty)
else processPendingSends(tempBuf))
if (buffer.remaining > 0) decrypt(buffer, n)
else n
case NEED_TASK
runDelegatedTasks()
decrypt(buffer, nextOutput)
}
case CLOSED
if (!engine.isOutboundDone) {
log.warning("SSLEngine closed prematurely while receiving")
nextOutput :+ Right(Tcp.Close)
} else nextOutput
case BUFFER_UNDERFLOW
inboundReceptacle = buffer // save buffer so we can append the next one to it
nextOutput
case BUFFER_OVERFLOW
throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small")
}
}
@tailrec
def runDelegatedTasks() {
val task = engine.getDelegatedTask
if (task != null) {
task.run()
runDelegatedTasks()
}
}
@tailrec
def processPendingSends(tempBuf: ByteBuffer, commands: Vector[Result] = Vector.empty): Vector[Result] = {
if (pendingSends.nonEmpty) {
val next = pendingSends.head
pendingSends = pendingSends.tail
val nextCmds = commands ++ encrypt(next, fromQueue = true)
// it may be that the send we just passed to `encrypt` was put back into the queue because
// the SSLEngine demands a `NEED_UNWRAP`, in this case we want to stop looping
if (pendingSends.nonEmpty && pendingSends.head != next)
processPendingSends(tempBuf)
else nextCmds
} else commands
}
@tailrec
def closeEngine(commands: Vector[Result] = Vector.empty): Vector[Result] = {
if (!engine.isOutboundDone) {
closeEngine(commands ++ encrypt(Send.Empty))
} else commands
}
}
private final class Send(val buffer: ByteBuffer, val ack: Any)
private object Send {
val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck)
def apply(write: Tcp.Write) = {
val buffer = ByteBuffer allocate write.data.length
write.data copyToBuffer buffer
buffer.flip()
new Send(buffer, write.ack)
}
}
}

View file

@ -82,7 +82,7 @@ object Tcp extends ExtensionKey[TcpExt] {
localAddress: Option[InetSocketAddress] = None, localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command options: immutable.Traversable[SocketOption] = Nil) extends Command
case class Bind(handler: ActorRef, case class Bind(handler: ActorRef,
endpoint: InetSocketAddress, localAddress: InetSocketAddress,
backlog: Int = 100, backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil) extends Command options: immutable.Traversable[SocketOption] = Nil) extends Command

View file

@ -44,19 +44,19 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
val socket = serverSocketChannel.socket val socket = serverSocketChannel.socket
options.foreach(_.beforeServerSocketBind(socket)) options.foreach(_.beforeServerSocketBind(socket))
try { try {
socket.bind(endpoint, backlog) socket.bind(localAddress, backlog)
require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress],
s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]")
} catch { } catch {
case NonFatal(e) case NonFatal(e)
bindCommander ! bind.failureMessage bindCommander ! bind.failureMessage
log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) log.error(e, "Bind failed for TCP channel on endpoint [{}]", localAddress)
context.stop(self) context.stop(self)
} }
serverSocketChannel serverSocketChannel
} }
context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT)
log.debug("Successfully bound to {}", endpoint) log.debug("Successfully bound to {}", localAddress)
override def supervisorStrategy = IO.connectionSupervisorStrategy override def supervisorStrategy = IO.connectionSupervisorStrategy
@ -78,10 +78,10 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
} }
case Unbind case Unbind
log.debug("Unbinding endpoint {}", endpoint) log.debug("Unbinding endpoint {}", localAddress)
channel.close() channel.close()
sender ! Unbound sender ! Unbound
log.debug("Unbound endpoint {}, stopping listener", endpoint) log.debug("Unbound endpoint {}, stopping listener", localAddress)
context.stop(self) context.stop(self)
} }

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.actor.Actor
import akka.actor.ActorContext
import scala.beans.BeanProperty
import akka.actor.ActorRef
import scala.util.Success
import scala.util.Failure
import akka.actor.Terminated
import akka.actor.Props
object TcpPipelineHandler {
/**
* This class wraps up a pipeline with its external (i.e. top) command and
* event types and providing unique wrappers for sending commands and
* receiving events (nested and non-static classes which are specific to each
* instance of [[Init]]). All events emitted by the pipeline will be sent to
* the registered handler wrapped in an Event.
*/
abstract class Init[Ctx <: PipelineContext, Cmd, Evt](val stages: PipelineStage[Ctx, Cmd, Tcp.Command, Evt, Tcp.Event]) {
def makeContext(actorContext: ActorContext): Ctx
def command(cmd: Cmd): Command = Command(cmd)
def event(evt: AnyRef): Evt = evt match {
case Event(evt) evt
}
final case class Command(@BeanProperty cmd: Cmd)
final case class Event(@BeanProperty evt: Evt)
}
/**
* Wrapper around acknowledgements: if a Tcp.Write is generated which
* request an ACK then it is wrapped such that the ACK can flow back up the
* pipeline later, allowing you to use arbitrary ACK messages (not just
* subtypes of Tcp.Event).
*/
case class Ack(ack: Any) extends Tcp.Event
/**
* This is a new Tcp.Command which the pipeline can emit to effect the
* sending a message to another actor. Using this instead of doing the send
* directly has the advantage that other pipeline stages can also see and
* possibly transform the send.
*/
case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command
/**
* Scala API: create [[Props]] for a pipeline handler
*/
def apply[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
/**
* Java API: create [[Props]] for a pipeline handler
*/
def create[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
}
/**
* This actor wraps a pipeline and forwards commands and events between that
* one and a [[Tcp]] connection actor. In order to inject commands into the
* pipeline send an [[Init.Command]] message to this actor; events will be sent
* to the designated handler wrapped in [[Init.Event]] messages.
*
* When the designated handler terminates the TCP connection is aborted. When
* the connection actor terminates this actor terminates as well; the designated
* handler may want to watch this actors lifecycle.
*
* <b>FIXME WARNING:</b>
*
* This actor does currently not handle back-pressure from the TCP socket; it
* is meant only as a demonstration and will be fleshed out in full before the
* 2.2 release.
*/
class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt](
init: TcpPipelineHandler.Init[Ctx, Cmd, Evt],
connection: ActorRef,
handler: ActorRef)
extends Actor {
import init._
import TcpPipelineHandler._
// sign death pact
context watch connection
// watch so we can Close
context watch handler
val ctx = init.makeContext(context)
val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({
case Success(cmd)
cmd match {
case Tcp.Write(data, Tcp.NoAck) connection ! cmd
case Tcp.Write(data, ack) connection ! Tcp.Write(data, Ack(ack))
case Tell(receiver, msg, sender) receiver.tell(msg, sender)
case _ connection ! cmd
}
case Failure(ex) throw ex
}, {
case Success(evt) handler ! Event(evt)
case Failure(ex) throw ex
})
def receive = {
case Command(cmd) pipes.injectCommand(cmd)
case evt: Tcp.Event pipes.injectEvent(evt)
case Terminated(`handler`) connection ! Tcp.Abort
}
}

View file

@ -58,7 +58,7 @@ public class EchoManager extends UntypedActor {
} else if (msg instanceof Tcp.CommandFailed) { } else if (msg instanceof Tcp.CommandFailed) {
final CommandFailed failed = (CommandFailed) msg; final CommandFailed failed = (CommandFailed) msg;
if (failed.cmd() instanceof Bind) { if (failed.cmd() instanceof Bind) {
log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).endpoint()); log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).localAddress());
getContext().stop(getSelf()); getContext().stop(getSelf());
} else { } else {
log.warning("unknown command failed [{}]", failed.cmd()); log.warning("unknown command failed [{}]", failed.cmd());

View file

@ -0,0 +1,218 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.net.InetSocketAddress;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.AbstractPipelineContext;
import akka.io.HasLogging;
import akka.io.SslTlsSupport;
import akka.io.Tcp;
import akka.io.Tcp.Bound;
import akka.io.Tcp.Command;
import akka.io.Tcp.CommandFailed;
import akka.io.Tcp.Connected;
import akka.io.Tcp.Event;
import akka.io.Tcp.Received;
import akka.io.TcpMessage;
import akka.io.TcpPipelineHandler;
import akka.io.TcpPipelineHandler.Init;
import akka.io.ssl.SslTlsSupportSpec;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
public class SslDocTest {
static
//#client
public class SslClient extends UntypedActor {
final InetSocketAddress remote;
final SSLContext sslContext;
final ActorRef listener;
final LoggingAdapter log = Logging
.getLogger(getContext().system(), getSelf());
public SslClient(InetSocketAddress remote, SSLContext sslContext, ActorRef listener) {
this.remote = remote;
this.sslContext = sslContext;
this.listener = listener;
// open a connection to the remote TCP port
Tcp.get(getContext().system()).getManager()
.tell(TcpMessage.connect(remote), getSelf());
}
class Context extends AbstractPipelineContext implements HasLogging {
@Override
public LoggingAdapter getLogger() {
return log;
}
}
Init<HasLogging, Command, Event> init = null;
@Override
public void onReceive(Object msg) {
if (msg instanceof CommandFailed) {
getContext().stop(getSelf());
} else if (msg instanceof Connected) {
// create a javax.net.ssl.SSLEngine for our peer in client mode
final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort());
engine.setUseClientMode(true);
final SslTlsSupport ssl = new SslTlsSupport(engine);
// set up the context for communicating with TcpPipelineHandler
init = new Init<HasLogging, Command, Event>(ssl) {
@Override
public HasLogging makeContext(ActorContext ctx) {
return new Context();
}
};
// create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.create(init, getSender(), getSelf()));
// register the SSL handler with the connection
getSender().tell(TcpMessage.register(handler), getSelf());
// and send a message across the SSL channel
handler.tell(
init.command(TcpMessage.write(ByteString.fromString("hello"))),
getSelf());
} else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandlers event into a Tcp.Event
final Event recv = init.event(msg);
if (recv instanceof Received) {
// and inform someone of the received payload
listener.tell(((Received) recv).data().utf8String(), getSelf());
}
}
}
}
//#client
static
//#server
public class SslServer extends UntypedActor {
final SSLContext sslContext;
final ActorRef listener;
final LoggingAdapter log = Logging
.getLogger(getContext().system(), getSelf());
public SslServer(SSLContext sslContext, ActorRef listener) {
this.sslContext = sslContext;
this.listener = listener;
// bind to a socket, registering ourselves as incoming connection handler
Tcp.get(getContext().system()).getManager().tell(
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100),
getSelf());
}
class Context extends AbstractPipelineContext implements HasLogging {
@Override
public LoggingAdapter getLogger() {
return log;
}
}
Init<HasLogging, Command, Event> init = null;
@Override
public void onReceive(Object msg) {
if (msg instanceof CommandFailed) {
getContext().stop(getSelf());
} else if (msg instanceof Bound) {
listener.tell(msg, getSelf());
} else if (msg instanceof Connected) {
// create a javax.net.ssl.SSLEngine for our peer in server mode
final InetSocketAddress remote = ((Connected) msg).remoteAddress();
final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort());
engine.setUseClientMode(false);
final SslTlsSupport ssl = new SslTlsSupport(engine);
// set up the context for communicating with TcpPipelineHandler
init = new Init<HasLogging, Command, Event>(ssl) {
@Override
public HasLogging makeContext(ActorContext ctx) {
return new Context();
}
};
// create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.create(init, getSender(), getSelf()));
// register the SSL handler with the connection
getSender().tell(TcpMessage.register(handler), getSelf());
} else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandlers event to get a Tcp.Event
final Event recv = init.event(msg);
if (recv instanceof Received) {
// inform someone of the received message
listener.tell(((Received) recv).data().utf8String(), getSelf());
// and reply (sender is the SSL handler created above)
getSender().tell(init.command(
TcpMessage.write(ByteString.fromString("world"))), getSelf());
}
}
}
}
//#server
private static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("IODocTest", AkkaSpec.testConf());
}
@AfterClass
public static void teardown() {
system.shutdown();
}
@Test
public void demonstrateSslClient() {
new JavaTestKit(system) {
{
final SSLContext ctx = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme");
final ActorRef server = system.actorOf(Props.create(SslServer.class, ctx, getRef()));
final Bound bound = expectMsgClass(Bound.class);
assert getLastSender() == server;
final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef()));
expectMsgEquals("hello");
assert getLastSender() == server;
expectMsgEquals("world");
assert getLastSender() == client;
}
};
}
}

View file

@ -582,6 +582,53 @@ The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/japi/EchoHandler.java#helpers .. includecode:: code/docs/io/japi/EchoHandler.java#helpers
Usage Example: TcpPipelineHandler and SSL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This example shows the different parts described above working together. Let us
first look at the SSL server:
.. includecode:: code/docs/io/japi/SslDocTest.java#server
The actor above binds to a local port and registers itself as the handler for
new connections. When a new connection comes in it will create a
:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly
for different setups, please refer to the JDK documentation) and wrap that in
an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).
This single-stage pipeline will be driven by a :class:`TcpPipelineHandler`
actor which is also included in ``akka-actor``. In order to capture the generic
command and event types consumed and emitted by that actor we need to create a
wrapper—the nested :class:`Init` class—which also provides the
:meth:`makeContext` method for creating the pipeline context needed by the
supplied pipeline. With those things bundled up all that remains is creating a
:class:`TcpPipelineHandler` and registering that one as the recipient of
inbound traffic from the TCP connection.
Since we instructed that handler actor to send any events which are emitted by
the SSL pipeline to ourselves, we can then just wait for the reception of the
decrypted payload messages, compute a response—just ``"world"`` in this
case—and reply by sending back a ``Tcp.Write``. It should be noted that
communication with the handler wraps commands and events in the inner types of
the ``init`` object in order to keep things well separated. To ease handling of
such path-dependent types there exist two helper methods, namely
:class:`Init.command` for creating a command and :class:`Init.event` for
unwrapping an event.
.. warning::
The :class:`TcpPipelineHandler` does currently not handle back-pressure from
the TCP socket, i.e. it will just lose data when the kernel buffer
overflows. This will be fixed before Akka 2.2 final.
Looking at the client side we see that not much needs to be changed:
.. includecode:: code/docs/io/japi/SslDocTest.java#client
Once the connection is established we again create a
:class:`TcpPipelineHandler` wrapping an :class:`SslTlsSupport` (in client mode)
and register that as the recipient of inbound traffic and ourselves as
recipient for the decrypted payload data. The we send a greeting to the server
and forward any replies to some ``listener`` actor.
Using UDP Using UDP
--------- ---------

View file

@ -601,6 +601,40 @@ The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/EchoServer.scala#helpers .. includecode:: code/docs/io/EchoServer.scala#helpers
Usage Example: TcpPipelineHandler and SSL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This example shows the different parts described above working together:
.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#server
The actor above is meant to be registered as the inbound connection handler for
a listen socket. When a new connection comes in it will create a
:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly
for different setups, please refer to the JDK documentation) and wrap that in
an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).
This single-stage pipeline will be driven by a :class:`TcpPipelineHandler`
actor which is also included in ``akka-actor``. In order to capture the generic
command and event types consumed and emitted by that actor we need to create a
wrapper—the nested :class:`Init` class—which also provides the
:meth:`makeContext` method for creating the pipeline context needed by the
supplied pipeline. With those things bundled up all that remains is creating a
:class:`TcpPipelineHandler` and registering that one as the recipient of
inbound traffic from the TCP connection.
Since we instructed that handler actor to send any events which are emitted by
the SSL pipeline to ourselves, we can then just switch behavior to receive the
decrypted payload message, compute a response and reply by sending back a
``Tcp.Write``. It should be noted that communication with the handler wraps
commands and events in the inner types of the ``init`` object in order to keep
things well separated.
.. warning::
The :class:`TcpPipelineHandler` does currently not handle back-pressure from
the TCP socket, i.e. it will just lose data when the kernel buffer
overflows. This will be fixed before Akka 2.2 final.
Using UDP Using UDP
--------- ---------

View file

@ -0,0 +1,261 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
// adapted from
// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io-tests/src/test/scala/spray/io/SslTlsSupportSpec.scala
// original copyright notice follows:
/*
* Copyright (C) 2011-2013 spray.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.io.ssl
import java.io.{ BufferedWriter, OutputStreamWriter, InputStreamReader, BufferedReader }
import javax.net.ssl._
import java.net.{ InetSocketAddress, SocketException }
import java.security.{ KeyStore, SecureRandom }
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor._
import akka.event.LoggingAdapter
import akka.testkit.{ TestProbe, AkkaSpec }
import akka.util.{ ByteString, Timeout }
import akka.io.{ IO, Tcp, PipelineContext }
import akka.TestUtils
import akka.event.Logging
import akka.io.TcpPipelineHandler
import akka.io.SslTlsSupport
import akka.io.HasLogging
import akka.remote.security.provider.AkkaProvider
// TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies
class SslTlsSupportSpec extends AkkaSpec {
implicit val timeOut: Timeout = 1.second
val sslContext = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme")
"The SslTlsSupport" should {
"work between a Java client and a Java server" in {
val server = new JavaSslServer
val client = new JavaSslClient(server.address)
client.run()
client.close()
server.close()
}
"work between a akka client and a Java server" in {
val server = new JavaSslServer
val client = new AkkaSslClient(server.address)
client.run()
client.close()
server.close()
}
"work between a Java client and a akka server" in {
val serverAddress = TestUtils.temporaryServerAddress()
val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this))
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
probe.expectMsgType[Tcp.Bound]
val client = new JavaSslClient(serverAddress)
client.run()
client.close()
}
"work between a akka client and a akka server" in {
val serverAddress = TestUtils.temporaryServerAddress()
val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this))
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
probe.expectMsgType[Tcp.Bound]
val client = new AkkaSslClient(serverAddress)
client.run()
client.close()
}
}
val counter = new AtomicInteger
class AkkaSslClient(address: InetSocketAddress) {
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Connect(address))
val connected = probe.expectMsgType[Tcp.Connected]
val connection = probe.sender
val init = new TcpPipelineHandler.Init(new SslTlsSupport(sslEngine(connected.remoteAddress, client = true))) {
override def makeContext(actorContext: ActorContext): HasLogging = new HasLogging {
override def getLogger = system.log
}
}
import init._
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref),
"client" + counter.incrementAndGet())
probe.send(connection, Tcp.Register(handler))
def run() {
probe.send(handler, Command(Tcp.Write(ByteString("3+4\n"))))
probe.expectMsg(Event(Tcp.Received(ByteString("7\n"))))
probe.send(handler, Command(Tcp.Write(ByteString("20+22\n"))))
probe.expectMsg(Event(Tcp.Received(ByteString("42\n"))))
}
def close() {
probe.send(handler, Command(Tcp.Close))
probe.expectMsgType[Event].evt match {
case _: Tcp.ConnectionClosed true
}
TestUtils.verifyActorTermination(handler)
}
}
//#server
class AkkaSslServer extends Actor with ActorLogging {
import Tcp.{ Connected, Received }
def receive: Receive = {
case Connected(remote, _)
val init =
new TcpPipelineHandler.Init(
new SslTlsSupport(sslEngine(remote, client = false))) {
override def makeContext(actorContext: ActorContext): HasLogging =
new HasLogging {
override def getLogger = log
}
}
import init._
val connection = sender
val handler = system.actorOf(
TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet())
connection ! Tcp.Register(handler)
context become {
case Event(Received(data))
val input = data.utf8String.dropRight(1)
log.debug("akka-io Server received {} from {}", input, sender)
val response = serverResponse(input)
sender ! Command(Tcp.Write(ByteString(response)))
log.debug("akka-io Server sent: {}", response.dropRight(1))
}
}
}
//#server
class JavaSslServer extends Thread {
val log: LoggingAdapter = Logging(system, getClass)
val address = TestUtils.temporaryServerAddress()
private val serverSocket =
sslContext.getServerSocketFactory.createServerSocket(address.getPort).asInstanceOf[SSLServerSocket]
@volatile private var socket: SSLSocket = _
start()
def close() {
serverSocket.close()
if (socket != null) socket.close()
}
override def run() {
try {
socket = serverSocket.accept().asInstanceOf[SSLSocket]
val (reader, writer) = readerAndWriter(socket)
while (true) {
val line = reader.readLine()
log.debug("SSLServerSocket Server received: {}", line)
if (line == null) throw new SocketException("closed")
val result = serverResponse(line)
writer.write(result)
writer.flush()
log.debug("SSLServerSocket Server sent: {}", result.dropRight(1))
}
} catch {
case _: SocketException // expected during shutdown
} finally close()
}
}
class JavaSslClient(address: InetSocketAddress) {
val socket = sslContext.getSocketFactory.createSocket(address.getHostName, address.getPort).asInstanceOf[SSLSocket]
val (reader, writer) = readerAndWriter(socket)
val log: LoggingAdapter = Logging(system, getClass)
def run() {
write("1+2")
readLine() === "3"
write("12+24")
readLine() === "36"
}
def write(string: String) {
writer.write(string + "\n")
writer.flush()
log.debug("SSLSocket Client sent: {}", string)
}
def readLine() = {
val string = reader.readLine()
log.debug("SSLSocket Client received: {}", string)
string
}
def close() { socket.close() }
}
def readerAndWriter(socket: SSLSocket) = {
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream))
reader -> writer
}
def serverResponse(input: String): String = input.split('+').map(_.toInt).reduceLeft(_ + _).toString + '\n'
def sslEngine(address: InetSocketAddress, client: Boolean) = {
val engine = sslContext.createSSLEngine(address.getHostName, address.getPort)
engine.setUseClientMode(client)
engine
}
}
object SslTlsSupportSpec {
def createSslContext(keyStoreResource: String, trustStoreResource: String, password: String): SSLContext = {
val keyStore = KeyStore.getInstance("jks")
keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray)
val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(keyStore, password.toCharArray)
val trustStore = KeyStore.getInstance("jks")
trustStore.load(getClass.getResourceAsStream(trustStoreResource), password.toCharArray())
val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("SSL")
val rng = SecureRandom.getInstance("AES128CounterSecureRNG", AkkaProvider)
rng.nextInt() // if it stalls then it stalls here
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, rng)
context
}
}

View file

@ -472,7 +472,7 @@ object AkkaBuild extends Build {
id = "akka-docs", id = "akka-docs",
base = file("akka-docs"), base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels, dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries),
settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
sourceDirectory in Sphinx <<= baseDirectory / "rst", sourceDirectory in Sphinx <<= baseDirectory / "rst",
sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" }, sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" },