splitting up TLS APIs

This commit is contained in:
Endre Sándor Varga 2016-02-16 18:19:30 +01:00 committed by Roland Kuhn
parent 650e94ba30
commit 4ba4c27b0f
34 changed files with 651 additions and 544 deletions

4
.gitignore vendored
View file

@ -80,5 +80,9 @@ worker*.log
*-shim.sbt *-shim.sbt
test-output test-output
# test output of QuickStart guide
factorials.txt
factorial2.txt
# Default sigar library extract location. # Default sigar library extract location.
native/ native/

View file

@ -185,6 +185,12 @@ The old behaviour can be achieved by explicitly draining the entity:
response.entity().getDataBytes().runWith(Sink.ignore()) response.entity().getDataBytes().runWith(Sink.ignore())
SslTls has been renamed to TLS and moved
----------------------------------------
The DSL to access a TLS (or SSL) :class:`BidiFlow` have now split between the ``javadsl`` and ``scaladsl`` packages and
have been renamed to :class:`TLS`. Common option types (closing modes, authentication modes, etc.) have been moved to
the top level ``stream`` package, and the common message types are accessible in the class :class:`akka.stream.TLSProtocol`
Websocket now consistently named WebSocket Websocket now consistently named WebSocket
------------------------------------------ ------------------------------------------

View file

@ -190,6 +190,13 @@ Replace with::
http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer()); http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer());
http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer()); http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer());
SslTls has been renamed to TLS and moved
----------------------------------------
The DSL to access a TLS (or SSL) :class:`BidiFlow` have now split between the ``javadsl`` and ``scaladsl`` packages and
have been renamed to :class:`TLS`. Common option types (closing modes, authentication modes, etc.) have been moved to
the top level ``stream`` package, and the common message types are accessible in the class :class:`akka.stream.TLSProtocol`
Framing moved to akka.stream.[javadsl/scaladsl] Framing moved to akka.stream.[javadsl/scaladsl]
----------------------------------------------- -----------------------------------------------

View file

@ -9,7 +9,7 @@ import akka.http.scaladsl.settings.{ ClientConnectionSettings, ParserSettings }
import language.existentials import language.existentials
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import akka.stream.io.{ SessionBytes, SslTlsInbound, SendBytes } import akka.stream.TLSProtocol._
import akka.util.ByteString import akka.util.ByteString
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream._ import akka.stream._

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.parsing
import javax.net.ssl.SSLSession import javax.net.ssl.SSLSession
import akka.stream.io.SessionBytes import akka.stream.TLSProtocol._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer

View file

