diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index e006183042..158c1d3077 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -14,13 +14,13 @@ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary } import akka.stream.impl.fusing.GraphInterpreter.Connection import akka.stream.impl.fusing._ +import akka.stream.impl.io.{ TLSActor, TlsModule } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.collection.immutable.Map import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ ExecutionContextExecutor, Future } -import scala.util.Random +import scala.concurrent.ExecutionContextExecutor object PhasedFusingActorMaterializer { @@ -48,6 +48,10 @@ object PhasedFusingActorMaterializer { islandName: String): PhaseIsland[Any] = new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, + TlsModuleIslandTag → new Phase[Any] { + def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = + new TlsModulePhase(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]] + }, GraphStageTag → DefaultPhase) def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = { @@ -740,3 +744,36 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, is override def onIslandReady(): Unit = () } + +object TlsModuleIslandTag extends IslandTag + +final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { + def name: String = "TlsModulePhase" + + var tlsActor: ActorRef = _ + var publishers: Vector[ActorPublisher[Any]] = _ + + def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { + println(mod) + val tls = mod.asInstanceOf[TlsModule] + + val props = TLSActor.props(settings, tls.createSSLEngine, tls.verifySession, tls.closing) + tlsActor = materializer.actorOf(props, islandName) + def factory(id: Int) = new ActorPublisher[Any](tlsActor) { + override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) + } + publishers = Vector.tabulate(2)(factory) + tlsActor ! FanOut.ExposedPublishers(publishers) + (NotUsed, NotUsed) + } + def assignPort(in: InPort, slot: Int, logic: NotUsed): Unit = () + def assignPort(out: OutPort, slot: Int, logic: NotUsed): Unit = () + + def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] = + publishers(out.id) + + def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = + publisher.subscribe(FanIn.SubInput[Any](tlsActor, slot)) + + def onIslandReady(): Unit = () +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index a0362d1d03..33e4cef06a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -1,15 +1,14 @@ package akka.stream.impl.io -import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } +import javax.net.ssl.{ SSLEngine, SSLSession } import akka.NotUsed import akka.actor.ActorSystem import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.TLSProtocol._ -import akka.stream.impl.TraversalBuilder +import akka.stream.impl.{ TlsModuleIslandTag, TraversalBuilder } import akka.util.ByteString -import com.typesafe.sslconfig.akka.AkkaSSLConfig import scala.util.Try @@ -29,7 +28,7 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]" - override private[stream] def traversalBuilder = TraversalBuilder.atomic(this) + override private[stream] def traversalBuilder = TraversalBuilder.atomic(this).makeIsland(TlsModuleIslandTag) } /**