From 4ba4c27b0ffa4d8d2732ce9eb6f920110caabc8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 16 Feb 2016 18:19:30 +0100 Subject: [PATCH] splitting up TLS APIs --- .gitignore | 4 + .../stream/migration-guide-2.0-2.4-java.rst | 6 + .../stream/migration-guide-2.0-2.4-scala.rst | 7 + .../client/OutgoingConnectionBlueprint.scala | 2 +- .../engine/parsing/HttpMessageParser.scala | 2 +- .../engine/server/HttpServerBluePrint.scala | 13 +- .../impl/engine/ws/FrameEventParser.scala | 2 +- .../engine/ws/WebSocketClientBlueprint.scala | 4 +- .../akka/http/javadsl/ConnectionContext.scala | 6 +- .../main/scala/akka/http/javadsl/Http.scala | 2 +- .../http/scaladsl/ConnectionContext.scala | 9 +- .../main/scala/akka/http/scaladsl/Http.scala | 18 +- .../http/scaladsl/model/headers/headers.scala | 3 +- .../engine/client/ConnectionPoolSpec.scala | 2 +- .../LowLevelOutgoingConnectionSpec.scala | 4 +- .../client/TlsEndpointVerificationSpec.scala | 3 +- .../engine/parsing/RequestParserSpec.scala | 4 +- .../engine/parsing/ResponseParserSpec.scala | 4 +- .../server/HttpServerTestSetupBase.scala | 5 +- .../impl/engine/ws/WSClientAutobahnTest.scala | 1 - .../impl/engine/ws/WebSocketClientSpec.scala | 2 +- .../engine/ws/WebSocketIntegrationSpec.scala | 5 +- .../javadsl/model/JavaApiTestCaseSpecs.scala | 4 +- .../akka/http/scaladsl/coding/Deflate.scala | 4 +- .../akka/http/scaladsl/coding/Gzip.scala | 4 +- .../test/scala/akka/stream/io/TlsSpec.scala | 23 +- .../scala/akka/stream/SslTlsOptions.scala | 255 ++++++++++ .../stream/impl/ActorMaterializerImpl.scala | 14 +- .../{ => impl}/io/ByteStringParser.scala | 17 +- ...SslTlsCipherActor.scala => TLSActor.scala} | 30 +- .../scala/akka/stream/impl/io/TlsModule.scala | 45 ++ .../main/scala/akka/stream/io/SslTls.scala | 453 ------------------ .../main/scala/akka/stream/javadsl/TLS.scala | 101 ++++ .../main/scala/akka/stream/scaladsl/TLS.scala | 137 ++++++ 34 files changed, 651 insertions(+), 544 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/SslTlsOptions.scala rename akka-stream/src/main/scala/akka/stream/{ => impl}/io/ByteStringParser.scala (95%) rename akka-stream/src/main/scala/akka/stream/impl/io/{SslTlsCipherActor.scala => TLSActor.scala} (94%) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/io/SslTls.scala create mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala diff --git a/.gitignore b/.gitignore index 2716cbae25..8e716eb2d4 100755 --- a/.gitignore +++ b/.gitignore @@ -80,5 +80,9 @@ worker*.log *-shim.sbt test-output +# test output of QuickStart guide +factorials.txt +factorial2.txt + # Default sigar library extract location. native/ diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index 2ddd2f16ca..eeabc82333 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -185,6 +185,12 @@ The old behaviour can be achieved by explicitly draining the entity: response.entity().getDataBytes().runWith(Sink.ignore()) +SslTls has been renamed to TLS and moved +---------------------------------------- + +The DSL to access a TLS (or SSL) :class:`BidiFlow` have now split between the ``javadsl`` and ``scaladsl`` packages and +have been renamed to :class:`TLS`. Common option types (closing modes, authentication modes, etc.) have been moved to +the top level ``stream`` package, and the common message types are accessible in the class :class:`akka.stream.TLSProtocol` Websocket now consistently named WebSocket ------------------------------------------ diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 7d0a493f05..4f90f6162f 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -190,6 +190,13 @@ Replace with:: http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer()); http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer()); +SslTls has been renamed to TLS and moved +---------------------------------------- + +The DSL to access a TLS (or SSL) :class:`BidiFlow` have now split between the ``javadsl`` and ``scaladsl`` packages and +have been renamed to :class:`TLS`. Common option types (closing modes, authentication modes, etc.) have been moved to +the top level ``stream`` package, and the common message types are accessible in the class :class:`akka.stream.TLSProtocol` + Framing moved to akka.stream.[javadsl/scaladsl] ----------------------------------------------- diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 35da511010..f195c6f642 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -9,7 +9,7 @@ import akka.http.scaladsl.settings.{ ClientConnectionSettings, ParserSettings } import language.existentials import scala.annotation.tailrec import scala.collection.mutable.ListBuffer -import akka.stream.io.{ SessionBytes, SslTlsInbound, SendBytes } +import akka.stream.TLSProtocol._ import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream._ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala index 0f135d1e9d..254fdf84a3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpMessageParser.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.parsing import javax.net.ssl.SSLSession -import akka.stream.io.SessionBytes +import akka.stream.TLSProtocol._ import scala.annotation.tailrec import scala.collection.mutable.ListBuffer diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 0d67ed92cc..9f7a5b6abc 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -16,7 +16,7 @@ import akka.japi.Function import akka.event.LoggingAdapter import akka.util.ByteString import akka.stream._ -import akka.stream.io._ +import akka.stream.TLSProtocol._ import akka.stream.scaladsl._ import akka.stream.stage._ import akka.http.scaladsl.settings.ServerSettings @@ -150,10 +150,9 @@ private[http] object HttpServerBluePrint { } } - def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity = creator match { - case StrictEntityCreator(entity) ⇒ entity + case StrictEntityCreator(entity) ⇒ entity case StreamedEntityCreator(creator) ⇒ streamRequestEntity(creator) } @@ -253,7 +252,7 @@ private[http] object HttpServerBluePrint { } class RequestTimeoutSupport(initialTimeout: FiniteDuration) - extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { + extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { private val requestIn = Inlet[HttpRequest]("requestIn") private val requestOut = Outlet[HttpRequest]("requestOut") private val responseIn = Inlet[HttpResponse]("responseIn") @@ -309,7 +308,7 @@ private[http] object HttpServerBluePrint { private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit], trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) - extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ + extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ import materializer.executionContext set { @@ -351,7 +350,7 @@ private[http] object HttpServerBluePrint { } class ControllerStage(settings: ServerSettings, log: LoggingAdapter) - extends GraphStage[BidiShape[RequestOutput, RequestOutput, HttpResponse, ResponseRenderingContext]] { + extends GraphStage[BidiShape[RequestOutput, RequestOutput, HttpResponse, ResponseRenderingContext]] { private val requestParsingIn = Inlet[RequestOutput]("requestParsingIn") private val requestPrepOut = Outlet[RequestOutput]("requestPrepOut") private val httpResponseIn = Inlet[HttpResponse]("httpResponseIn") @@ -533,7 +532,7 @@ private[http] object HttpServerBluePrint { One2OneBidiFlow[HttpRequest, HttpResponse](pipeliningLimit).reversed private class ProtocolSwitchStage(settings: ServerSettings, log: LoggingAdapter) - extends GraphStage[BidiShape[ResponseRenderingOutput, ByteString, SessionBytes, SessionBytes]] { + extends GraphStage[BidiShape[ResponseRenderingOutput, ByteString, SessionBytes, SessionBytes]] { private val fromNet = Inlet[SessionBytes]("fromNet") private val toNet = Outlet[ByteString]("toNet") diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala index 8d04e133b0..d2aa1dcfac 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala @@ -4,9 +4,9 @@ package akka.http.impl.engine.ws +import akka.stream.impl.io.ByteStringParser import akka.util.ByteString import scala.annotation.tailrec -import akka.stream.io.ByteStringParser import akka.stream.Attributes /** diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala index cae2f15287..13b7ccdb30 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala @@ -13,8 +13,8 @@ import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.stage._ -import akka.stream.BidiShape -import akka.stream.io.{ SessionBytes, SendBytes, SslTlsInbound } +import akka.stream._ +import akka.stream.TLSProtocol._ import akka.stream.scaladsl._ import akka.http.scaladsl.settings.ClientConnectionSettings diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala index 12f660fe36..d7d2413c78 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala @@ -6,7 +6,7 @@ package akka.http.javadsl import java.util.{ Collection ⇒ JCollection, Optional } import javax.net.ssl.{ SSLContext, SSLParameters } import akka.http.scaladsl -import akka.stream.io.ClientAuth +import akka.stream.TLSClientAuth import scala.compat.java8.OptionConverters @@ -18,7 +18,7 @@ object ConnectionContext { /** Used to serve HTTPS traffic. */ def https(sslContext: SSLContext, enabledCipherSuites: Optional[JCollection[String]], - enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[ClientAuth], sslParameters: Optional[SSLParameters]) = + enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[TLSClientAuth], sslParameters: Optional[SSLParameters]) = scaladsl.ConnectionContext.https(sslContext, sslParameters = OptionConverters.toScala(sslParameters)) //#https-context-creation @@ -47,7 +47,7 @@ abstract class HttpsConnectionContext extends akka.http.javadsl.ConnectionContex /** Java API */ def getEnabledProtocols: Optional[JCollection[String]] /** Java API */ - def getClientAuth: Optional[ClientAuth] + def getClientAuth: Optional[TLSClientAuth] /** Java API */ def getSslContext: SSLContext diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 3d0e2ee6ab..d742639407 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -11,7 +11,7 @@ import akka.http.impl.util.JavaMapping.HttpsConnectionContext import akka.http.javadsl.model.ws._ import akka.http.javadsl.settings.{ ConnectionPoolSettings, ClientConnectionSettings, ServerSettings } import akka.{ NotUsed, stream } -import akka.stream.io.{ SslTlsInbound, SslTlsOutbound } +import akka.stream.TLSProtocol._ import scala.language.implicitConversions import scala.concurrent.Future import scala.util.Try diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala index 95b21f1f88..e3176496f7 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala @@ -4,7 +4,8 @@ package akka.http.scaladsl -import akka.stream.io.{ ClientAuth, NegotiateNewSession } +import akka.stream.TLSClientAuth +import akka.stream.TLSProtocol._ import scala.collection.JavaConverters._ import java.util.{ Optional, Collection ⇒ JCollection } @@ -22,7 +23,7 @@ object ConnectionContext { def https(sslContext: SSLContext, enabledCipherSuites: Option[immutable.Seq[String]] = None, enabledProtocols: Option[immutable.Seq[String]] = None, - clientAuth: Option[ClientAuth] = None, + clientAuth: Option[TLSClientAuth] = None, sslParameters: Option[SSLParameters] = None) = { new HttpsConnectionContext(sslContext, enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) } @@ -35,7 +36,7 @@ final class HttpsConnectionContext( val sslContext: SSLContext, val enabledCipherSuites: Option[immutable.Seq[String]] = None, val enabledProtocols: Option[immutable.Seq[String]] = None, - val clientAuth: Option[ClientAuth] = None, + val clientAuth: Option[TLSClientAuth] = None, val sslParameters: Option[SSLParameters] = None) extends akka.http.javadsl.HttpsConnectionContext with ConnectionContext { @@ -44,7 +45,7 @@ final class HttpsConnectionContext( override def getSslContext = sslContext override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava - override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava + override def getClientAuth: Optional[TLSClientAuth] = clientAuth.asJava override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 6f83e8507d..0c1b71c5a3 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -22,8 +22,8 @@ import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgrade import akka.http.scaladsl.settings.{ ServerSettings, ClientConnectionSettings, ConnectionPoolSettings } import akka.http.scaladsl.util.FastFuture import akka.{ Done, NotUsed } -import akka.stream.Materializer -import akka.stream.io._ +import akka.stream._ +import akka.stream.TLSProtocol._ import akka.stream.scaladsl._ import com.typesafe.config.Config import com.typesafe.sslconfig.akka._ @@ -608,10 +608,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte } /** Creates real or placebo SslTls stage based on if ConnectionContext is HTTPS or not. */ - private[http] def sslTlsStage(connectionContext: ConnectionContext, role: Role, hostInfo: Option[(String, Int)] = None) = + private[http] def sslTlsStage(connectionContext: ConnectionContext, role: TLSRole, hostInfo: Option[(String, Int)] = None) = connectionContext match { - case hctx: HttpsConnectionContext ⇒ SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) - case other ⇒ SslTlsPlacebo.forScala // if it's not HTTPS, we don't enable SSL/TLS + case hctx: HttpsConnectionContext ⇒ TLS(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) + case other ⇒ TLSPlacebo() // if it's not HTTPS, we don't enable SSL/TLS } } @@ -746,7 +746,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * TLS configuration for an HTTPS server binding or client connection. * For the sslContext please refer to the com.typeasfe.ssl-config library. * The remaining four parameters configure the initial session that will - * be negotiated, see [[akka.stream.io.NegotiateNewSession]] for details. + * be negotiated, see [[NegotiateNewSession]] for details. */ trait DefaultSSLContextCreation { @@ -786,9 +786,9 @@ trait DefaultSSLContextCreation { import com.typesafe.sslconfig.ssl.{ ClientAuth ⇒ SslClientAuth } val clientAuth = config.sslParametersConfig.clientAuth match { case SslClientAuth.Default ⇒ None - case SslClientAuth.Want ⇒ Some(ClientAuth.Want) - case SslClientAuth.Need ⇒ Some(ClientAuth.Need) - case SslClientAuth.None ⇒ Some(ClientAuth.None) + case SslClientAuth.Want ⇒ Some(TLSClientAuth.Want) + case SslClientAuth.Need ⇒ Some(TLSClientAuth.Need) + case SslClientAuth.None ⇒ Some(TLSClientAuth.None) } // hostname! defaultParams.setEndpointIdentificationAlgorithm("https") diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 9f2d14bd46..1bd55bd681 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -9,12 +9,13 @@ import java.net.InetSocketAddress import java.security.MessageDigest import java.util import javax.net.ssl.SSLSession +import akka.stream.scaladsl.ScalaSessionAPI + import scala.reflect.ClassTag import scala.util.{ Failure, Success, Try } import scala.annotation.tailrec import scala.collection.immutable import akka.parboiled2.util.Base64 -import akka.stream.io.ScalaSessionAPI import akka.http.impl.util._ import akka.http.javadsl.{ model ⇒ jm } import akka.http.scaladsl.model._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index b41590b149..f6fb04f6ad 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -18,8 +18,8 @@ import akka.util.ByteString import akka.http.scaladsl.{ TestUtils, Http } import akka.http.impl.util.{ SingletonException, StreamUtils } import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } -import akka.stream.io.{ SessionBytes, SendBytes, SslTlsOutbound } import akka.stream.{ BidiShape, ActorMaterializer } +import akka.stream.TLSProtocol._ import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } import akka.stream.scaladsl._ import akka.http.scaladsl.model.headers._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 6e53fdda39..d973805051 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -9,10 +9,10 @@ import scala.reflect.ClassTag import org.scalatest.Inside import org.scalatest.concurrent.ScalaFutures import akka.http.scaladsl.settings.ClientConnectionSettings -import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes } import akka.util.ByteString import akka.event.NoLogging -import akka.stream.{ ClosedShape, ActorMaterializer } +import akka.stream.{ClosedShape, ActorMaterializer} +import akka.stream.TLSProtocol._ import akka.stream.testkit._ import akka.stream.scaladsl._ import akka.http.scaladsl.model.HttpEntity._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala index f70691837b..4af89441e2 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala @@ -7,8 +7,7 @@ package akka.http.impl.engine.client import akka.NotUsed import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures -import akka.stream.ActorMaterializer -import akka.stream.io._ +import akka.stream.{ Server, Client, ActorMaterializer } import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import akka.http.impl.util._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index dbb0cef368..8ee45396dd 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -18,7 +18,7 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl._ -import akka.stream.io.{ SslTlsPlacebo, SessionBytes } +import akka.stream.TLSProtocol._ import org.scalatest.matchers.Matcher import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } @@ -482,7 +482,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = Source(input.toList) - .map(bytes ⇒ SessionBytes(SslTlsPlacebo.dummySession, ByteString(bytes))) + .map(bytes ⇒ SessionBytes(TLSPlacebo.dummySession, ByteString(bytes))) .transform(() ⇒ parser.stage).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .prefixAndTail(1) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala index dbcc470240..8e44e356e0 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala @@ -7,7 +7,7 @@ package akka.http.impl.engine.parsing import akka.NotUsed import akka.http.scaladsl.settings.ParserSettings import akka.http.scaladsl.util.FastFuture -import akka.stream.io.{ SslTlsPlacebo, SessionBytes } +import akka.stream.TLSProtocol._ import com.typesafe.config.{ ConfigFactory, Config } import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ @@ -292,7 +292,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def rawParse(requestMethod: HttpMethod, input: String*): Source[Either[ResponseOutput, HttpResponse], NotUsed] = Source(input.toList) - .map(bytes ⇒ SessionBytes(SslTlsPlacebo.dummySession, ByteString(bytes))) + .map(bytes ⇒ SessionBytes(TLSPlacebo.dummySession, ByteString(bytes))) .transform(() ⇒ newParserStage(requestMethod)).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .prefixAndTail(1) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala index af4b76b46e..45f503a4dc 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala @@ -7,18 +7,17 @@ package akka.http.impl.engine.server import java.net.InetSocketAddress import akka.http.impl.engine.ws.ByteStringSinkProbe import akka.http.scaladsl.settings.ServerSettings -import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes } +import akka.stream.TLSProtocol._ import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem import akka.event.NoLogging import akka.util.ByteString -import akka.stream.{ ClosedShape, Materializer } +import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.http.impl.util._ import akka.http.scaladsl.model.headers.{ ProductVersion, Server } import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } -import akka.stream.OverflowStrategy abstract class HttpServerTestSetupBase { implicit def system: ActorSystem diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala index 0d58bcfd84..9863653aa7 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala @@ -12,7 +12,6 @@ import spray.json._ import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.io.SslTlsPlacebo import akka.stream.stage.{ TerminationDirective, Context, SyncDirective, PushStage } import akka.stream.scaladsl._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketClientSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketClientSpec.scala index b75a323042..0f29f05098 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketClientSpec.scala @@ -9,6 +9,7 @@ import java.util.Random import akka.NotUsed import akka.http.scaladsl.model.ws.{ InvalidUpgradeResponse, WebSocketUpgradeResponse } import akka.stream.ClosedShape +import akka.stream.TLSProtocol._ import scala.concurrent.duration._ @@ -17,7 +18,6 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.{ ProductVersion, `User-Agent` } import akka.http.scaladsl.model.ws._ import akka.http.scaladsl.model.Uri -import akka.stream.io._ import akka.stream.scaladsl._ import akka.stream.testkit.{ TestSubscriber, TestPublisher } import akka.util.ByteString diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala index d4a0d6ad1c..bd4fb84d2a 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -17,7 +17,6 @@ import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.stream.scaladsl.GraphDSL.Implicits._ import org.scalatest.concurrent.Eventually -import akka.stream.io.SslTlsPlacebo import java.net.InetSocketAddress import akka.stream.impl.fusing.GraphStages import akka.util.ByteString @@ -78,7 +77,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. Source.empty .viaMat { Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) - .atop(SslTlsPlacebo.forScala) + .atop(TLSPlacebo()) .joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via( Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both) }(Keep.right) @@ -158,7 +157,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. Source.maybe .viaMat { Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) - .atop(SslTlsPlacebo.forScala) + .atop(TLSPlacebo()) // the resource leak of #19398 existed only for severed websocket connections .atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right) .join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)) diff --git a/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala b/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala index 831f03328e..ec08a389e5 100644 --- a/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala +++ b/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala @@ -10,7 +10,7 @@ import javax.net.ssl.{ SSLParameters, SSLContext } import akka.http.javadsl.model.headers.Cookie import akka.http.scaladsl.model import akka.http.scaladsl.model.headers.BasicHttpCredentials -import akka.stream.io.ClientAuth +import akka.stream.TLSClientAuth import org.scalatest.{ FreeSpec, MustMatchers } import scala.collection.immutable @@ -64,7 +64,7 @@ class JavaApiTestCaseSpecs extends FreeSpec with MustMatchers { akka.http.javadsl.ConnectionContext.https(SSLContext.getDefault, Optional.empty[java.util.Collection[String]], Optional.empty[java.util.Collection[String]], - Optional.empty[ClientAuth], + Optional.empty[TLSClientAuth], Optional.empty[SSLParameters]) mustNot be(null) } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala b/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala index 646b564fe6..4e847bef0c 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala @@ -6,8 +6,8 @@ package akka.http.scaladsl.coding import java.util.zip.{ Inflater, Deflater } import akka.stream.Attributes -import akka.stream.io.ByteStringParser -import akka.stream.io.ByteStringParser.{ ParseResult, ParseStep } +import akka.stream.impl.io.ByteStringParser +import ByteStringParser.{ ParseResult, ParseStep } import akka.util.{ ByteStringBuilder, ByteString } import scala.annotation.tailrec diff --git a/akka-http/src/main/scala/akka/http/scaladsl/coding/Gzip.scala b/akka-http/src/main/scala/akka/http/scaladsl/coding/Gzip.scala index 35724e3de6..ce83632a59 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/coding/Gzip.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/coding/Gzip.scala @@ -10,8 +10,8 @@ import akka.http.impl.engine.ws.{ ProtocolException, FrameEvent } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpEncodings import akka.stream.Attributes -import akka.stream.io.ByteStringParser -import akka.stream.io.ByteStringParser.{ ParseResult, ParseStep } +import akka.stream.impl.io.ByteStringParser +import ByteStringParser.{ ParseResult, ParseStep } import akka.util.ByteString class Gzip(val messageFilter: HttpMessage ⇒ Boolean) extends Coder with StreamDecoder { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 5cec762df0..7292dafde1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -15,6 +15,7 @@ import scala.util.Random import akka.actor.ActorSystem import akka.pattern.{ after ⇒ later } import akka.stream._ +import akka.stream.TLSProtocol._ import akka.stream.scaladsl._ import akka.stream.stage._ import akka.stream.testkit._ @@ -102,9 +103,9 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off } val cipherSuites = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") - def clientTls(closing: Closing) = SslTls(sslContext, cipherSuites, Client, closing) - def badClientTls(closing: Closing) = SslTls(initWithTrust("/badtruststore"), cipherSuites, Client, closing) - def serverTls(closing: Closing) = SslTls(sslContext, cipherSuites, Server, closing) + def clientTls(closing: TLSClosing) = TLS(sslContext, cipherSuites, Client, closing) + def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), cipherSuites, Client, closing) + def serverTls(closing: TLSClosing) = TLS(sslContext, cipherSuites, Server, closing) trait Named { def name: String = @@ -116,19 +117,19 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off } trait CommunicationSetup extends Named { - def decorateFlow(leftClosing: Closing, rightClosing: Closing, + def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed] def cleanup(): Unit = () } object ClientInitiates extends CommunicationSetup { - def decorateFlow(leftClosing: Closing, rightClosing: Closing, + def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = clientTls(leftClosing) atop serverTls(rightClosing).reversed join rhs } object ServerInitiates extends CommunicationSetup { - def decorateFlow(leftClosing: Closing, rightClosing: Closing, + def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = serverTls(leftClosing) atop clientTls(rightClosing).reversed join rhs } @@ -143,7 +144,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off object ClientInitiatesViaTcp extends CommunicationSetup { var binding: Tcp.ServerBinding = null - def decorateFlow(leftClosing: Closing, rightClosing: Closing, + def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { binding = server(serverTls(rightClosing).reversed join rhs) clientTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) @@ -153,7 +154,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off object ServerInitiatesViaTcp extends CommunicationSetup { var binding: Tcp.ServerBinding = null - def decorateFlow(leftClosing: Closing, rightClosing: Closing, + def decorateFlow(leftClosing: TLSClosing, rightClosing: TLSClosing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { binding = server(clientTls(rightClosing).reversed join rhs) serverTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) @@ -189,8 +190,8 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off case SessionBytes(s, b) ⇒ SendBytes(b) } } - def leftClosing: Closing = IgnoreComplete - def rightClosing: Closing = IgnoreComplete + def leftClosing: TLSClosing = IgnoreComplete + def rightClosing: TLSClosing = IgnoreComplete def inputs: immutable.Seq[SslTlsOutbound] def output: ByteString @@ -441,7 +442,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off "pass through data" in { val f = Source(1 to 3) .map(b ⇒ SendBytes(ByteString(b.toByte))) - .via(SslTlsPlacebo.forScala join Flow.apply) + .via(TLSPlacebo() join Flow.apply) .grouped(10) .runWith(Sink.head) val result = Await.result(f, 3.seconds) diff --git a/akka-stream/src/main/scala/akka/stream/SslTlsOptions.scala b/akka-stream/src/main/scala/akka/stream/SslTlsOptions.scala new file mode 100644 index 0000000000..49c98651a9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/SslTlsOptions.scala @@ -0,0 +1,255 @@ +/** + * Copyright (C) 2015-2016 Typesafe Inc. + */ +package akka.stream + +import javax.net.ssl._ + +import akka.util.ByteString + +import scala.annotation.varargs +import scala.collection.immutable + +/** + * Many protocols are asymmetric and distinguish between the client and the + * server, where the latter listens passively for messages and the former + * actively initiates the exchange. + */ +object TLSRole { + /** + * Java API: obtain the [[Client]] singleton value. + */ + def client: TLSRole = Client + /** + * Java API: obtain the [[Server]] singleton value. + */ + def server: TLSRole = Server +} +sealed abstract class TLSRole + +/** + * The client is usually the side that consumes the service provided by its + * interlocutor. The precise interpretation of this role is protocol specific. + */ +sealed abstract class Client extends TLSRole +case object Client extends Client + +/** + * The server is usually the side the provides the service to its interlocutor. + * The precise interpretation of this role is protocol specific. + */ +sealed abstract class Server extends TLSRole +case object Server extends Server + +/** + * All streams in Akka are unidirectional: while in a complex flow graph data + * may flow in multiple directions these individual flows are independent from + * each other. The difference between two half-duplex connections in opposite + * directions and a full-duplex connection is that the underlying transport + * is shared in the latter and tearing it down will end the data transfer in + * both directions. + * + * When integrating a full-duplex transport medium that does not support + * half-closing (which means ending one direction of data transfer without + * ending the other) into a stream topology, there can be unexpected effects. + * Feeding a finite Source into this medium will close the connection after + * all elements have been sent, which means that possible replies may not + * be received in full. To support this type of usage, the sending and + * receiving of data on the same side (e.g. on the [[Client]]) need to be + * coordinated such that it is known when all replies have been received. + * Only then should the transport be shut down. + * + * To support these scenarios it is recommended that the full-duplex + * transport integration is configurable in terms of termination handling, + * which means that the user can optionally suppress the normal (closing) + * reaction to completion or cancellation events, as is expressed by the + * possible values of this type: + * + * - [[EagerClose]] means to not ignore signals + * - [[IgnoreCancel]] means to not react to cancellation of the receiving + * side unless the sending side has already completed + * - [[IgnoreComplete]] means to not react to the completion of the sending + * side unless the receiving side has already canceled + * - [[IgnoreBoth]] means to ignore the first termination signal—be that + * cancellation or completion—and only act upon the second one + */ +sealed abstract class TLSClosing { + def ignoreCancel: Boolean + def ignoreComplete: Boolean +} +object TLSClosing { + /** + * Java API: obtain the [[EagerClose]] singleton value. + */ + def eagerClose: TLSClosing = EagerClose + /** + * Java API: obtain the [[IgnoreCancel]] singleton value. + */ + def ignoreCancel: TLSClosing = IgnoreCancel + /** + * Java API: obtain the [[IgnoreComplete]] singleton value. + */ + def ignoreComplete: TLSClosing = IgnoreComplete + /** + * Java API: obtain the [[IgnoreBoth]] singleton value. + */ + def ignoreBoth: TLSClosing = IgnoreBoth +} + +/** + * see [[TLSClosing]] + */ +sealed abstract class EagerClose extends TLSClosing { + override def ignoreCancel = false + override def ignoreComplete = false +} +case object EagerClose extends EagerClose + +/** + * see [[TLSClosing]] + */ +sealed abstract class IgnoreCancel extends TLSClosing { + override def ignoreCancel = true + override def ignoreComplete = false +} +case object IgnoreCancel extends IgnoreCancel + +/** + * see [[TLSClosing]] + */ +sealed abstract class IgnoreComplete extends TLSClosing { + override def ignoreCancel = false + override def ignoreComplete = true +} +case object IgnoreComplete extends IgnoreComplete + +/** + * see [[TLSClosing]] + */ +sealed abstract class IgnoreBoth extends TLSClosing { + override def ignoreCancel = true + override def ignoreComplete = true +} +case object IgnoreBoth extends IgnoreBoth + +object TLSProtocol { + + /** + * This is the supertype of all messages that the SslTls stage emits on the + * plaintext side. + */ + sealed trait SslTlsInbound + + /** + * If the underlying transport is closed before the final TLS closure command + * is received from the peer then the SSLEngine will throw an SSLException that + * warns about possible truncation attacks. This exception is caught and + * translated into this message when encountered. Most of the time this occurs + * not because of a malicious attacker but due to a connection abort or a + * misbehaving communication peer. + */ + sealed abstract class SessionTruncated extends SslTlsInbound + + case object SessionTruncated extends SessionTruncated + + /** + * Plaintext bytes emitted by the SSLEngine are received over one specific + * encryption session and this class bundles the bytes with the SSLSession + * object. When the session changes due to renegotiation (which can be + * initiated by either party) the new session value will not compare equal to + * the previous one. + * + * The Java API for getting session information is given by the SSLSession object, + * the Scala API adapters are offered below. + */ + final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound with scaladsl.ScalaSessionAPI + + /** + * This is the supertype of all messages that the SslTls stage accepts on its + * plaintext side. + */ + sealed trait SslTlsOutbound + + /** + * Initiate a new session negotiation. Any [[SendBytes]] commands following + * this one will be held back (i.e. back-pressured) until the new handshake is + * completed, meaning that the bytes following this message will be encrypted + * according to the requirements outlined here. + * + * Each of the values in this message is optional and will have the following + * effect if provided: + * + * - `enabledCipherSuites` will be passed to `SSLEngine::setEnabledCipherSuites()` + * - `enabledProtocols` will be passed to `SSLEngine::setEnabledProtocols()` + * - `clientAuth` will be passed to `SSLEngine::setWantClientAuth()` or `SSLEngine.setNeedClientAuth()`, respectively + * - `sslParameters` will be passed to `SSLEngine::setSSLParameters()` + * + * Please note that passing `clientAuth = None` means that no change is done + * on client authentication requirements while `clientAuth = Some(ClientAuth.None)` + * switches off client authentication. + */ + case class NegotiateNewSession( + enabledCipherSuites: Option[immutable.Seq[String]], + enabledProtocols: Option[immutable.Seq[String]], + clientAuth: Option[TLSClientAuth], + sslParameters: Option[SSLParameters]) extends SslTlsOutbound { + + /** + * Java API: Make a copy of this message with the given `enabledCipherSuites`. + */ + @varargs + def withCipherSuites(s: String*) = copy(enabledCipherSuites = Some(s.toList)) + + /** + * Java API: Make a copy of this message with the given `enabledProtocols`. + */ + @varargs + def withProtocols(p: String*) = copy(enabledProtocols = Some(p.toList)) + + /** + * Java API: Make a copy of this message with the given [[TLSClientAuth]] setting. + */ + def withClientAuth(ca: TLSClientAuth) = copy(clientAuth = Some(ca)) + + /** + * Java API: Make a copy of this message with the given [[SSLParameters]]. + */ + def withParameters(p: SSLParameters) = copy(sslParameters = Some(p)) + } + + object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) { + /** + * Java API: obtain the default value (which will leave the SSLEngine’s + * settings unchanged). + */ + def withDefaults: NegotiateNewSession = this + + } + + /** + * Send the given [[akka.util.ByteString]] across the encrypted session to the + * peer. + */ + final case class SendBytes(bytes: ByteString) extends SslTlsOutbound + +} + +/** + * An SSLEngine can either demand, allow or ignore its peer’s authentication + * (via certificates), where `Need` will fail the handshake if the peer does + * not provide valid credentials, `Want` allows the peer to send credentials + * and verifies them if provided, and `None` disables peer certificate + * verification. + * + * See the documentation for `SSLEngine::setWantClientAuth` for more information. + */ +sealed abstract class TLSClientAuth +object TLSClientAuth { + case object None extends TLSClientAuth + case object Want extends TLSClientAuth + case object Need extends TLSClientAuth + + def none: TLSClientAuth = None + def want: TLSClientAuth = Want + def need: TLSClientAuth = Need +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 2d6ee38d94..2d9c2a90ef 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -13,8 +13,8 @@ import akka.pattern.ask import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } -import akka.stream.impl.io.SslTlsCipherActor -import akka.stream.io.SslTls.TlsModule +import akka.stream.impl.io.TLSActor +import akka.stream.impl.io.TlsModule import org.reactivestreams._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } @@ -121,7 +121,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) val props = - SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo) + TLSActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -129,11 +129,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, val publishers = Vector.tabulate(2)(factory) impl ! FanOut.ExposedPublishers(publishers) - assignPort(tls.plainOut, publishers(SslTlsCipherActor.UserOut)) - assignPort(tls.cipherOut, publishers(SslTlsCipherActor.TransportOut)) + assignPort(tls.plainOut, publishers(TLSActor.UserOut)) + assignPort(tls.cipherOut, publishers(TLSActor.TransportOut)) - assignPort(tls.plainIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn)) - assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn)) + assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn)) + assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn)) matVal.put(atomic, NotUsed) diff --git a/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala similarity index 95% rename from akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala index d034c3dcbc..ad3f968215 100644 --- a/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala @@ -1,15 +1,19 @@ /** * Copyright (C) 2015-2016 Typesafe Inc. */ -package akka.stream.io +package akka.stream.impl.io -import scala.util.control.NoStackTrace import akka.stream._ import akka.stream.stage._ import akka.util.ByteString -import scala.annotation.tailrec -abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] { +import scala.annotation.tailrec +import scala.util.control.NoStackTrace + +/** + * INTERNAL API + */ +private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] { import ByteStringParser._ private val bytesIn = Inlet[ByteString]("bytesIn") @@ -67,7 +71,10 @@ abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] } } -object ByteStringParser { +/** + * INTERNAL API + */ +private[akka] object ByteStringParser { /** * @param result - parser can return some element for downstream or return None if no element was generated diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala similarity index 94% rename from akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index e8f567ea05..e746ec4e89 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -9,28 +9,28 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl._ import akka.actor._ -import akka.stream.{ ConnectionException, ActorMaterializerSettings } +import akka.stream._ import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig import scala.annotation.tailrec -import akka.stream.io._ +import akka.stream.TLSProtocol._ /** * INTERNAL API. */ -private[akka] object SslTlsCipherActor { +private[akka] object TLSActor { def props(settings: ActorMaterializerSettings, sslContext: SSLContext, firstSession: NegotiateNewSession, - role: Role, - closing: Closing, + role: TLSRole, + closing: TLSClosing, hostInfo: Option[(String, Int)], tracing: Boolean = false): Props = - Props(new SslTlsCipherActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local) + Props(new TLSActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -42,13 +42,13 @@ private[akka] object SslTlsCipherActor { /** * INTERNAL API. */ -private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, - sslContext: SSLContext, - firstSession: NegotiateNewSession, role: Role, closing: Closing, - hostInfo: Option[(String, Int)], tracing: Boolean) +private[akka] class TLSActor(settings: ActorMaterializerSettings, + sslContext: SSLContext, + firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, + hostInfo: Option[(String, Int)], tracing: Boolean) extends Actor with ActorLogging with Pump { - import SslTlsCipherActor._ + import TLSActor._ protected val outputBunch = new OutputBunch(outputCount = 2, self, this) outputBunch.markAllOutputs() @@ -161,10 +161,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, params.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) params.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) params.clientAuth match { - case Some(ClientAuth.None) ⇒ engine.setNeedClientAuth(false) - case Some(ClientAuth.Want) ⇒ engine.setWantClientAuth(true) - case Some(ClientAuth.Need) ⇒ engine.setNeedClientAuth(true) - case _ ⇒ // do nothing + case Some(TLSClientAuth.None) ⇒ engine.setNeedClientAuth(false) + case Some(TLSClientAuth.Want) ⇒ engine.setWantClientAuth(true) + case Some(TLSClientAuth.Need) ⇒ engine.setNeedClientAuth(true) + case _ ⇒ // do nothing } params.sslParameters foreach (p ⇒ engine.setSSLParameters(p)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala new file mode 100644 index 0000000000..84bcb032ee --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -0,0 +1,45 @@ +package akka.stream.impl.io + +import javax.net.ssl.SSLContext + +import akka.stream._ +import akka.stream.impl.StreamLayout.{ CompositeModule, Module } +import akka.stream.TLSProtocol._ +import akka.util.ByteString + +/** + * INTERNAL API. + */ +private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], + cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], + shape: Shape, attributes: Attributes, + sslContext: SSLContext, + firstSession: NegotiateNewSession, + role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends Module { + override def subModules: Set[Module] = Set.empty + + override def withAttributes(att: Attributes): Module = copy(attributes = att) + override def carbonCopy: Module = + TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) + + override def replaceShape(s: Shape) = + if (s != shape) { + shape.requireSamePortsAs(s) + CompositeModule(this, s) + } else this +} + +/** + * INTERNAL API. + */ +private[akka] object TlsModule { + def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): TlsModule = { + val name = attributes.nameOrDefault(s"StreamTls($role)") + val cipherIn = Inlet[ByteString](s"$name.cipherIn") + val cipherOut = Outlet[ByteString](s"$name.cipherOut") + val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") + val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") + val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) + TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing, hostInfo) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala deleted file mode 100644 index 781ef0942b..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ /dev/null @@ -1,453 +0,0 @@ -/** - * Copyright (C) 2015-2016 Typesafe Inc. - */ -package akka.stream.io - -import java.lang.{ Integer ⇒ jInteger } -import java.security.Principal -import java.util.Optional - -import akka.{ NotUsed, japi } -import akka.stream._ -import akka.stream.impl.StreamLayout.{ Module, CompositeModule } -import akka.util.ByteString -import javax.net.ssl._ - -import scala.annotation.varargs -import scala.collection.immutable -import scala.compat.java8.OptionConverters - -/** - * Stream cipher support based upon JSSE. - * - * The underlying SSLEngine has four ports: plaintext input/output and - * ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]] - * element for use in stream topologies, where the plaintext ports are on the - * left hand side of the shape and the ciphertext ports on the right hand side. - * - * Configuring JSSE is a rather complex topic, please refer to the JDK platform - * documentation or the excellent user guide that is part of the Play Framework - * documentation. The philosophy of this integration into Akka Streams is to - * expose all knobs and dials to client code and therefore not limit the - * configuration possibilities. In particular the client code will have to - * provide the SSLContext from which the SSLEngine is then created. Handshake - * parameters are set using [[NegotiateNewSession]] messages, the settings for - * the initial handshake need to be provided up front using the same class; - * please refer to the method documentation below. - * - * '''IMPORTANT NOTE''' - * - * The TLS specification does not permit half-closing of the user data session - * that it transports—to be precise a half-close will always promptly lead to a - * full close. This means that canceling the plaintext output or completing the - * plaintext input of the SslTls stage will lead to full termination of the - * secure connection without regard to whether bytes are remaining to be sent or - * received, respectively. Especially for a client the common idiom of attaching - * a finite Source to the plaintext input and transforming the plaintext response - * bytes coming out will not work out of the box due to early termination of the - * connection. For this reason there is a parameter that determines whether the - * SslTls stage shall ignore completion and/or cancellation events, and the - * default is to ignore completion (in view of the client–server scenario). In - * order to terminate the connection the client will then need to cancel the - * plaintext output as soon as all expected bytes have been received. When - * ignoring both types of events the stage will shut down once both events have - * been received. See also [[Closing]]. - */ -object SslTls { - - type ScalaFlow = scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] - type JavaFlow = javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] - - /** - * Scala API: create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[Closing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, - closing: Closing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): ScalaFlow = - new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing, hostInfo)) - - /** - * Java API: create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * This method uses the default closing behavior or [[IgnoreComplete]]. - */ - def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role): JavaFlow = - new javadsl.BidiFlow(apply(sslContext, firstSession, role)) - - /** - * Java API: create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[Closing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, hostInfo: Optional[japi.Pair[String, jInteger]], closing: Closing): JavaFlow = - new javadsl.BidiFlow(apply(sslContext, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e ⇒ (e.first, e.second)))) - - /** - * INTERNAL API. - */ - private[akka] case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], - cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], - shape: Shape, attributes: Attributes, - sslContext: SSLContext, - firstSession: NegotiateNewSession, - role: Role, closing: Closing, hostInfo: Option[(String, Int)]) extends Module { - override def subModules: Set[Module] = Set.empty - - override def withAttributes(att: Attributes): Module = copy(attributes = att) - override def carbonCopy: Module = - TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) - - override def replaceShape(s: Shape) = - if (s != shape) { - shape.requireSamePortsAs(s) - CompositeModule(this, s) - } else this - } - - /** - * INTERNAL API. - */ - private[akka] object TlsModule { - def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing, hostInfo: Option[(String, Int)]): TlsModule = { - val name = attributes.nameOrDefault(s"StreamTls($role)") - val cipherIn = Inlet[ByteString](s"$name.cipherIn") - val cipherOut = Outlet[ByteString](s"$name.cipherOut") - val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") - val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") - val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) - TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing, hostInfo) - } - } -} - -/** - * This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can - * be used instead of [[SslTls]] when no encryption is desired. The flows will - * just adapt the message protocol by wrapping into [[SessionBytes]] and - * unwrapping [[SendBytes]]. - */ -object SslTlsPlacebo { - // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL - private[akka] val dummySession = SSLContext.getDefault.createSSLEngine.getSession - - val forScala: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = - scaladsl.BidiFlow.fromGraph(scaladsl.GraphDSL.create() { implicit b ⇒ - val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes }) - val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(dummySession, _))) - BidiShape.fromFlows(top, bottom) - }) - val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = - new javadsl.BidiFlow(forScala) -} - -import java.security.Principal -import java.security.cert.Certificate -import javax.net.ssl.{ SSLPeerUnverifiedException, SSLSession } - -/** Allows access to an SSLSession with Scala types */ -trait ScalaSessionAPI { - def session: SSLSession - - /** - * Scala API: Extract the certificates that were actually used by this - * engine during this session’s negotiation. The list is empty if no - * certificates were used. - */ - def localCertificates: List[Certificate] = Option(session.getLocalCertificates).map(_.toList).getOrElse(Nil) - /** - * Scala API: Extract the Principal that was actually used by this engine - * during this session’s negotiation. - */ - def localPrincipal: Option[Principal] = Option(session.getLocalPrincipal) - /** - * Scala API: Extract the certificates that were used by the peer engine - * during this session’s negotiation. The list is empty if no certificates - * were used. - */ - def peerCertificates: List[Certificate] = - try Option(session.getPeerCertificates).map(_.toList).getOrElse(Nil) - catch { case e: SSLPeerUnverifiedException ⇒ Nil } - /** - * Scala API: Extract the Principal that the peer engine presented during - * this session’s negotiation. - */ - def peerPrincipal: Option[Principal] = - try Option(session.getPeerPrincipal) - catch { case e: SSLPeerUnverifiedException ⇒ None } -} - -object ScalaSessionAPI { - /** Constructs a ScalaSessionAPI instance from an SSLSession */ - def apply(_session: SSLSession): ScalaSessionAPI = - new ScalaSessionAPI { - def session: SSLSession = _session - } -} - -/** - * Many protocols are asymmetric and distinguish between the client and the - * server, where the latter listens passively for messages and the former - * actively initiates the exchange. - */ -object Role { - /** - * Java API: obtain the [[Client]] singleton value. - */ - def client: Role = Client - /** - * Java API: obtain the [[Server]] singleton value. - */ - def server: Role = Server -} -sealed abstract class Role - -/** - * The client is usually the side that consumes the service provided by its - * interlocutor. The precise interpretation of this role is protocol specific. - */ -sealed abstract class Client extends Role -case object Client extends Client - -/** - * The server is usually the side the provides the service to its interlocutor. - * The precise interpretation of this role is protocol specific. - */ -sealed abstract class Server extends Role -case object Server extends Server - -/** - * All streams in Akka are unidirectional: while in a complex flow graph data - * may flow in multiple directions these individual flows are independent from - * each other. The difference between two half-duplex connections in opposite - * directions and a full-duplex connection is that the underlying transport - * is shared in the latter and tearing it down will end the data transfer in - * both directions. - * - * When integrating a full-duplex transport medium that does not support - * half-closing (which means ending one direction of data transfer without - * ending the other) into a stream topology, there can be unexpected effects. - * Feeding a finite Source into this medium will close the connection after - * all elements have been sent, which means that possible replies may not - * be received in full. To support this type of usage, the sending and - * receiving of data on the same side (e.g. on the [[Client]]) need to be - * coordinated such that it is known when all replies have been received. - * Only then should the transport be shut down. - * - * To support these scenarios it is recommended that the full-duplex - * transport integration is configurable in terms of termination handling, - * which means that the user can optionally suppress the normal (closing) - * reaction to completion or cancellation events, as is expressed by the - * possible values of this type: - * - * - [[EagerClose]] means to not ignore signals - * - [[IgnoreCancel]] means to not react to cancellation of the receiving - * side unless the sending side has already completed - * - [[IgnoreComplete]] means to not react to the completion of the sending - * side unless the receiving side has already canceled - * - [[IgnoreBoth]] means to ignore the first termination signal—be that - * cancellation or completion—and only act upon the second one - */ -sealed abstract class Closing { - def ignoreCancel: Boolean - def ignoreComplete: Boolean -} -object Closing { - /** - * Java API: obtain the [[EagerClose]] singleton value. - */ - def eagerClose: Closing = EagerClose - /** - * Java API: obtain the [[IgnoreCancel]] singleton value. - */ - def ignoreCancel: Closing = IgnoreCancel - /** - * Java API: obtain the [[IgnoreComplete]] singleton value. - */ - def ignoreComplete: Closing = IgnoreComplete - /** - * Java API: obtain the [[IgnoreBoth]] singleton value. - */ - def ignoreBoth: Closing = IgnoreBoth -} - -/** - * see [[Closing]] - */ -sealed abstract class EagerClose extends Closing { - override def ignoreCancel = false - override def ignoreComplete = false -} -case object EagerClose extends EagerClose - -/** - * see [[Closing]] - */ -sealed abstract class IgnoreCancel extends Closing { - override def ignoreCancel = true - override def ignoreComplete = false -} -case object IgnoreCancel extends IgnoreCancel - -/** - * see [[Closing]] - */ -sealed abstract class IgnoreComplete extends Closing { - override def ignoreCancel = false - override def ignoreComplete = true -} -case object IgnoreComplete extends IgnoreComplete - -/** - * see [[Closing]] - */ -sealed abstract class IgnoreBoth extends Closing { - override def ignoreCancel = true - override def ignoreComplete = true -} -case object IgnoreBoth extends IgnoreBoth - -/** - * This is the supertype of all messages that the SslTls stage emits on the - * plaintext side. - */ -sealed trait SslTlsInbound - -/** - * If the underlying transport is closed before the final TLS closure command - * is received from the peer then the SSLEngine will throw an SSLException that - * warns about possible truncation attacks. This exception is caught and - * translated into this message when encountered. Most of the time this occurs - * not because of a malicious attacker but due to a connection abort or a - * misbehaving communication peer. - */ -sealed abstract class SessionTruncated extends SslTlsInbound -case object SessionTruncated extends SessionTruncated - -/** - * Plaintext bytes emitted by the SSLEngine are received over one specific - * encryption session and this class bundles the bytes with the SSLSession - * object. When the session changes due to renegotiation (which can be - * initiated by either party) the new session value will not compare equal to - * the previous one. - * - * The Java API for getting session information is given by the SSLSession object, - * the Scala API adapters are offered below. - */ -case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound with ScalaSessionAPI - -/** - * This is the supertype of all messages that the SslTls stage accepts on its - * plaintext side. - */ -sealed trait SslTlsOutbound - -/** - * Initiate a new session negotiation. Any [[SendBytes]] commands following - * this one will be held back (i.e. back-pressured) until the new handshake is - * completed, meaning that the bytes following this message will be encrypted - * according to the requirements outlined here. - * - * Each of the values in this message is optional and will have the following - * effect if provided: - * - * - `enabledCipherSuites` will be passed to `SSLEngine::setEnabledCipherSuites()` - * - `enabledProtocols` will be passed to `SSLEngine::setEnabledProtocols()` - * - `clientAuth` will be passed to `SSLEngine::setWantClientAuth()` or `SSLEngine.setNeedClientAuth()`, respectively - * - `sslParameters` will be passed to `SSLEngine::setSSLParameters()` - * - * Please note that passing `clientAuth = None` means that no change is done - * on client authentication requirements while `clientAuth = Some(ClientAuth.None)` - * switches off client authentication. - */ -case class NegotiateNewSession( - enabledCipherSuites: Option[immutable.Seq[String]], - enabledProtocols: Option[immutable.Seq[String]], - clientAuth: Option[ClientAuth], - sslParameters: Option[SSLParameters]) extends SslTlsOutbound { - - /** - * Java API: Make a copy of this message with the given `enabledCipherSuites`. - */ - @varargs - def withCipherSuites(s: String*) = copy(enabledCipherSuites = Some(s.toList)) - - /** - * Java API: Make a copy of this message with the given `enabledProtocols`. - */ - @varargs - def withProtocols(p: String*) = copy(enabledProtocols = Some(p.toList)) - - /** - * Java API: Make a copy of this message with the given [[ClientAuth]] setting. - */ - def withClientAuth(ca: ClientAuth) = copy(clientAuth = Some(ca)) - - /** - * Java API: Make a copy of this message with the given [[SSLParameters]]. - */ - def withParameters(p: SSLParameters) = copy(sslParameters = Some(p)) -} - -object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) { - /** - * Java API: obtain the default value (which will leave the SSLEngine’s - * settings unchanged). - */ - def withDefaults = this - -} - -/** - * Send the given [[akka.util.ByteString]] across the encrypted session to the - * peer. - */ -case class SendBytes(bytes: ByteString) extends SslTlsOutbound - -/** - * An SSLEngine can either demand, allow or ignore its peer’s authentication - * (via certificates), where `Need` will fail the handshake if the peer does - * not provide valid credentials, `Want` allows the peer to send credentials - * and verifies them if provided, and `None` disables peer certificate - * verification. - * - * See the documentation for `SSLEngine::setWantClientAuth` for more information. - */ -sealed abstract class ClientAuth -object ClientAuth { - case object None extends ClientAuth - case object Want extends ClientAuth - case object Need extends ClientAuth - - def none: ClientAuth = None - def want: ClientAuth = Want - def need: ClientAuth = Need -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala new file mode 100644 index 0000000000..a0d930cd3d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala @@ -0,0 +1,101 @@ +package akka.stream.javadsl + +import java.util.Optional +import javax.net.ssl.{ SSLContext, SSLParameters } + +import akka.{ japi, NotUsed } +import akka.stream._ +import akka.stream.TLSProtocol._ +import akka.util.ByteString + +import scala.compat.java8.OptionConverters + +/** + * Stream cipher support based upon JSSE. + * + * The underlying SSLEngine has four ports: plaintext input/output and + * ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]] + * element for use in stream topologies, where the plaintext ports are on the + * left hand side of the shape and the ciphertext ports on the right hand side. + * + * Configuring JSSE is a rather complex topic, please refer to the JDK platform + * documentation or the excellent user guide that is part of the Play Framework + * documentation. The philosophy of this integration into Akka Streams is to + * expose all knobs and dials to client code and therefore not limit the + * configuration possibilities. In particular the client code will have to + * provide the SSLContext from which the SSLEngine is then created. Handshake + * parameters are set using [[NegotiateNewSession]] messages, the settings for + * the initial handshake need to be provided up front using the same class; + * please refer to the method documentation below. + * + * '''IMPORTANT NOTE''' + * + * The TLS specification does not permit half-closing of the user data session + * that it transports—to be precise a half-close will always promptly lead to a + * full close. This means that canceling the plaintext output or completing the + * plaintext input of the SslTls stage will lead to full termination of the + * secure connection without regard to whether bytes are remaining to be sent or + * received, respectively. Especially for a client the common idiom of attaching + * a finite Source to the plaintext input and transforming the plaintext response + * bytes coming out will not work out of the box due to early termination of the + * connection. For this reason there is a parameter that determines whether the + * SslTls stage shall ignore completion and/or cancellation events, and the + * default is to ignore completion (in view of the client–server scenario). In + * order to terminate the connection the client will then need to cancel the + * plaintext output as soon as all expected bytes have been received. When + * ignoring both types of events the stage will shut down once both events have + * been received. See also [[TLSClosing]]. + */ +object TLS { + + /** + * Create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The + * SSLContext will be used to create an SSLEngine to which then the + * `firstSession` parameters are applied before initiating the first + * handshake. The `role` parameter determines the SSLEngine’s role; this is + * often the same as the underlying transport’s server or client role, but + * that is not a requirement and depends entirely on the application + * protocol. + * + * This method uses the default closing behavior or [[IgnoreComplete]]. + */ + def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, firstSession, role)) + + /** + * Create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The + * SSLContext will be used to create an SSLEngine to which then the + * `firstSession` parameters are applied before initiating the first + * handshake. The `role` parameter determines the SSLEngine’s role; this is + * often the same as the underlying transport’s server or client role, but + * that is not a requirement and depends entirely on the application + * protocol. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + * + * The `hostInfo` parameter allows to optionally specify a pair of hostname and port + * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. + * The SSLEngine may use this information e.g. when an endpoint identification algorithm was + * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. + */ + def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, hostInfo: Optional[japi.Pair[String, java.lang.Integer]], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e ⇒ (e.first, e.second)))) + +} + +/** + * This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can + * be used instead of [[TLS]] when no encryption is desired. The flows will + * just adapt the message protocol by wrapping into [[SessionBytes]] and + * unwrapping [[SendBytes]]. + */ +object TLSPlacebo { + + /** + * Returns a reusable [[BidiFlow]] instance representing a [[TLSPlacebo$]]. + */ + def getInstance(): javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = forJava + + private val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = + new javadsl.BidiFlow(scaladsl.TLSPlacebo()) +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala new file mode 100644 index 0000000000..ad97abd18b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala @@ -0,0 +1,137 @@ +package akka.stream.scaladsl + +import javax.net.ssl.{ SSLContext, SSLParameters } + +import akka.stream.impl.io.TlsModule +import akka.NotUsed +import akka.stream._ +import akka.stream.TLSProtocol._ +import akka.util.ByteString + +/** + * Stream cipher support based upon JSSE. + * + * The underlying SSLEngine has four ports: plaintext input/output and + * ciphertext input/output. These are modeled as a [[akka.stream.BidiShape]] + * element for use in stream topologies, where the plaintext ports are on the + * left hand side of the shape and the ciphertext ports on the right hand side. + * + * Configuring JSSE is a rather complex topic, please refer to the JDK platform + * documentation or the excellent user guide that is part of the Play Framework + * documentation. The philosophy of this integration into Akka Streams is to + * expose all knobs and dials to client code and therefore not limit the + * configuration possibilities. In particular the client code will have to + * provide the SSLContext from which the SSLEngine is then created. Handshake + * parameters are set using [[NegotiateNewSession]] messages, the settings for + * the initial handshake need to be provided up front using the same class; + * please refer to the method documentation below. + * + * '''IMPORTANT NOTE''' + * + * The TLS specification does not permit half-closing of the user data session + * that it transports—to be precise a half-close will always promptly lead to a + * full close. This means that canceling the plaintext output or completing the + * plaintext input of the SslTls stage will lead to full termination of the + * secure connection without regard to whether bytes are remaining to be sent or + * received, respectively. Especially for a client the common idiom of attaching + * a finite Source to the plaintext input and transforming the plaintext response + * bytes coming out will not work out of the box due to early termination of the + * connection. For this reason there is a parameter that determines whether the + * SslTls stage shall ignore completion and/or cancellation events, and the + * default is to ignore completion (in view of the client–server scenario). In + * order to terminate the connection the client will then need to cancel the + * plaintext output as soon as all expected bytes have been received. When + * ignoring both types of events the stage will shut down once both events have + * been received. See also [[TLSClosing]]. + */ +object TLS { + + /** + * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The + * SSLContext will be used to create an SSLEngine to which then the + * `firstSession` parameters are applied before initiating the first + * handshake. The `role` parameter determines the SSLEngine’s role; this is + * often the same as the underlying transport’s server or client role, but + * that is not a requirement and depends entirely on the application + * protocol. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + * + * The `hostInfo` parameter allows to optionally specify a pair of hostname and port + * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. + * The SSLEngine may use this information e.g. when an endpoint identification algorithm was + * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. + */ + def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, + closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing, hostInfo)) + +} + +/** + * This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can + * be used instead of [[TLS]] when no encryption is desired. The flows will + * just adapt the message protocol by wrapping into [[SessionBytes]] and + * unwrapping [[SendBytes]]. + */ +object TLSPlacebo { + // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL + private[akka] val dummySession = SSLContext.getDefault.createSSLEngine.getSession + + def apply(): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = instance + + private val instance: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, NotUsed] = + scaladsl.BidiFlow.fromGraph(scaladsl.GraphDSL.create() { implicit b ⇒ + val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes }) + val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(dummySession, _))) + BidiShape.fromFlows(top, bottom) + }) +} + +import java.security.Principal +import java.security.cert.Certificate +import javax.net.ssl.{ SSLPeerUnverifiedException, SSLSession } + +/** Allows access to an SSLSession with Scala types */ +trait ScalaSessionAPI { + + /** + * The underlying [[javax.net.ssl.SSLSession]]. + */ + def session: SSLSession + + /** + * Scala API: Extract the certificates that were actually used by this + * engine during this session’s negotiation. The list is empty if no + * certificates were used. + */ + def localCertificates: List[Certificate] = Option(session.getLocalCertificates).map(_.toList).getOrElse(Nil) + /** + * Scala API: Extract the Principal that was actually used by this engine + * during this session’s negotiation. + */ + def localPrincipal: Option[Principal] = Option(session.getLocalPrincipal) + /** + * Scala API: Extract the certificates that were used by the peer engine + * during this session’s negotiation. The list is empty if no certificates + * were used. + */ + def peerCertificates: List[Certificate] = + try Option(session.getPeerCertificates).map(_.toList).getOrElse(Nil) + catch { case e: SSLPeerUnverifiedException ⇒ Nil } + /** + * Scala API: Extract the Principal that the peer engine presented during + * this session’s negotiation. + */ + def peerPrincipal: Option[Principal] = + try Option(session.getPeerPrincipal) + catch { case e: SSLPeerUnverifiedException ⇒ None } +} + +object ScalaSessionAPI { + /** Constructs a ScalaSessionAPI instance from an SSLSession */ + def apply(_session: SSLSession): ScalaSessionAPI = + new ScalaSessionAPI { + def session: SSLSession = _session + } +} \ No newline at end of file