@ -16,7 +16,7 @@ import akka.japi.Function
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.util.ByteString import akka.util.ByteString
import akka.stream._ import akka.stream._
import akka.stream.io._ import akka.stream.TLSProtocol._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
import akka.http.scaladsl.settings.ServerSettings import akka.http.scaladsl.settings.ServerSettings
@ -150,7 +150,6 @@ private[http] object HttpServerBluePrint {
} }
} }
def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity = def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity =
creator match { creator match {
case StrictEntityCreator(entity) entity case StrictEntityCreator(entity) entity

View file

@ -4,9 +4,9 @@
package akka.http.impl.engine.ws package akka.http.impl.engine.ws
import akka.stream.impl.io.ByteStringParser
import akka.util.ByteString import akka.util.ByteString
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.stream.io.ByteStringParser
import akka.stream.Attributes import akka.stream.Attributes
/** /**

View file

@ -13,8 +13,8 @@ import akka.util.ByteString
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.BidiShape import akka.stream._
import akka.stream.io.{ SessionBytes, SendBytes, SslTlsInbound } import akka.stream.TLSProtocol._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.http.scaladsl.settings.ClientConnectionSettings import akka.http.scaladsl.settings.ClientConnectionSettings

View file

@ -6,7 +6,7 @@ package akka.http.javadsl
import java.util.{ Collection JCollection, Optional } import java.util.{ Collection JCollection, Optional }
import javax.net.ssl.{ SSLContext, SSLParameters } import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.http.scaladsl import akka.http.scaladsl
import akka.stream.io.ClientAuth import akka.stream.TLSClientAuth
import scala.compat.java8.OptionConverters import scala.compat.java8.OptionConverters
@ -18,7 +18,7 @@ object ConnectionContext {
/** Used to serve HTTPS traffic. */ /** Used to serve HTTPS traffic. */
def https(sslContext: SSLContext, enabledCipherSuites: Optional[JCollection[String]], def https(sslContext: SSLContext, enabledCipherSuites: Optional[JCollection[String]],
enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[ClientAuth], sslParameters: Optional[SSLParameters]) = enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[TLSClientAuth], sslParameters: Optional[SSLParameters]) =
scaladsl.ConnectionContext.https(sslContext, sslParameters = OptionConverters.toScala(sslParameters)) scaladsl.ConnectionContext.https(sslContext, sslParameters = OptionConverters.toScala(sslParameters))
//#https-context-creation //#https-context-creation
@ -47,7 +47,7 @@ abstract class HttpsConnectionContext extends akka.http.javadsl.ConnectionContex
/** Java API */ /** Java API */
def getEnabledProtocols: Optional[JCollection[String]] def getEnabledProtocols: Optional[JCollection[String]]
/** Java API */ /** Java API */
def getClientAuth: Optional[ClientAuth] def getClientAuth: Optional[TLSClientAuth]
/** Java API */ /** Java API */
def getSslContext: SSLContext def getSslContext: SSLContext

View file

@ -11,7 +11,7 @@ import akka.http.impl.util.JavaMapping.HttpsConnectionContext
import akka.http.javadsl.model.ws._ import akka.http.javadsl.model.ws._
import akka.http.javadsl.settings.{ ConnectionPoolSettings, ClientConnectionSettings, ServerSettings } import akka.http.javadsl.settings.{ ConnectionPoolSettings, ClientConnectionSettings, ServerSettings }
import akka.{ NotUsed, stream } import akka.{ NotUsed, stream }
import akka.stream.io.{ SslTlsInbound, SslTlsOutbound } import akka.stream.TLSProtocol._
import scala.language.implicitConversions import scala.language.implicitConversions
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Try import scala.util.Try

View file

@ -4,7 +4,8 @@
package akka.http.scaladsl package akka.http.scaladsl
import akka.stream.io.{ ClientAuth, NegotiateNewSession } import akka.stream.TLSClientAuth
import akka.stream.TLSProtocol._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.util.{ Optional, Collection JCollection } import java.util.{ Optional, Collection JCollection }
@ -22,7 +23,7 @@ object ConnectionContext {
def https(sslContext: SSLContext, def https(sslContext: SSLContext,
enabledCipherSuites: Option[immutable.Seq[String]] = None, enabledCipherSuites: Option[immutable.Seq[String]] = None,
enabledProtocols: Option[immutable.Seq[String]] = None, enabledProtocols: Option[immutable.Seq[String]] = None,
clientAuth: Option[ClientAuth] = None, clientAuth: Option[TLSClientAuth] = None,
sslParameters: Option[SSLParameters] = None) = { sslParameters: Option[SSLParameters] = None) = {
new HttpsConnectionContext(sslContext, enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) new HttpsConnectionContext(sslContext, enabledCipherSuites, enabledProtocols, clientAuth, sslParameters)
} }
@ -35,7 +36,7 @@ final class HttpsConnectionContext(
val sslContext: SSLContext, val sslContext: SSLContext,
val enabledCipherSuites: Option[immutable.Seq[String]] = None, val enabledCipherSuites: Option[immutable.Seq[String]] = None,
val enabledProtocols: Option[immutable.Seq[String]] = None, val enabledProtocols: Option[immutable.Seq[String]] = None,
val clientAuth: Option[ClientAuth] = None, val clientAuth: Option[TLSClientAuth] = None,
val sslParameters: Option[SSLParameters] = None) val sslParameters: Option[SSLParameters] = None)
extends akka.http.javadsl.HttpsConnectionContext with ConnectionContext { extends akka.http.javadsl.HttpsConnectionContext with ConnectionContext {
@ -44,7 +45,7 @@ final class HttpsConnectionContext(
override def getSslContext = sslContext override def getSslContext = sslContext
override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava
override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava
override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava override def getClientAuth: Optional[TLSClientAuth] = clientAuth.asJava
override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava
} }

View file

@ -22,8 +22,8 @@ import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgrade
import akka.http.scaladsl.settings.{ ServerSettings, ClientConnectionSettings, ConnectionPoolSettings } import akka.http.scaladsl.settings.{ ServerSettings, ClientConnectionSettings, ConnectionPoolSettings }
import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.stream.Materializer import akka.stream._
import akka.stream.io._ import akka.stream.TLSProtocol._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.sslconfig.akka._ import com.typesafe.sslconfig.akka._
@ -608,10 +608,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
} }
/** Creates real or placebo SslTls stage based on if ConnectionContext is HTTPS or not. */ /** Creates real or placebo SslTls stage based on if ConnectionContext is HTTPS or not. */
private[http] def sslTlsStage(connectionContext: ConnectionContext, role: Role, hostInfo: Option[(String, Int)] = None) = private[http] def sslTlsStage(connectionContext: ConnectionContext, role: TLSRole, hostInfo: Option[(String, Int)] = None) =
connectionContext match { connectionContext match {
case hctx: HttpsConnectionContext SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) case hctx: HttpsConnectionContext TLS(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo)
case other SslTlsPlacebo.forScala // if it's not HTTPS, we don't enable SSL/TLS case other TLSPlacebo() // if it's not HTTPS, we don't enable SSL/TLS
} }
} }
@ -746,7 +746,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* TLS configuration for an HTTPS server binding or client connection. * TLS configuration for an HTTPS server binding or client connection.
* For the sslContext please refer to the com.typeasfe.ssl-config library. * For the sslContext please refer to the com.typeasfe.ssl-config library.
* The remaining four parameters configure the initial session that will * The remaining four parameters configure the initial session that will
* be negotiated, see [[akka.stream.io.NegotiateNewSession]] for details. * be negotiated, see [[NegotiateNewSession]] for details.
*/ */
trait DefaultSSLContextCreation { trait DefaultSSLContextCreation {
@ -786,9 +786,9 @@ trait DefaultSSLContextCreation {
import com.typesafe.sslconfig.ssl.{ ClientAuth SslClientAuth } import com.typesafe.sslconfig.ssl.{ ClientAuth SslClientAuth }
val clientAuth = config.sslParametersConfig.clientAuth match { val clientAuth = config.sslParametersConfig.clientAuth match {
case SslClientAuth.Default None case SslClientAuth.Default None
case SslClientAuth.Want Some(ClientAuth.Want) case SslClientAuth.Want Some(TLSClientAuth.Want)
case SslClientAuth.Need Some(ClientAuth.Need) case SslClientAuth.Need Some(TLSClientAuth.Need)
case SslClientAuth.None Some(ClientAuth.None) case SslClientAuth.None Some(TLSClientAuth.None)
} }
// hostname! // hostname!
defaultParams.setEndpointIdentificationAlgorithm("https") defaultParams.setEndpointIdentificationAlgorithm("https")

View file

@ -9,12 +9,13 @@ import java.net.InetSocketAddress
import java.security.MessageDigest import java.security.MessageDigest
import java.util import java.util
import javax.net.ssl.SSLSession import javax.net.ssl.SSLSession
import akka.stream.scaladsl.ScalaSessionAPI
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import akka.parboiled2.util.Base64 import akka.parboiled2.util.Base64
import akka.stream.io.ScalaSessionAPI
import akka.http.impl.util._ import akka.http.impl.util._
import akka.http.javadsl.{ model jm } import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._

View file

@ -18,8 +18,8 @@ import akka.util.ByteString
import akka.http.scaladsl.{ TestUtils, Http } import akka.http.scaladsl.{ TestUtils, Http }
import akka.http.impl.util.{ SingletonException, StreamUtils } import akka.http.impl.util.{ SingletonException, StreamUtils }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.stream.io.{ SessionBytes, SendBytes, SslTlsOutbound }
import akka.stream.{ BidiShape, ActorMaterializer } import akka.stream.{ BidiShape, ActorMaterializer }
import akka.stream.TLSProtocol._
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers._

View file

@ -9,10 +9,10 @@ import scala.reflect.ClassTag
import org.scalatest.Inside import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import akka.http.scaladsl.settings.ClientConnectionSettings import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes }
import akka.util.ByteString import akka.util.ByteString
import akka.event.NoLogging import akka.event.NoLogging
import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.{ClosedShape, ActorMaterializer}
import akka.stream.TLSProtocol._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpEntity._ import akka.http.scaladsl.model.HttpEntity._

View file

@ -7,8 +7,7 @@ package akka.http.impl.engine.client
import akka.NotUsed import akka.NotUsed
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import akka.stream.ActorMaterializer import akka.stream.{ Server, Client, ActorMaterializer }
import akka.stream.io._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import akka.http.impl.util._ import akka.http.impl.util._

View file

@ -18,7 +18,7 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.io.{ SslTlsPlacebo, SessionBytes } import akka.stream.TLSProtocol._
import org.scalatest.matchers.Matcher import org.scalatest.matchers.Matcher
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
@ -482,7 +482,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] =
Source(input.toList) Source(input.toList)
.map(bytes SessionBytes(SslTlsPlacebo.dummySession, ByteString(bytes))) .map(bytes SessionBytes(TLSPlacebo.dummySession, ByteString(bytes)))
.transform(() parser.stage).named("parser") .transform(() parser.stage).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.prefixAndTail(1) .prefixAndTail(1)

View file

@ -7,7 +7,7 @@ package akka.http.impl.engine.parsing
import akka.NotUsed import akka.NotUsed
import akka.http.scaladsl.settings.ParserSettings import akka.http.scaladsl.settings.ParserSettings
import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture
import akka.stream.io.{ SslTlsPlacebo, SessionBytes } import akka.stream.TLSProtocol._
import com.typesafe.config.{ ConfigFactory, Config } import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.{ Future, Await } import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -292,7 +292,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def rawParse(requestMethod: HttpMethod, input: String*): Source[Either[ResponseOutput, HttpResponse], NotUsed] = def rawParse(requestMethod: HttpMethod, input: String*): Source[Either[ResponseOutput, HttpResponse], NotUsed] =
Source(input.toList) Source(input.toList)
.map(bytes SessionBytes(SslTlsPlacebo.dummySession, ByteString(bytes))) .map(bytes SessionBytes(TLSPlacebo.dummySession, ByteString(bytes)))
.transform(() newParserStage(requestMethod)).named("parser") .transform(() newParserStage(requestMethod)).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.prefixAndTail(1) .prefixAndTail(1)

View file

@ -7,18 +7,17 @@ package akka.http.impl.engine.server
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.http.impl.engine.ws.ByteStringSinkProbe import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.scaladsl.settings.ServerSettings import akka.http.scaladsl.settings.ServerSettings
import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes } import akka.stream.TLSProtocol._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.NoLogging import akka.event.NoLogging
import akka.util.ByteString import akka.util.ByteString
import akka.stream.{ ClosedShape, Materializer } import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.http.impl.util._ import akka.http.impl.util._
import akka.http.scaladsl.model.headers.{ ProductVersion, Server } import akka.http.scaladsl.model.headers.{ ProductVersion, Server }
import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } import akka.http.scaladsl.model.{ HttpResponse, HttpRequest }
import akka.stream.OverflowStrategy
abstract class HttpServerTestSetupBase { abstract class HttpServerTestSetupBase {
implicit def system: ActorSystem implicit def system: ActorSystem

View file

@ -12,7 +12,6 @@ import spray.json._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.io.SslTlsPlacebo
import akka.stream.stage.{ TerminationDirective, Context, SyncDirective, PushStage } import akka.stream.stage.{ TerminationDirective, Context, SyncDirective, PushStage }
import akka.stream.scaladsl._ import akka.stream.scaladsl._

View file

@ -9,6 +9,7 @@ import java.util.Random
import akka.NotUsed import akka.NotUsed
import akka.http.scaladsl.model.ws.{ InvalidUpgradeResponse, WebSocketUpgradeResponse } import akka.http.scaladsl.model.ws.{ InvalidUpgradeResponse, WebSocketUpgradeResponse }
import akka.stream.ClosedShape import akka.stream.ClosedShape
import akka.stream.TLSProtocol._
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -17,7 +18,6 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.{ ProductVersion, `User-Agent` } import akka.http.scaladsl.model.headers.{ ProductVersion, `User-Agent` }
import akka.http.scaladsl.model.ws._ import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri
import akka.stream.io._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.{ TestSubscriber, TestPublisher } import akka.stream.testkit.{ TestSubscriber, TestPublisher }
import akka.util.ByteString import akka.util.ByteString

View file

@ -17,7 +17,6 @@ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.scaladsl.GraphDSL.Implicits._ import akka.stream.scaladsl.GraphDSL.Implicits._
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import akka.stream.io.SslTlsPlacebo
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
import akka.util.ByteString import akka.util.ByteString
@ -78,7 +77,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
Source.empty Source.empty
.viaMat { .viaMat {
Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort))
.atop(SslTlsPlacebo.forScala) .atop(TLSPlacebo())
.joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via( .joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via(
Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both) Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both)
}(Keep.right) }(Keep.right)
@ -158,7 +157,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
Source.maybe Source.maybe
.viaMat { .viaMat {
Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort))
.atop(SslTlsPlacebo.forScala) .atop(TLSPlacebo())
// the resource leak of #19398 existed only for severed websocket connections // the resource leak of #19398 existed only for severed websocket connections
.atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right) .atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right)
.join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)) .join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true))

