diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 4b497a7af5..abfe8cce48 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } import akka.http.HostConnectionPoolSetup -import akka.actor.{ Props, ActorSystem, ActorRef } +import akka.actor.{ Deploy, Props, ActorSystem, ActorRef } import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } import akka.stream.FlowMaterializer @@ -40,7 +40,7 @@ private[http] class PoolGateway(hcps: HostConnectionPoolSetup, private val state = { val shutdownCompletedPromise = Promise[Unit]() - val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)) + val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local) val ref = system.actorOf(props, PoolInterfaceActor.name.next()) new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise)) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 72238cb30b..5dde210de8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -8,7 +8,7 @@ import java.net.InetSocketAddress import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration -import akka.actor.{ PoisonPill, DeadLetterSuppression, ActorLogging, Cancellable } +import akka.actor._ import akka.stream.FlowMaterializer import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } import akka.stream.actor.ActorPublisherMessage._ @@ -22,7 +22,7 @@ import akka.http.scaladsl.Http import PoolFlow._ private object PoolInterfaceActor { - final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) + final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) extends NoSerializationVerificationNeeded case object Shutdown extends DeadLetterSuppression diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 0f8fecd77a..002284510c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -56,7 +56,8 @@ private object PoolSlot { val slotProcessor = b.add { Flow[RequestContext] andThenMat { () ⇒ - val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)), slotProcessorActorName.next()) + val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local), + slotProcessorActorName.next()) (ActorProcessor[RequestContext, List[ProcessorOut]](actor), ()) } } @@ -85,9 +86,9 @@ private object PoolSlot { var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ var inflightRequests = immutable.Queue.empty[RequestContext] - val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self))) + val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local)) .via(connectionFlow) - .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self))))(Keep.both) + .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both) def requestStrategy = ZeroRequestStrategy def receive = waitingExposedPublisher @@ -187,7 +188,7 @@ private object PoolSlot { def shutdown(): Unit = context.stop(self) } - private case class FromConnection(ev: Any) + private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] { def receive: Receive = { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 1d30b8cf57..8f39a991e8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -10,7 +10,7 @@ import org.reactivestreams.{ Subscriber, Publisher } import scala.util.control.NonFatal import akka.util.ByteString import akka.event.LoggingAdapter -import akka.actor.{ ActorRef, Props } +import akka.actor.{ Deploy, ActorRef, Props } import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage.PushPullStage @@ -52,7 +52,7 @@ private[http] object HttpServerBluePrint { val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) actor - } + }.withDeploy(Deploy.local) }, errorMsg = "Http.serverLayer is currently not reusable. You need to create a new instance for each materialization.") val requestParsingFlow = Flow[ByteString].transform(() ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 6bb5d4ac6a..36e46def7a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -80,7 +80,7 @@ package object util { private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = { synchronized { if (eventStreamLogger == null) - eventStreamLogger = system.actorOf(Props[util.EventStreamLogger], name = "event-stream-logger") + eventStreamLogger = system.actorOf(Props[util.EventStreamLogger].withDeploy(Deploy.local), name = "event-stream-logger") } system.eventStream.subscribe(eventStreamLogger, channel) } diff --git a/akka-http-core/src/test/resources/reference.conf b/akka-http-core/src/test/resources/reference.conf new file mode 100644 index 0000000000..ab48718a51 --- /dev/null +++ b/akka-http-core/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} \ No newline at end of file diff --git a/akka-http-tests-java8/src/test/resources/reference.conf b/akka-http-tests-java8/src/test/resources/reference.conf new file mode 100644 index 0000000000..ab48718a51 --- /dev/null +++ b/akka-http-tests-java8/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} \ No newline at end of file diff --git a/akka-http-tests/src/test/resources/reference.conf b/akka-http-tests/src/test/resources/reference.conf new file mode 100644 index 0000000000..ab48718a51 --- /dev/null +++ b/akka-http-tests/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index f033d14e02..8dbce9c9d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -248,7 +248,7 @@ private[akka] class FlowNameCounter extends Extension { private[akka] object StreamSupervisor { def props(settings: ActorFlowMaterializerSettings): Props = Props(new StreamSupervisor(settings)).withDeploy(Deploy.local) - final case class Materialize(props: Props, name: String) extends DeadLetterSuppression + final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded /** Testing purpose */ final case object GetChildren diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 827415113b..ce5ed913d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -26,6 +26,8 @@ private[akka] object FuturePublisher { final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded } + case class FutureValue(value: Any) extends NoSerializationVerificationNeeded + class FutureSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.FuturePublisher.FutureSubscription._ def cancel(): Unit = ref ! Cancel(this) @@ -39,7 +41,7 @@ private[akka] object FuturePublisher { */ // FIXME why do we need to have an actor to drive a Future? private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor { - import akka.stream.impl.FuturePublisher.FutureSubscription + import akka.stream.impl.FuturePublisher._ import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore import ReactiveStreamsCompliance._ @@ -64,7 +66,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber import context.dispatcher - future.pipeTo(self) + future.map(FutureValue) pipeTo (self) context.become(active) } @@ -89,7 +91,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate futureValue = Some(Failure(ex)) pushToAll() } - case value ⇒ + case FutureValue(value) ⇒ if (futureValue.isEmpty) { futureValue = Some(Success(value)) pushToAll() diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala index 899d7e8b7c..9f16781efa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl -import akka.actor.Props +import akka.actor.{ Deploy, Props } import akka.stream.impl.SplitDecision.SplitDecision import akka.stream.scaladsl.Source import akka.stream.{ ActorFlowMaterializerSettings, Supervision } @@ -35,7 +35,7 @@ private[akka] object SplitDecision { */ private[akka] object SplitWhereProcessorImpl { def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any ⇒ SplitDecision): Props = - Props(new SplitWhereProcessorImpl(settings, in ⇒ splitPredicate(in))) + Props(new SplitWhereProcessorImpl(settings, in ⇒ splitPredicate(in))).withDeploy(Deploy.local) } /**