making http pass with serialization checks

This commit is contained in:
Endre Sándor Varga 2015-06-02 12:31:01 +02:00
parent f4c83771bb
commit 1dde8b3a3b
11 changed files with 38 additions and 17 deletions

View file

@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{ Future, Promise }
import akka.http.HostConnectionPoolSetup
import akka.actor.{ Props, ActorSystem, ActorRef }
import akka.actor.{ Deploy, Props, ActorSystem, ActorRef }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpResponse, HttpRequest }
import akka.stream.FlowMaterializer
@ -40,7 +40,7 @@ private[http] class PoolGateway(hcps: HostConnectionPoolSetup,
private val state = {
val shutdownCompletedPromise = Promise[Unit]()
val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this))
val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local)
val ref = system.actorOf(props, PoolInterfaceActor.name.next())
new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise))
}

View file

@ -8,7 +8,7 @@ import java.net.InetSocketAddress
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import akka.actor.{ PoisonPill, DeadLetterSuppression, ActorLogging, Cancellable }
import akka.actor._
import akka.stream.FlowMaterializer
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy }
import akka.stream.actor.ActorPublisherMessage._
@ -22,7 +22,7 @@ import akka.http.scaladsl.Http
import PoolFlow._
private object PoolInterfaceActor {
final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse])
final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) extends NoSerializationVerificationNeeded
case object Shutdown extends DeadLetterSuppression

View file

@ -56,7 +56,8 @@ private object PoolSlot {
val slotProcessor = b.add {
Flow[RequestContext] andThenMat { ()
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)), slotProcessorActorName.next())
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local),
slotProcessorActorName.next())
(ActorProcessor[RequestContext, List[ProcessorOut]](actor), ())
}
}
@ -85,9 +86,9 @@ private object PoolSlot {
var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
var inflightRequests = immutable.Queue.empty[RequestContext]
val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)))
val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local))
.via(connectionFlow)
.toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self))))(Keep.both)
.toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both)
def requestStrategy = ZeroRequestStrategy
def receive = waitingExposedPublisher
@ -187,7 +188,7 @@ private object PoolSlot {
def shutdown(): Unit = context.stop(self)
}
private case class FromConnection(ev: Any)
private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded
private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] {
def receive: Receive = {

View file

@ -10,7 +10,7 @@ import org.reactivestreams.{ Subscriber, Publisher }
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.actor.{ ActorRef, Props }
import akka.actor.{ Deploy, ActorRef, Props }
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
@ -52,7 +52,7 @@ private[http] object HttpServerBluePrint {
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
}.withDeploy(Deploy.local)
}, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.")
val requestParsingFlow = Flow[ByteString].transform(()

View file

@ -80,7 +80,7 @@ package object util {
private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = {
synchronized {
if (eventStreamLogger == null)
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger], name = "event-stream-logger")
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger].withDeploy(Deploy.local), name = "event-stream-logger")
}
system.eventStream.subscribe(eventStreamLogger, channel)
}

View file

@ -0,0 +1,6 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -0,0 +1,6 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -0,0 +1,6 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -248,7 +248,7 @@ private[akka] class FlowNameCounter extends Extension {
private[akka] object StreamSupervisor {
def props(settings: ActorFlowMaterializerSettings): Props = Props(new StreamSupervisor(settings)).withDeploy(Deploy.local)
final case class Materialize(props: Props, name: String) extends DeadLetterSuppression
final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded
/** Testing purpose */
final case object GetChildren

View file

@ -26,6 +26,8 @@ private[akka] object FuturePublisher {
final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded
}
case class FutureValue(value: Any) extends NoSerializationVerificationNeeded
class FutureSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.FuturePublisher.FutureSubscription._
def cancel(): Unit = ref ! Cancel(this)
@ -39,7 +41,7 @@ private[akka] object FuturePublisher {
*/
// FIXME why do we need to have an actor to drive a Future?
private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor {
import akka.stream.impl.FuturePublisher.FutureSubscription
import akka.stream.impl.FuturePublisher._
import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore
import ReactiveStreamsCompliance._
@ -64,7 +66,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
import context.dispatcher
future.pipeTo(self)
future.map(FutureValue) pipeTo (self)
context.become(active)
}
@ -89,7 +91,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
futureValue = Some(Failure(ex))
pushToAll()
}
case value
case FutureValue(value)
if (futureValue.isEmpty) {
futureValue = Some(Success(value))
pushToAll()

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl
import akka.actor.Props
import akka.actor.{ Deploy, Props }
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.scaladsl.Source
import akka.stream.{ ActorFlowMaterializerSettings, Supervision }
@ -35,7 +35,7 @@ private[akka] object SplitDecision {
*/
private[akka] object SplitWhereProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any SplitDecision): Props =
Props(new SplitWhereProcessorImpl(settings, in splitPredicate(in)))
Props(new SplitWhereProcessorImpl(settings, in splitPredicate(in))).withDeploy(Deploy.local)
}
/**