View file

@ -10,7 +10,7 @@ import javax.net.ssl.{ SSLParameters, SSLContext }
import akka.http.javadsl.model.headers.Cookie import akka.http.javadsl.model.headers.Cookie
import akka.http.scaladsl.model import akka.http.scaladsl.model
import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.stream.io.ClientAuth import akka.stream.TLSClientAuth
import org.scalatest.{ FreeSpec, MustMatchers } import org.scalatest.{ FreeSpec, MustMatchers }
import scala.collection.immutable import scala.collection.immutable
@ -64,7 +64,7 @@ class JavaApiTestCaseSpecs extends FreeSpec with MustMatchers {
akka.http.javadsl.ConnectionContext.https(SSLContext.getDefault, akka.http.javadsl.ConnectionContext.https(SSLContext.getDefault,
Optional.empty[java.util.Collection[String]], Optional.empty[java.util.Collection[String]],
Optional.empty[java.util.Collection[String]], Optional.empty[java.util.Collection[String]],
Optional.empty[ClientAuth], Optional.empty[TLSClientAuth],
Optional.empty[SSLParameters]) mustNot be(null) Optional.empty[SSLParameters]) mustNot be(null)
} }
} }

View file

@ -6,8 +6,8 @@ package akka.http.scaladsl.coding
import java.util.zip.{ Inflater, Deflater } import java.util.zip.{ Inflater, Deflater }
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.io.ByteStringParser import akka.stream.impl.io.ByteStringParser
import akka.stream.io.ByteStringParser.{ ParseResult, ParseStep } import ByteStringParser.{ ParseResult, ParseStep }
import akka.util.{ ByteStringBuilder, ByteString } import akka.util.{ ByteStringBuilder, ByteString }
import scala.annotation.tailrec import scala.annotation.tailrec

View file

