=str #22430 add TlsModulePhase (#22466)

This commit is contained in:
Johannes Rudolph 2017-03-07 12:39:15 +01:00 committed by drewhk
parent c7c109db4a
commit 3325ee23e2
2 changed files with 42 additions and 6 deletions

View file

@ -14,13 +14,13 @@ import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary }
import akka.stream.impl.fusing.GraphInterpreter.Connection
import akka.stream.impl.fusing._
import akka.stream.impl.io.{ TLSActor, TlsModule }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.util.Random
import scala.concurrent.ExecutionContextExecutor
object PhasedFusingActorMaterializer {
@ -48,6 +48,10 @@ object PhasedFusingActorMaterializer {
islandName: String): PhaseIsland[Any] =
new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
},
TlsModuleIslandTag new Phase[Any] {
def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] =
new TlsModulePhase(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]]
},
GraphStageTag DefaultPhase)
def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = {
@ -740,3 +744,36 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, is
override def onIslandReady(): Unit = ()
}
object TlsModuleIslandTag extends IslandTag
final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] {
def name: String = "TlsModulePhase"
var tlsActor: ActorRef = _
var publishers: Vector[ActorPublisher[Any]] = _
def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = {
println(mod)
val tls = mod.asInstanceOf[TlsModule]
val props = TLSActor.props(settings, tls.createSSLEngine, tls.verifySession, tls.closing)
tlsActor = materializer.actorOf(props, islandName)
def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
}
publishers = Vector.tabulate(2)(factory)
tlsActor ! FanOut.ExposedPublishers(publishers)
(NotUsed, NotUsed)
}
def assignPort(in: InPort, slot: Int, logic: NotUsed): Unit = ()
def assignPort(out: OutPort, slot: Int, logic: NotUsed): Unit = ()
def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] =
publishers(out.id)
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit =
publisher.subscribe(FanIn.SubInput[Any](tlsActor, slot))
def onIslandReady(): Unit = ()
}