@ -10,8 +10,8 @@ import akka.http.impl.engine.ws.{ ProtocolException, FrameEvent }
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpEncodings import akka.http.scaladsl.model.headers.HttpEncodings
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.io.ByteStringParser import akka.stream.impl.io.ByteStringParser
import akka.stream.io.ByteStringParser.{ ParseResult, ParseStep } import ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString import akka.util.ByteString
class Gzip(val messageFilter: HttpMessage Boolean) extends Coder with StreamDecoder { class Gzip(val messageFilter: HttpMessage Boolean) extends Coder with StreamDecoder {

View file

@ -15,6 +15,7 @@ import scala.util.Random
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.pattern.{ after later } import akka.pattern.{ after later }
import akka.stream._ import akka.stream._
import akka.stream.TLSProtocol._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.testkit._ import akka.stream.testkit._
@ -102,9 +103,9 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
} }
val cipherSuites = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") val cipherSuites = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA")
def clientTls(closing: Closing) = SslTls(sslContext, cipherSuites, Client, closing) def clientTls(closing: TLSClosing) = TLS(sslContext, cipherSuites, Client, closing)
def badClientTls(closing: Closing) = SslTls(initWithTrust("/badtruststore"), cipherSuites, Client, closing) def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), cipherSuites, Client, closing)
def serverTls(closing: Closing) = SslTls(sslContext, cipherSuites, Server, closing) def serverTls(closing: TLSClosing) = TLS(sslContext, cipherSuites, Server, closing)
trait Named { trait Named {
def name: String = def name: String =
@ -116,19 +117,19 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
} }
trait CommunicationSetup extends Named { trait CommunicationSetup extends Named {
def decorateFlow(leftClosing: Closing, rightClosing: Closing, def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed] rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed]
def cleanup(): Unit = () def cleanup(): Unit = ()
} }
object ClientInitiates extends CommunicationSetup { object ClientInitiates extends CommunicationSetup {
def decorateFlow(leftClosing: Closing, rightClosing: Closing, def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
clientTls(leftClosing) atop serverTls(rightClosing).reversed join rhs clientTls(leftClosing) atop serverTls(rightClosing).reversed join rhs
} }
object ServerInitiates extends CommunicationSetup { object ServerInitiates extends CommunicationSetup {
def decorateFlow(leftClosing: Closing, rightClosing: Closing, def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
serverTls(leftClosing) atop clientTls(rightClosing).reversed join rhs serverTls(leftClosing) atop clientTls(rightClosing).reversed join rhs
} }
@ -143,7 +144,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
object ClientInitiatesViaTcp extends CommunicationSetup { object ClientInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null var binding: Tcp.ServerBinding = null
def decorateFlow(leftClosing: Closing, rightClosing: Closing, def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(serverTls(rightClosing).reversed join rhs) binding = server(serverTls(rightClosing).reversed join rhs)
clientTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) clientTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress)
@ -153,7 +154,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
object ServerInitiatesViaTcp extends CommunicationSetup { object ServerInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null var binding: Tcp.ServerBinding = null
def decorateFlow(leftClosing: Closing, rightClosing: Closing, def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(clientTls(rightClosing).reversed join rhs) binding = server(clientTls(rightClosing).reversed join rhs)
serverTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) serverTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress)
@ -189,8 +190,8 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
case SessionBytes(s, b) SendBytes(b) case SessionBytes(s, b) SendBytes(b)
} }
} }
def leftClosing: Closing = IgnoreComplete def leftClosing: TLSClosing = IgnoreComplete
def rightClosing: Closing = IgnoreComplete def rightClosing: TLSClosing = IgnoreComplete
def inputs: immutable.Seq[SslTlsOutbound] def inputs: immutable.Seq[SslTlsOutbound]
def output: ByteString def output: ByteString
@ -441,7 +442,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
"pass through data" in { "pass through data" in {
val f = Source(1 to 3) val f = Source(1 to 3)
.map(b SendBytes(ByteString(b.toByte))) .map(b SendBytes(ByteString(b.toByte)))
.via(SslTlsPlacebo.forScala join Flow.apply) .via(TLSPlacebo() join Flow.apply)
.grouped(10) .grouped(10)
.runWith(Sink.head) .runWith(Sink.head)
val result = Await.result(f, 3.seconds) val result = Await.result(f, 3.seconds)

View file

@ -0,0 +1,255 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import javax.net.ssl._
import akka.util.ByteString
import scala.annotation.varargs
import scala.collection.immutable
/**
* Many protocols are asymmetric and distinguish between the client and the
* server, where the latter listens passively for messages and the former
* actively initiates the exchange.
*/
object TLSRole {
/**
* Java API: obtain the [[Client]] singleton value.
*/
def client: TLSRole = Client
/**
* Java API: obtain the [[Server]] singleton value.
*/
def server: TLSRole = Server
}
sealed abstract class TLSRole
/**
* The client is usually the side that consumes the service provided by its
* interlocutor. The precise interpretation of this role is protocol specific.
*/
sealed abstract class Client extends TLSRole
case object Client extends Client
/**
* The server is usually the side the provides the service to its interlocutor.
* The precise interpretation of this role is protocol specific.
*/
sealed abstract class Server extends TLSRole
case object Server extends Server
/**
* All streams in Akka are unidirectional: while in a complex flow graph data
* may flow in multiple directions these individual flows are independent from
* each other. The difference between two half-duplex connections in opposite
* directions and a full-duplex connection is that the underlying transport
* is shared in the latter and tearing it down will end the data transfer in
* both directions.
*
* When integrating a full-duplex transport medium that does not support
* half-closing (which means ending one direction of data transfer without
* ending the other) into a stream topology, there can be unexpected effects.
* Feeding a finite Source into this medium will close the connection after
* all elements have been sent, which means that possible replies may not
* be received in full. To support this type of usage, the sending and
* receiving of data on the same side (e.g. on the [[Client]]) need to be
* coordinated such that it is known when all replies have been received.
* Only then should the transport be shut down.
*
* To support these scenarios it is recommended that the full-duplex
* transport integration is configurable in terms of termination handling,
* which means that the user can optionally suppress the normal (closing)
* reaction to completion or cancellation events, as is expressed by the
* possible values of this type:
*
* - [[EagerClose]] means to not ignore signals
* - [[IgnoreCancel]] means to not react to cancellation of the receiving
* side unless the sending side has already completed
* - [[IgnoreComplete]] means to not react to the completion of the sending
* side unless the receiving side has already canceled
* - [[IgnoreBoth]] means to ignore the first termination signalbe that
* cancellation or completionand only act upon the second one
*/
sealed abstract class TLSClosing {
def ignoreCancel: Boolean
def ignoreComplete: Boolean
}
object TLSClosing {
/**
* Java API: obtain the [[EagerClose]] singleton value.
*/
def eagerClose: TLSClosing = EagerClose
/**
* Java API: obtain the [[IgnoreCancel]] singleton value.
*/
def ignoreCancel: TLSClosing = IgnoreCancel
/**
* Java API: obtain the [[IgnoreComplete]] singleton value.
*/
def ignoreComplete: TLSClosing = IgnoreComplete
/**
* Java API: obtain the [[IgnoreBoth]] singleton value.
*/
def ignoreBoth: TLSClosing = IgnoreBoth
}
/**
* see [[TLSClosing]]
*/
sealed abstract class EagerClose extends TLSClosing {
override def ignoreCancel = false
override def ignoreComplete = false
}
case object EagerClose extends EagerClose
/**
* see [[TLSClosing]]
*/
sealed abstract class IgnoreCancel extends TLSClosing {
override def ignoreCancel = true
override def ignoreComplete = false
}
case object IgnoreCancel extends IgnoreCancel
/**
* see [[TLSClosing]]
*/
sealed abstract class IgnoreComplete extends TLSClosing {
override def ignoreCancel = false
override def ignoreComplete = true
}
case object IgnoreComplete extends IgnoreComplete
/**
* see [[TLSClosing]]
*/
sealed abstract class IgnoreBoth extends TLSClosing {
override def ignoreCancel = true
override def ignoreComplete = true
}
case object IgnoreBoth extends IgnoreBoth
object TLSProtocol {
/**
* This is the supertype of all messages that the SslTls stage emits on the
* plaintext side.
*/
sealed trait SslTlsInbound
/**
* If the underlying transport is closed before the final TLS closure command
* is received from the peer then the SSLEngine will throw an SSLException that
* warns about possible truncation attacks. This exception is caught and
* translated into this message when encountered. Most of the time this occurs
* not because of a malicious attacker but due to a connection abort or a
* misbehaving communication peer.
*/
sealed abstract class SessionTruncated extends SslTlsInbound
case object SessionTruncated extends SessionTruncated
/**
* Plaintext bytes emitted by the SSLEngine are received over one specific
* encryption session and this class bundles the bytes with the SSLSession
* object. When the session changes due to renegotiation (which can be
* initiated by either party) the new session value will not compare equal to
* the previous one.
*
* The Java API for getting session information is given by the SSLSession object,
* the Scala API adapters are offered below.
*/
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound with scaladsl.ScalaSessionAPI
/**
* This is the supertype of all messages that the SslTls stage accepts on its
* plaintext side.
*/
sealed trait SslTlsOutbound
/**
* Initiate a new session negotiation. Any [[SendBytes]] commands following
* this one will be held back (i.e. back-pressured) until the new handshake is
* completed, meaning that the bytes following this message will be encrypted
* according to the requirements outlined here.
*
* Each of the values in this message is optional and will have the following
* effect if provided:
*
* - `enabledCipherSuites` will be passed to `SSLEngine::setEnabledCipherSuites()`
* - `enabledProtocols` will be passed to `SSLEngine::setEnabledProtocols()`
* - `clientAuth` will be passed to `SSLEngine::setWantClientAuth()` or `SSLEngine.setNeedClientAuth()`, respectively
* - `sslParameters` will be passed to `SSLEngine::setSSLParameters()`
*
* Please note that passing `clientAuth = None` means that no change is done
* on client authentication requirements while `clientAuth = Some(ClientAuth.None)`
* switches off client authentication.
*/
case class NegotiateNewSession(
enabledCipherSuites: Option[immutable.Seq[String]],
enabledProtocols: Option[immutable.Seq[String]],
clientAuth: Option[TLSClientAuth],
sslParameters: Option[SSLParameters]) extends SslTlsOutbound {
/**
* Java API: Make a copy of this message with the given `enabledCipherSuites`.
*/
@varargs
def withCipherSuites(s: String*) = copy(enabledCipherSuites = Some(s.toList))
/**
* Java API: Make a copy of this message with the given `enabledProtocols`.
*/
@varargs
def withProtocols(p: String*) = copy(enabledProtocols = Some(p.toList))
/**
* Java API: Make a copy of this message with the given [[TLSClientAuth]] setting.
*/
def withClientAuth(ca: TLSClientAuth) = copy(clientAuth = Some(ca))
/**
* Java API: Make a copy of this message with the given [[SSLParameters]].
*/
def withParameters(p: SSLParameters) = copy(sslParameters = Some(p))
}
object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) {
/**
* Java API: obtain the default value (which will leave the SSLEngines
* settings unchanged).
*/
def withDefaults: NegotiateNewSession = this
}
/**
* Send the given [[akka.util.ByteString]] across the encrypted session to the
* peer.
*/
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound
}
/**
* An SSLEngine can either demand, allow or ignore its peers authentication
* (via certificates), where `Need` will fail the handshake if the peer does
* not provide valid credentials, `Want` allows the peer to send credentials
* and verifies them if provided, and `None` disables peer certificate
* verification.
*
* See the documentation for `SSLEngine::setWantClientAuth` for more information.
*/
sealed abstract class TLSClientAuth
object TLSClientAuth {
case object None extends TLSClientAuth
case object Want extends TLSClientAuth
case object Need extends TLSClientAuth
def none: TLSClientAuth = None
def want: TLSClientAuth = Want
def need: TLSClientAuth = Need
}

View file

@ -13,8 +13,8 @@ import akka.pattern.ask
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
import akka.stream.impl.io.SslTlsCipherActor import akka.stream.impl.io.TLSActor
import akka.stream.io.SslTls.TlsModule import akka.stream.impl.io.TlsModule
import org.reactivestreams._ import org.reactivestreams._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor } import scala.concurrent.{ Await, ExecutionContextExecutor }
@ -121,7 +121,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
case tls: TlsModule // TODO solve this so TlsModule doesn't need special treatment here case tls: TlsModule // TODO solve this so TlsModule doesn't need special treatment here
val es = effectiveSettings(effectiveAttributes) val es = effectiveSettings(effectiveAttributes)
val props = val props =
SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo) TLSActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo)
val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
def factory(id: Int) = new ActorPublisher[Any](impl) { def factory(id: Int) = new ActorPublisher[Any](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
@ -129,11 +129,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
val publishers = Vector.tabulate(2)(factory) val publishers = Vector.tabulate(2)(factory)
impl ! FanOut.ExposedPublishers(publishers) impl ! FanOut.ExposedPublishers(publishers)
assignPort(tls.plainOut, publishers(SslTlsCipherActor.UserOut)) assignPort(tls.plainOut, publishers(TLSActor.UserOut))
assignPort(tls.cipherOut, publishers(SslTlsCipherActor.TransportOut)) assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
assignPort(tls.plainIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn)) assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn)) assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
matVal.put(atomic, NotUsed) matVal.put(atomic, NotUsed)

View file

@ -1,15 +1,19 @@
/** /**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream.io package akka.stream.impl.io
import scala.util.control.NoStackTrace
import akka.stream._ import akka.stream._
import akka.stream.stage._ import akka.stream.stage._
import akka.util.ByteString import akka.util.ByteString
import scala.annotation.tailrec
abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] { import scala.annotation.tailrec
import scala.util.control.NoStackTrace
/**
* INTERNAL API
*/
private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] {
import ByteStringParser._ import ByteStringParser._
private val bytesIn = Inlet[ByteString]("bytesIn") private val bytesIn = Inlet[ByteString]("bytesIn")
@ -67,7 +71,10 @@ abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]]
} }
} }
object ByteStringParser { /**
* INTERNAL API
*/
private[akka] object ByteStringParser {
/** /**
* @param result - parser can return some element for downstream or return None if no element was generated * @param result - parser can return some element for downstream or return None if no element was generated

View file

@ -9,28 +9,28 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus._
import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl.SSLEngineResult.Status._
import javax.net.ssl._ import javax.net.ssl._
import akka.actor._ import akka.actor._
import akka.stream.{ ConnectionException, ActorMaterializerSettings } import akka.stream._
import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanIn.InputBunch
import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl.FanOut.OutputBunch
import akka.stream.impl._ import akka.stream.impl._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.sslconfig.akka.AkkaSSLConfig import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.stream.io._ import akka.stream.TLSProtocol._
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[akka] object SslTlsCipherActor { private[akka] object TLSActor {
def props(settings: ActorMaterializerSettings, def props(settings: ActorMaterializerSettings,
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
role: Role, role: TLSRole,
closing: Closing, closing: TLSClosing,
hostInfo: Option[(String, Int)], hostInfo: Option[(String, Int)],
tracing: Boolean = false): Props = tracing: Boolean = false): Props =
Props(new SslTlsCipherActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local) Props(new TLSActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local)
final val TransportIn = 0 final val TransportIn = 0
final val TransportOut = 0 final val TransportOut = 0
@ -42,13 +42,13 @@ private[akka] object SslTlsCipherActor {
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, private[akka] class TLSActor(settings: ActorMaterializerSettings,
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, role: Role, closing: Closing, firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing,
hostInfo: Option[(String, Int)], tracing: Boolean) hostInfo: Option[(String, Int)], tracing: Boolean)
extends Actor with ActorLogging with Pump { extends Actor with ActorLogging with Pump {
import SslTlsCipherActor._ import TLSActor._
protected val outputBunch = new OutputBunch(outputCount = 2, self, this) protected val outputBunch = new OutputBunch(outputCount = 2, self, this)
outputBunch.markAllOutputs() outputBunch.markAllOutputs()
@ -161,9 +161,9 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings,
params.enabledCipherSuites foreach (cs engine.setEnabledCipherSuites(cs.toArray)) params.enabledCipherSuites foreach (cs engine.setEnabledCipherSuites(cs.toArray))
params.enabledProtocols foreach (p engine.setEnabledProtocols(p.toArray)) params.enabledProtocols foreach (p engine.setEnabledProtocols(p.toArray))
params.clientAuth match { params.clientAuth match {
case Some(ClientAuth.None) engine.setNeedClientAuth(false) case Some(TLSClientAuth.None) engine.setNeedClientAuth(false)
case Some(ClientAuth.Want) engine.setWantClientAuth(true) case Some(TLSClientAuth.Want) engine.setWantClientAuth(true)
case Some(ClientAuth.Need) engine.setNeedClientAuth(true) case Some(TLSClientAuth.Need) engine.setNeedClientAuth(true)
case _ // do nothing case _ // do nothing
} }
params.sslParameters foreach (p engine.setSSLParameters(p)) params.sslParameters foreach (p engine.setSSLParameters(p))

View file

@ -0,0 +1,45 @@
package akka.stream.impl.io
import javax.net.ssl.SSLContext
import akka.stream._
import akka.stream.impl.StreamLayout.{ CompositeModule, Module }
import akka.stream.TLSProtocol._
import akka.util.ByteString
/**
* INTERNAL API.
*/
private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString],
shape: Shape, attributes: Attributes,
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends Module {
override def subModules: Set[Module] = Set.empty
override def withAttributes(att: Attributes): Module = copy(attributes = att)
override def carbonCopy: Module =
TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
override def replaceShape(s: Shape) =
if (s != shape) {
shape.requireSamePortsAs(s)
CompositeModule(this, s)
} else this
}
/**
* INTERNAL API.
*/
private[akka] object TlsModule {
def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): TlsModule = {
val name = attributes.nameOrDefault(s"StreamTls($role)")
val cipherIn = Inlet[ByteString](s"$name.cipherIn")
val cipherOut = Outlet[ByteString](s"$name.cipherOut")
val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn")
val plainOut = Outlet[SslTlsInbound](s"$name.transportOut")
val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut)
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing, hostInfo)
}
}

View file

@ -1,453 +0,0 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.lang.{ Integer jInteger }
import java.security.Principal
import java.util.Optional
import akka.{ NotUsed, japi }
import akka.stream._
import akka.stream.impl.StreamLayout.{ Module, CompositeModule }
import akka.util.ByteString
import javax.net.ssl._
import scala.annotation.varargs
import scala.collection.immutable
import scala.compat.java8.OptionConverters
/**
* Stream cipher support based upon JSSE.
*
* The underlying SSLEngine has four ports: plaintext input/output and
* ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]]
* element for use in stream topologies, where the plaintext ports are on the
* left hand side of the shape and the ciphertext ports on the right hand side.
*
* Configuring JSSE is a rather complex topic, please refer to the JDK platform
* documentation or the excellent user guide that is part of the Play Framework
* documentation. The philosophy of this integration into Akka Streams is to
* expose all knobs and dials to client code and therefore not limit the
* configuration possibilities. In particular the client code will have to
* provide the SSLContext from which the SSLEngine is then created. Handshake
* parameters are set using [[NegotiateNewSession]] messages, the settings for
* the initial handshake need to be provided up front using the same class;
* please refer to the method documentation below.
*
* '''IMPORTANT NOTE'''
*
* The TLS specification does not permit half-closing of the user data session
* that it transportsto be precise a half-close will always promptly lead to a
* full close. This means that canceling the plaintext output or completing the
* plaintext input of the SslTls stage will lead to full termination of the
* secure connection without regard to whether bytes are remaining to be sent or
* received, respectively. Especially for a client the common idiom of attaching
* a finite Source to the plaintext input and transforming the plaintext response
* bytes coming out will not work out of the box due to early termination of the
* connection. For this reason there is a parameter that determines whether the
* SslTls stage shall ignore completion and/or cancellation events, and the
* default is to ignore completion (in view of the clientserver scenario). In
* order to terminate the connection the client will then need to cancel the
* plaintext output as soon as all expected bytes have been received. When
* ignoring both types of events the stage will shut down once both events have
* been received. See also [[Closing]].
*/
object SslTls {
type ScalaFlow = scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
type JavaFlow = javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
/**
* Scala API: create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[Closing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[SSLParameters.setEndpointIdentificationAlgorithm]].
*/
def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role,
closing: Closing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): ScalaFlow =
new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing, hostInfo))
/**
* Java API: create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* This method uses the default closing behavior or [[IgnoreComplete]].
*/
def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role): JavaFlow =
new javadsl.BidiFlow(apply(sslContext, firstSession, role))
/**
* Java API: create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[Closing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[SSLParameters.setEndpointIdentificationAlgorithm]].
*/
def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, hostInfo: Optional[japi.Pair[String, jInteger]], closing: Closing): JavaFlow =
new javadsl.BidiFlow(apply(sslContext, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e (e.first, e.second))))
/**
* INTERNAL API.
*/
private[akka] case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString],
shape: Shape, attributes: Attributes,
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: Role, closing: Closing, hostInfo: Option[(String, Int)]) extends Module {
override def subModules: Set[Module] = Set.empty
override def withAttributes(att: Attributes): Module = copy(attributes = att)
override def carbonCopy: Module =
TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
override def replaceShape(s: Shape) =
if (s != shape) {
shape.requireSamePortsAs(s)
CompositeModule(this, s)
} else this
}
/**
* INTERNAL API.
*/
private[akka] object TlsModule {
def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing, hostInfo: Option[(String, Int)]): TlsModule = {
val name = attributes.nameOrDefault(s"StreamTls($role)")
val cipherIn = Inlet[ByteString](s"$name.cipherIn")
val cipherOut = Outlet[ByteString](s"$name.cipherOut")
val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn")
val plainOut = Outlet[SslTlsInbound](s"$name.transportOut")
val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut)
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing, hostInfo)
}
}
}
/**
* This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can
* be used instead of [[SslTls]] when no encryption is desired. The flows will
* just adapt the message protocol by wrapping into [[SessionBytes]] and
* unwrapping [[SendBytes]].
*/
object SslTlsPlacebo {
// this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL
private[akka] val dummySession = SSLContext.getDefault.createSSLEngine.getSession
val forScala: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] =
scaladsl.BidiFlow.fromGraph(scaladsl.GraphDSL.create() { implicit b
val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) bytes })
val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(dummySession, _)))
BidiShape.fromFlows(top, bottom)
})
val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] =
new javadsl.BidiFlow(forScala)
}
import java.security.Principal
import java.security.cert.Certificate
import javax.net.ssl.{ SSLPeerUnverifiedException, SSLSession }
/** Allows access to an SSLSession with Scala types */
trait ScalaSessionAPI {
def session: SSLSession
/**
* Scala API: Extract the certificates that were actually used by this
* engine during this sessions negotiation. The list is empty if no
* certificates were used.
*/
def localCertificates: List[Certificate] = Option(session.getLocalCertificates).map(_.toList).getOrElse(Nil)
/**
* Scala API: Extract the Principal that was actually used by this engine
* during this sessions negotiation.
*/
def localPrincipal: Option[Principal] = Option(session.getLocalPrincipal)
/**
* Scala API: Extract the certificates that were used by the peer engine
* during this sessions negotiation. The list is empty if no certificates
* were used.
*/
def peerCertificates: List[Certificate] =
try Option(session.getPeerCertificates).map(_.toList).getOrElse(Nil)
catch { case e: SSLPeerUnverifiedException Nil }
/**
* Scala API: Extract the Principal that the peer engine presented during
* this sessions negotiation.
*/
def peerPrincipal: Option[Principal] =
try Option(session.getPeerPrincipal)
catch { case e: SSLPeerUnverifiedException None }
}
object ScalaSessionAPI {
/** Constructs a ScalaSessionAPI instance from an SSLSession */
def apply(_session: SSLSession): ScalaSessionAPI =
new ScalaSessionAPI {
def session: SSLSession = _session
}
}
/**
* Many protocols are asymmetric and distinguish between the client and the
* server, where the latter listens passively for messages and the former
* actively initiates the exchange.
*/
object Role {
/**
* Java API: obtain the [[Client]] singleton value.
*/
def client: Role = Client
/**
* Java API: obtain the [[Server]] singleton value.
*/
def server: Role = Server
}
sealed abstract class Role
/**
* The client is usually the side that consumes the service provided by its
* interlocutor. The precise interpretation of this role is protocol specific.
*/
sealed abstract class Client extends Role
case object Client extends Client
/**
* The server is usually the side the provides the service to its interlocutor.
* The precise interpretation of this role is protocol specific.
*/
sealed abstract class Server extends Role
case object Server extends Server
/**
* All streams in Akka are unidirectional: while in a complex flow graph data
* may flow in multiple directions these individual flows are independent from
* each other. The difference between two half-duplex connections in opposite
* directions and a full-duplex connection is that the underlying transport
* is shared in the latter and tearing it down will end the data transfer in
* both directions.
*
* When integrating a full-duplex transport medium that does not support
* half-closing (which means ending one direction of data transfer without
* ending the other) into a stream topology, there can be unexpected effects.
* Feeding a finite Source into this medium will close the connection after
* all elements have been sent, which means that possible replies may not
* be received in full. To support this type of usage, the sending and
* receiving of data on the same side (e.g. on the [[Client]]) need to be
* coordinated such that it is known when all replies have been received.
* Only then should the transport be shut down.
*
* To support these scenarios it is recommended that the full-duplex
* transport integration is configurable in terms of termination handling,
* which means that the user can optionally suppress the normal (closing)
* reaction to completion or cancellation events, as is expressed by the
* possible values of this type:
*
* - [[EagerClose]] means to not ignore signals
* - [[IgnoreCancel]] means to not react to cancellation of the receiving
* side unless the sending side has already completed
* - [[IgnoreComplete]] means to not react to the completion of the sending
* side unless the receiving side has already canceled
* - [[IgnoreBoth]] means to ignore the first termination signalbe that
* cancellation or completionand only act upon the second one
*/
sealed abstract class Closing {
def ignoreCancel: Boolean
def ignoreComplete: Boolean
}
object Closing {
/**
* Java API: obtain the [[EagerClose]] singleton value.
*/
def eagerClose: Closing = EagerClose
/**
* Java API: obtain the [[IgnoreCancel]] singleton value.
*/
def ignoreCancel: Closing = IgnoreCancel
/**
* Java API: obtain the [[IgnoreComplete]] singleton value.
*/
def ignoreComplete: Closing = IgnoreComplete
/**
* Java API: obtain the [[IgnoreBoth]] singleton value.
*/
def ignoreBoth: Closing = IgnoreBoth
}
/**
* see [[Closing]]
*/
sealed abstract class EagerClose extends Closing {
override def ignoreCancel = false
override def ignoreComplete = false
}
case object EagerClose extends EagerClose
/**
* see [[Closing]]
*/
sealed abstract class IgnoreCancel extends Closing {
override def ignoreCancel = true
override def ignoreComplete = false
}
case object IgnoreCancel extends IgnoreCancel
/**
* see [[Closing]]
*/
sealed abstract class IgnoreComplete extends Closing {
override def ignoreCancel = false
override def ignoreComplete = true
}
case object IgnoreComplete extends IgnoreComplete
/**
* see [[Closing]]
*/
sealed abstract class IgnoreBoth extends Closing {
override def ignoreCancel = true
override def ignoreComplete = true
}
case object IgnoreBoth extends IgnoreBoth
/**
* This is the supertype of all messages that the SslTls stage emits on the
* plaintext side.
*/
sealed trait SslTlsInbound
/**
* If the underlying transport is closed before the final TLS closure command
* is received from the peer then the SSLEngine will throw an SSLException that
* warns about possible truncation attacks. This exception is caught and
* translated into this message when encountered. Most of the time this occurs
* not because of a malicious attacker but due to a connection abort or a
* misbehaving communication peer.
*/
sealed abstract class SessionTruncated extends SslTlsInbound
case object SessionTruncated extends SessionTruncated
/**
* Plaintext bytes emitted by the SSLEngine are received over one specific
* encryption session and this class bundles the bytes with the SSLSession
* object. When the session changes due to renegotiation (which can be
* initiated by either party) the new session value will not compare equal to
* the previous one.
*
* The Java API for getting session information is given by the SSLSession object,
* the Scala API adapters are offered below.
*/
case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound with ScalaSessionAPI
/**
* This is the supertype of all messages that the SslTls stage accepts on its
* plaintext side.
*/
sealed trait SslTlsOutbound
/**
* Initiate a new session negotiation. Any [[SendBytes]] commands following
* this one will be held back (i.e. back-pressured) until the new handshake is
* completed, meaning that the bytes following this message will be encrypted
* according to the requirements outlined here.
*
* Each of the values in this message is optional and will have the following
* effect if provided:
*
* - `enabledCipherSuites` will be passed to `SSLEngine::setEnabledCipherSuites()`
* - `enabledProtocols` will be passed to `SSLEngine::setEnabledProtocols()`
* - `clientAuth` will be passed to `SSLEngine::setWantClientAuth()` or `SSLEngine.setNeedClientAuth()`, respectively
* - `sslParameters` will be passed to `SSLEngine::setSSLParameters()`
*
* Please note that passing `clientAuth = None` means that no change is done
* on client authentication requirements while `clientAuth = Some(ClientAuth.None)`
* switches off client authentication.
*/
case class NegotiateNewSession(
enabledCipherSuites: Option[immutable.Seq[String]],
enabledProtocols: Option[immutable.Seq[String]],
clientAuth: Option[ClientAuth],
sslParameters: Option[SSLParameters]) extends SslTlsOutbound {
/**
* Java API: Make a copy of this message with the given `enabledCipherSuites`.
*/
@varargs
def withCipherSuites(s: String*) = copy(enabledCipherSuites = Some(s.toList))
/**
* Java API: Make a copy of this message with the given `enabledProtocols`.
*/
@varargs
def withProtocols(p: String*) = copy(enabledProtocols = Some(p.toList))
/**
* Java API: Make a copy of this message with the given [[ClientAuth]] setting.
*/
def withClientAuth(ca: ClientAuth) = copy(clientAuth = Some(ca))
/**
* Java API: Make a copy of this message with the given [[SSLParameters]].
*/
def withParameters(p: SSLParameters) = copy(sslParameters = Some(p))
}
object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) {
/**
* Java API: obtain the default value (which will leave the SSLEngines
* settings unchanged).
*/
def withDefaults = this
}
/**
* Send the given [[akka.util.ByteString]] across the encrypted session to the
* peer.
*/
case class SendBytes(bytes: ByteString) extends SslTlsOutbound
/**
* An SSLEngine can either demand, allow or ignore its peers authentication
* (via certificates), where `Need` will fail the handshake if the peer does
* not provide valid credentials, `Want` allows the peer to send credentials
* and verifies them if provided, and `None` disables peer certificate
* verification.
*
* See the documentation for `SSLEngine::setWantClientAuth` for more information.
*/
sealed abstract class ClientAuth
object ClientAuth {
case object None extends ClientAuth
case object Want extends ClientAuth
case object Need extends ClientAuth
def none: ClientAuth = None
def want: ClientAuth = Want
def need: ClientAuth = Need
}

View file

@ -0,0 +1,101 @@
package akka.stream.javadsl
import java.util.Optional
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.{ japi, NotUsed }
import akka.stream._
import akka.stream.TLSProtocol._
import akka.util.ByteString
import scala.compat.java8.OptionConverters
/**
* Stream cipher support based upon JSSE.
*
* The underlying SSLEngine has four ports: plaintext input/output and
* ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]]
* element for use in stream topologies, where the plaintext ports are on the
* left hand side of the shape and the ciphertext ports on the right hand side.
*
* Configuring JSSE is a rather complex topic, please refer to the JDK platform
* documentation or the excellent user guide that is part of the Play Framework
* documentation. The philosophy of this integration into Akka Streams is to
* expose all knobs and dials to client code and therefore not limit the
* configuration possibilities. In particular the client code will have to
* provide the SSLContext from which the SSLEngine is then created. Handshake
* parameters are set using [[NegotiateNewSession]] messages, the settings for
* the initial handshake need to be provided up front using the same class;
* please refer to the method documentation below.
*
* '''IMPORTANT NOTE'''
*
* The TLS specification does not permit half-closing of the user data session
* that it transportsto be precise a half-close will always promptly lead to a
* full close. This means that canceling the plaintext output or completing the
* plaintext input of the SslTls stage will lead to full termination of the
* secure connection without regard to whether bytes are remaining to be sent or
* received, respectively. Especially for a client the common idiom of attaching
* a finite Source to the plaintext input and transforming the plaintext response
* bytes coming out will not work out of the box due to early termination of the
* connection. For this reason there is a parameter that determines whether the
* SslTls stage shall ignore completion and/or cancellation events, and the
* default is to ignore completion (in view of the clientserver scenario). In
* order to terminate the connection the client will then need to cancel the
* plaintext output as soon as all expected bytes have been received. When
* ignoring both types of events the stage will shut down once both events have
* been received. See also [[TLSClosing]].
*/
object TLS {
/**
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* This method uses the default closing behavior or [[IgnoreComplete]].
*/
def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, firstSession, role))
/**
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[SSLParameters.setEndpointIdentificationAlgorithm]].
*/
def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, hostInfo: Optional[japi.Pair[String, java.lang.Integer]], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e (e.first, e.second))))
}
/**
* This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can
* be used instead of [[TLS]] when no encryption is desired. The flows will
* just adapt the message protocol by wrapping into [[SessionBytes]] and
* unwrapping [[SendBytes]].
*/
object TLSPlacebo {
/**
* Returns a reusable [[BidiFlow]] instance representing a [[TLSPlacebo$]].
*/
def getInstance(): javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = forJava
private val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLSPlacebo())
}

View file

@ -0,0 +1,137 @@
package akka.stream.scaladsl
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.stream.impl.io.TlsModule
import akka.NotUsed
import akka.stream._
import akka.stream.TLSProtocol._
import akka.util.ByteString
/**
* Stream cipher support based upon JSSE.
*
* The underlying SSLEngine has four ports: plaintext input/output and
* ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]]
* element for use in stream topologies, where the plaintext ports are on the
* left hand side of the shape and the ciphertext ports on the right hand side.
*
* Configuring JSSE is a rather complex topic, please refer to the JDK platform
* documentation or the excellent user guide that is part of the Play Framework
* documentation. The philosophy of this integration into Akka Streams is to
* expose all knobs and dials to client code and therefore not limit the
* configuration possibilities. In particular the client code will have to
* provide the SSLContext from which the SSLEngine is then created. Handshake
* parameters are set using [[NegotiateNewSession]] messages, the settings for
* the initial handshake need to be provided up front using the same class;
* please refer to the method documentation below.
*
* '''IMPORTANT NOTE'''
*
* The TLS specification does not permit half-closing of the user data session
* that it transportsto be precise a half-close will always promptly lead to a
* full close. This means that canceling the plaintext output or completing the
* plaintext input of the SslTls stage will lead to full termination of the
* secure connection without regard to whether bytes are remaining to be sent or
* received, respectively. Especially for a client the common idiom of attaching
* a finite Source to the plaintext input and transforming the plaintext response
* bytes coming out will not work out of the box due to early termination of the
* connection. For this reason there is a parameter that determines whether the
* SslTls stage shall ignore completion and/or cancellation events, and the
* default is to ignore completion (in view of the clientserver scenario). In
* order to terminate the connection the client will then need to cancel the
* plaintext output as soon as all expected bytes have been received. When
* ignoring both types of events the stage will shut down once both events have
* been received. See also [[TLSClosing]].
*/
object TLS {
/**
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[SSLParameters.setEndpointIdentificationAlgorithm]].
*/
def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole,
closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing, hostInfo))
}
/**
* This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can
* be used instead of [[TLS]] when no encryption is desired. The flows will
* just adapt the message protocol by wrapping into [[SessionBytes]] and
* unwrapping [[SendBytes]].
*/
object TLSPlacebo {
// this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL
private[akka] val dummySession = SSLContext.getDefault.createSSLEngine.getSession
def apply(): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = instance
private val instance: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] =
scaladsl.BidiFlow.fromGraph(scaladsl.GraphDSL.create() { implicit b
val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) bytes })
val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(dummySession, _)))
BidiShape.fromFlows(top, bottom)
})
}
import java.security.Principal
import java.security.cert.Certificate
import javax.net.ssl.{ SSLPeerUnverifiedException, SSLSession }
/** Allows access to an SSLSession with Scala types */
trait ScalaSessionAPI {
/**
* The underlying [[javax.net.ssl.SSLSession]].
*/
def session: SSLSession
/**
* Scala API: Extract the certificates that were actually used by this
* engine during this sessions negotiation. The list is empty if no
* certificates were used.
*/
def localCertificates: List[Certificate] = Option(session.getLocalCertificates).map(_.toList).getOrElse(Nil)
/**
* Scala API: Extract the Principal that was actually used by this engine
* during this sessions negotiation.
*/
def localPrincipal: Option[Principal] = Option(session.getLocalPrincipal)
/**
* Scala API: Extract the certificates that were used by the peer engine
* during this sessions negotiation. The list is empty if no certificates
* were used.
*/
def peerCertificates: List[Certificate] =
try Option(session.getPeerCertificates).map(_.toList).getOrElse(Nil)
catch { case e: SSLPeerUnverifiedException Nil }
/**
* Scala API: Extract the Principal that the peer engine presented during
* this sessions negotiation.
*/
def peerPrincipal: Option[Principal] =
try Option(session.getPeerPrincipal)
catch { case e: SSLPeerUnverifiedException None }
}
object ScalaSessionAPI {
/** Constructs a ScalaSessionAPI instance from an SSLSession */
def apply(_session: SSLSession): ScalaSessionAPI =
new ScalaSessionAPI {
def session: SSLSession = _session
}
}