From f4c83771bb4724da919cc19e0046a3a8f8351da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 29 May 2015 16:43:02 +0200 Subject: [PATCH] !str #17393: Make stream-tests pass with serialize-messages=on --- .../akka/stream/testkit/StreamTestKit.scala | 7 +++--- .../akka/stream/javadsl/FlowGraphTest.java | 8 +++---- .../src/test/resources/reference.conf | 6 +++++ .../test/scala/akka/stream/io/TcpHelper.scala | 13 ++++++----- .../scala/akka/stream/scaladsl/FlowSpec.scala | 2 +- .../stream/impl/GenJunctions.scala.template | 3 ++- .../akka/stream/actor/ActorPublisher.scala | 20 ++++------------ .../akka/stream/actor/ActorSubscriber.scala | 23 ++++++------------- .../impl/ActorFlowMaterializerImpl.scala | 2 +- .../akka/stream/impl/ConcatAllImpl.scala | 4 ++-- .../main/scala/akka/stream/impl/FanIn.scala | 20 ++++++++-------- .../main/scala/akka/stream/impl/FanOut.scala | 22 ++++++++---------- .../akka/stream/impl/FuturePublisher.scala | 13 ++++------- .../stream/impl/GroupByProcessorImpl.scala | 4 ++-- .../scala/akka/stream/impl/Messages.scala | 10 ++++---- .../akka/stream/impl/PrefixAndTailImpl.scala | 4 ++-- .../main/scala/akka/stream/impl/Sinks.scala | 4 ++-- .../impl/StreamOfStreamProcessors.scala | 21 ++++++++--------- .../akka/stream/impl/TickPublisher.scala | 7 +++--- .../impl/TimerTransformerProcessorsImpl.scala | 4 ++-- .../stream/impl/fusing/ActorInterpreter.scala | 6 ++--- .../{io/impl => impl/io}/IOSettings.scala | 2 +- .../stream/{io/impl => impl/io}/IOSinks.scala | 5 ++-- .../{io/impl => impl/io}/IOSources.scala | 2 +- .../io}/InputStreamPublisher.scala | 6 ++--- .../io}/OutputStreamSubscriber.scala | 6 ++--- .../scala/akka/stream/impl/io/SslTls.scala | 4 ++-- .../stream/impl/io/StreamTcpManager.scala | 9 ++++---- .../io}/SynchronousFilePublisher.scala | 5 ++-- .../io}/SynchronousFileSubscriber.scala | 6 ++--- .../stream/impl/io/TcpConnectionStream.scala | 4 ++-- .../stream/impl/io/TcpListenStreamActor.scala | 6 ++--- .../akka/stream/io/InputStreamSource.scala | 2 +- .../akka/stream/io/OutputStreamSink.scala | 2 +- .../akka/stream/io/SynchronousFileSink.scala | 4 ++-- .../stream/io/SynchronousFileSource.scala | 3 +-- .../main/scala/akka/stream/scaladsl/Tcp.scala | 11 ++------- 37 files changed, 121 insertions(+), 159 deletions(-) create mode 100644 akka-stream-tests/src/test/resources/reference.conf rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/IOSettings.scala (95%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/IOSinks.scala (96%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/IOSources.scala (99%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/InputStreamPublisher.scala (95%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/OutputStreamSubscriber.scala (93%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/SynchronousFilePublisher.scala (96%) rename akka-stream/src/main/scala/akka/stream/{io/impl => impl/io}/SynchronousFileSubscriber.scala (94%) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index b37d0a78b2..4f488f5fda 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -4,8 +4,7 @@ package akka.stream.testkit import scala.language.existentials -import akka.actor.ActorSystem -import akka.actor.DeadLetterSuppression +import akka.actor.{ NoSerializationVerificationNeeded, ActorSystem, DeadLetterSuppression } import akka.stream._ import akka.stream.impl._ import akka.testkit.TestProbe @@ -359,12 +358,12 @@ private[testkit] object StreamTestKit { import TestPublisher._ import TestSubscriber._ - sealed trait PublisherEvent extends DeadLetterSuppression + sealed trait PublisherEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class Subscribe(subscription: Subscription) extends PublisherEvent final case class CancelSubscription(subscription: Subscription) extends PublisherEvent final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent - sealed trait SubscriberEvent extends DeadLetterSuppression + sealed trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent final case class OnNext[I](element: I) extends SubscriberEvent final case object OnComplete extends SubscriberEvent diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index d649c0a3b5..2bb372e29b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -5,6 +5,7 @@ package akka.stream.javadsl; import akka.actor.ActorRef; import akka.japi.Pair; +import akka.pattern.Patterns; import akka.stream.*; import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.stage.*; @@ -223,7 +224,7 @@ public class FlowGraphTest extends StreamTest { b.from(Source.single(1)).to(out); b.from(b.materializedValue()).to(Sink.foreach(new Procedure>(){ public void apply(Future mat) throws Exception { - probe.ref().tell(mat, ActorRef.noSender()); + Patterns.pipe(mat, system.dispatcher()).to(probe.ref()); } })); } @@ -232,10 +233,7 @@ public class FlowGraphTest extends StreamTest { final Integer result = Await.result(future, Duration.create(300, TimeUnit.MILLISECONDS)); assertEquals(1, (int) result); - final Future future2 = probe.expectMsgClass(Future.class); - - final Integer result2 = Await.result(future2, Duration.create(300, TimeUnit.MILLISECONDS)); - assertEquals(1, (int) result2); + probe.expectMsg(1); } } diff --git a/akka-stream-tests/src/test/resources/reference.conf b/akka-stream-tests/src/test/resources/reference.conf new file mode 100644 index 0000000000..ab48718a51 --- /dev/null +++ b/akka-stream-tests/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala index 777370e095..0eca64eaf3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala @@ -3,7 +3,7 @@ */ package akka.stream.io -import akka.actor.{ Actor, ActorRef, Props } +import akka.actor._ import akka.io.Tcp.{ ResumeReading, Register, ConnectionClosed, Closed } import akka.io.{ IO, Tcp } import akka.stream.testkit._ @@ -17,9 +17,10 @@ import akka.stream.testkit.TestUtils.temporaryServerAddress import scala.concurrent.duration._ object TcpHelper { - case class ClientWrite(bytes: ByteString) - case class ClientRead(count: Int, readTo: ActorRef) - case class ClientClose(cmd: Tcp.CloseCommand) + case class ClientWrite(bytes: ByteString) extends NoSerializationVerificationNeeded + case class ClientRead(count: Int, readTo: ActorRef) extends NoSerializationVerificationNeeded + case class ClientClose(cmd: Tcp.CloseCommand) extends NoSerializationVerificationNeeded + case class ReadResult(bytes: ByteString) extends NoSerializationVerificationNeeded // FIXME: Workaround object just to force a ResumeReading that will poll for a possibly pending close event // See https://github.com/akka/akka/issues/16552 @@ -69,7 +70,7 @@ object TcpHelper { case Tcp.Received(bytes) ⇒ readBuffer ++= bytes if (readBuffer.size >= toRead) { - readTo ! readBuffer + readTo ! ReadResult(readBuffer) readBuffer = ByteString.empty toRead = 0 readTo = context.system.deadLetters @@ -136,7 +137,7 @@ trait TcpHelper { this: TestKitBase ⇒ def read(count: Int): Unit = connectionActor ! ClientRead(count, connectionProbe.ref) - def waitRead(): ByteString = connectionProbe.expectMsgType[ByteString] + def waitRead(): ByteString = connectionProbe.expectMsgType[ReadResult].bytes def confirmedClose(): Unit = connectionActor ! ClientClose(Tcp.ConfirmedClose) def close(): Unit = connectionActor ! ClientClose(Tcp.Close) def abort(): Unit = connectionActor ! ClientClose(Tcp.Abort) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index f746a95be3..d10779c9ec 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -59,7 +59,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val faultyFlow: Flow[Any, Any, _] ⇒ Flow[Any, Any, _] = in ⇒ in.andThenMat { () ⇒ val props = Props(new BrokenActorInterpreter(settings, List(fusing.Map({ x: Any ⇒ x }, stoppingDecider)), "a3")) - .withDispatcher("akka.test.stream-dispatcher") + .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local) val processor = ActorProcessorFactory[Any, Any](system.actorOf( props, "borken-stage-actor")) diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template index ca907f2b55..49c224ada8 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template @@ -4,6 +4,7 @@ package akka.stream.impl import akka.actor.Props +import akka.actor.Deploy import akka.stream._ import akka.stream.impl.Junctions.FanInModule import akka.stream.impl.StreamLayout.Module @@ -29,7 +30,7 @@ private[akka] object GenJunctions { override def carbonCopy: Module = ZipWith1Module(shape.deepCopy(), f, attributes) override def props(settings: ActorFlowMaterializerSettings): Props = - Props(new Zip1With(settings, f.asInstanceOf[Function1[[#Any#], Any]])) + Props(new Zip1With(settings, f.asInstanceOf[Function1[[#Any#], Any]])).withDeploy(Deploy.local) }# ] diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 5e3b097c28..7c299c518f 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -4,21 +4,11 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap -import akka.actor.Cancellable +import akka.actor._ import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport } import org.reactivestreams.{ Publisher, Subscriber, Subscription } -import akka.actor.AbstractActor -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.UntypedActor import concurrent.duration.Duration import concurrent.duration.FiniteDuration -import akka.actor.DeadLetterSuppression import akka.stream.impl.CancelledSubscription import akka.stream.impl.ReactiveStreamsCompliance._ @@ -35,7 +25,7 @@ object ActorPublisher { * INTERNAL API */ private[akka] object Internal { - final case class Subscribe(subscriber: Subscriber[Any]) extends DeadLetterSuppression + final case class Subscribe(subscriber: Subscriber[Any]) extends DeadLetterSuppression with NoSerializationVerificationNeeded sealed trait LifecycleState case object PreSubscriber extends LifecycleState @@ -55,20 +45,20 @@ object ActorPublisherMessage { * more elements. * @param n number of requested elements */ - @SerialVersionUID(1L) final case class Request(n: Long) extends ActorPublisherMessage + final case class Request(n: Long) extends ActorPublisherMessage with NoSerializationVerificationNeeded /** * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the * subscription. */ - @SerialVersionUID(1L) final case object Cancel extends Cancel + final case object Cancel extends Cancel with NoSerializationVerificationNeeded sealed class Cancel extends ActorPublisherMessage /** * This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout. * Once the actor receives this message, this publisher will already be in cancelled state, thus the actor should clean-up and stop itself. */ - @SerialVersionUID(1L) final case object SubscriptionTimeoutExceeded extends SubscriptionTimeoutExceeded + final case object SubscriptionTimeoutExceeded extends SubscriptionTimeoutExceeded with NoSerializationVerificationNeeded sealed abstract class SubscriptionTimeoutExceeded extends ActorPublisherMessage /** diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index f57c67c91a..6070a430a2 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -5,16 +5,7 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap import org.reactivestreams.{ Subscriber, Subscription } -import akka.actor.AbstractActor -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.UntypedActor -import akka.actor.DeadLetterSuppression +import akka.actor._ import akka.stream.impl.ReactiveStreamsCompliance object ActorSubscriber { @@ -28,17 +19,17 @@ object ActorSubscriber { /** * INTERNAL API */ - @SerialVersionUID(1L) private[akka] final case class OnSubscribe(subscription: Subscription) - extends DeadLetterSuppression + private[akka] final case class OnSubscribe(subscription: Subscription) + extends DeadLetterSuppression with NoSerializationVerificationNeeded } -sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression +sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression with NoSerializationVerificationNeeded object ActorSubscriberMessage { - @SerialVersionUID(1L) final case class OnNext(element: Any) extends ActorSubscriberMessage - @SerialVersionUID(1L) final case class OnError(cause: Throwable) extends ActorSubscriberMessage - @SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage + final case class OnNext(element: Any) extends ActorSubscriberMessage + final case class OnError(cause: Throwable) extends ActorSubscriberMessage + case object OnComplete extends ActorSubscriberMessage /** * Java API: get the singleton instance of the `OnComplete` message diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 9dce508c39..f033d14e02 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -246,7 +246,7 @@ private[akka] class FlowNameCounter extends Extension { * INTERNAL API */ private[akka] object StreamSupervisor { - def props(settings: ActorFlowMaterializerSettings): Props = Props(new StreamSupervisor(settings)) + def props(settings: ActorFlowMaterializerSettings): Props = Props(new StreamSupervisor(settings)).withDeploy(Deploy.local) final case class Materialize(props: Props, name: String) extends DeadLetterSuppression diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index b397659bd9..a387be99d1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -5,14 +5,14 @@ package akka.stream.impl import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl.Sink -import akka.actor.Props +import akka.actor.{ Deploy, Props } /** * INTERNAL API */ private[akka] object ConcatAllImpl { def props(materializer: ActorFlowMaterializer): Props = - Props(new ConcatAllImpl(materializer)) + Props(new ConcatAllImpl(materializer)).withDeploy(Deploy.local) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index e5d749238c..7609762c7f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -3,13 +3,11 @@ */ package akka.stream.impl -import akka.actor.{ ActorRef, ActorLogging, Actor } -import akka.actor.Props +import akka.actor._ import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, InPort, Shape } import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import akka.stream.scaladsl.FlexiMerge.MergeLogic import org.reactivestreams.{ Subscription, Subscriber } -import akka.actor.DeadLetterSuppression import scala.collection.immutable @@ -18,10 +16,10 @@ import scala.collection.immutable */ private[akka] object FanIn { - final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression - final case class OnComplete(id: Int) extends DeadLetterSuppression - final case class OnNext(id: Int, e: Any) extends DeadLetterSuppression - final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression + final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class OnComplete(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class OnNext(id: Int, e: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { override def onError(cause: Throwable): Unit = { @@ -264,7 +262,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings, */ private[akka] object FairMerge { def props(settings: ActorFlowMaterializerSettings, inputPorts: Int): Props = - Props(new FairMerge(settings, inputPorts)) + Props(new FairMerge(settings, inputPorts)).withDeploy(Deploy.local) } /** @@ -287,7 +285,7 @@ private[akka] object UnfairMerge { val DefaultPreferred = 0 def props(settings: ActorFlowMaterializerSettings, inputPorts: Int): Props = - Props(new UnfairMerge(settings, inputPorts, DefaultPreferred)) + Props(new UnfairMerge(settings, inputPorts, DefaultPreferred)).withDeploy(Deploy.local) } /** @@ -309,14 +307,14 @@ private[akka] final class UnfairMerge(_settings: ActorFlowMaterializerSettings, */ private[akka] object FlexiMerge { def props[T, S <: Shape](settings: ActorFlowMaterializerSettings, ports: S, mergeLogic: MergeLogic[T]): Props = - Props(new FlexiMergeImpl(settings, ports, mergeLogic)) + Props(new FlexiMergeImpl(settings, ports, mergeLogic)).withDeploy(Deploy.local) } /** * INTERNAL API */ private[akka] object Concat { - def props(settings: ActorFlowMaterializerSettings): Props = Props(new Concat(settings)) + def props(settings: ActorFlowMaterializerSettings): Props = Props(new Concat(settings)).withDeploy(Deploy.local) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index b727791b9a..e7a73bd766 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -7,21 +7,17 @@ import akka.stream.scaladsl.FlexiRoute.RouteLogic import akka.stream.{ AbruptTerminationException, Shape, ActorFlowMaterializerSettings } import scala.collection.immutable -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor._ import org.reactivestreams.Subscription -import akka.actor.DeadLetterSuppression /** * INTERNAL API */ private[akka] object FanOut { - final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression - final case class SubstreamCancel(id: Int) extends DeadLetterSuppression - final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression + final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class SubstreamCancel(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription { override def request(elements: Long): Unit = parent ! SubstreamRequestMore(id, elements) @@ -33,7 +29,7 @@ private[akka] object FanOut { override def createSubscription(): Subscription = new SubstreamSubscription(actor, id) } - final case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) extends DeadLetterSuppression + final case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) extends DeadLetterSuppression with NoSerializationVerificationNeeded class OutputBunch(outputCount: Int, impl: ActorRef, pump: Pump) { private var bunchCancelled = false @@ -294,7 +290,7 @@ private[akka] abstract class FanOut(val settings: ActorFlowMaterializerSettings, */ private[akka] object Broadcast { def props(settings: ActorFlowMaterializerSettings, outputPorts: Int): Props = - Props(new Broadcast(settings, outputPorts)) + Props(new Broadcast(settings, outputPorts)).withDeploy(Deploy.local) } /** @@ -314,7 +310,7 @@ private[akka] class Broadcast(_settings: ActorFlowMaterializerSettings, _outputP */ private[akka] object Balance { def props(settings: ActorFlowMaterializerSettings, outputPorts: Int, waitForAllDownstreams: Boolean): Props = - Props(new Balance(settings, outputPorts, waitForAllDownstreams)) + Props(new Balance(settings, outputPorts, waitForAllDownstreams)).withDeploy(Deploy.local) } /** @@ -341,7 +337,7 @@ private[akka] class Balance(_settings: ActorFlowMaterializerSettings, _outputPor */ private[akka] object Unzip { def props(settings: ActorFlowMaterializerSettings): Props = - Props(new Unzip(settings)) + Props(new Unzip(settings)).withDeploy(Deploy.local) } /** @@ -373,5 +369,5 @@ private[akka] class Unzip(_settings: ActorFlowMaterializerSettings) extends FanO */ private[akka] object FlexiRoute { def props[T, S <: Shape](settings: ActorFlowMaterializerSettings, ports: S, routeLogic: RouteLogic[T]): Props = - Props(new FlexiRouteImpl(settings, ports, routeLogic)) + Props(new FlexiRouteImpl(settings, ports, routeLogic)).withDeploy(Deploy.local) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 7b6141e25f..827415113b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -7,16 +7,11 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.Status -import akka.actor.SupervisorStrategy +import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import akka.pattern.pipe import org.reactivestreams.Subscriber import org.reactivestreams.Subscription -import akka.actor.DeadLetterSuppression import scala.util.control.NonFatal /** @@ -24,11 +19,11 @@ import scala.util.control.NonFatal */ private[akka] object FuturePublisher { def props(future: Future[Any], settings: ActorFlowMaterializerSettings): Props = - Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher) + Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) object FutureSubscription { - final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression - final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression + final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded } class FutureSubscription(ref: ActorRef) extends Subscription { diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 4a910f1637..db441f6489 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -4,7 +4,7 @@ package akka.stream.impl import scala.util.control.NonFatal -import akka.actor.Props +import akka.actor.{ Deploy, Props } import akka.stream.ActorFlowMaterializerSettings import akka.stream.Supervision import akka.stream.scaladsl.Source @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Source */ private[akka] object GroupByProcessorImpl { def props(settings: ActorFlowMaterializerSettings, keyFor: Any ⇒ Any): Props = - Props(new GroupByProcessorImpl(settings, keyFor)) + Props(new GroupByProcessorImpl(settings, keyFor)).withDeploy(Deploy.local) private case object Drop } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 39734637cc..02b09ddd20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -5,28 +5,28 @@ package akka.stream.impl import language.existentials import org.reactivestreams.Subscription -import akka.actor.DeadLetterSuppression +import akka.actor.{ NoSerializationVerificationNeeded, DeadLetterSuppression } /** * INTERNAL API */ -private[akka] case object SubscribePending extends DeadLetterSuppression +private[akka] case object SubscribePending extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ private[akka] final case class Cancel(subscription: ActorSubscription[_]) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any]) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded diff --git a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala index 37a40c799c..29f4412b47 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala @@ -6,14 +6,14 @@ package akka.stream.impl import scala.collection.immutable import akka.stream.ActorFlowMaterializerSettings import akka.stream.scaladsl.Source -import akka.actor.Props +import akka.actor.{ Deploy, Props } /** * INTERNAL API */ private[akka] object PrefixAndTailImpl { def props(settings: ActorFlowMaterializerSettings, takeMax: Int): Props = - Props(new PrefixAndTailImpl(settings, takeMax)) + Props(new PrefixAndTailImpl(settings, takeMax)).withDeploy(Deploy.local) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index c309e7a5ca..e9687d8f49 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -5,7 +5,7 @@ package akka.stream.impl import java.io.File import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ ActorRef, Props } +import akka.actor.{ Deploy, ActorRef, Props } import akka.stream.ActorOperationAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module import akka.stream.OperationAttributes @@ -83,7 +83,7 @@ private[akka] final class FanoutPublisherSink[In]( val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) val fanoutActor = actorMaterializer.actorOf(context, Props(new FanoutProcessorImpl(actorMaterializer.effectiveSettings(context.effectiveAttributes), - initialBufferSize, maximumBufferSize))) + initialBufferSize, maximumBufferSize)).withDeploy(Deploy.local)) val fanoutProcessor = ActorProcessorFactory[In, In](fanoutActor) (fanoutProcessor, fanoutProcessor) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 6ff69ff9ae..1ee755fd25 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -4,24 +4,21 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference -import akka.actor.ActorLogging -import akka.actor.Cancellable -import akka.actor.{ Actor, ActorRef } +import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.collection.mutable import scala.concurrent.duration.FiniteDuration -import akka.actor.DeadLetterSuppression /** * INTERNAL API */ private[akka] object MultiStreamOutputProcessor { final case class SubstreamKey(id: Long) - final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression - final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression - final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression - final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression + final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements) @@ -316,10 +313,10 @@ private[akka] object MultiStreamInputProcessor { } } - case class SubstreamOnComplete(key: SubstreamKey) extends DeadLetterSuppression - case class SubstreamOnNext(key: SubstreamKey, element: Any) extends DeadLetterSuppression - case class SubstreamOnError(key: SubstreamKey, e: Throwable) extends DeadLetterSuppression - case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) extends DeadLetterSuppression + case class SubstreamOnComplete(key: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded + case class SubstreamOnNext(key: SubstreamKey, element: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded + case class SubstreamOnError(key: SubstreamKey, e: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded + case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) { // Not driven directly diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 54eb4365ed..d94499db2c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -4,13 +4,12 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean -import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } +import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import org.reactivestreams.{ Subscriber, Subscription } import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -import akka.actor.DeadLetterSuppression import akka.event.Logging /** @@ -19,7 +18,9 @@ import akka.event.Logging private[akka] object TickPublisher { def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, settings: ActorFlowMaterializerSettings, cancelled: AtomicBoolean): Props = - Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher) + Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)) + .withDispatcher(settings.dispatcher) + .withDeploy(Deploy.local) object TickPublisherSubscription { case object Cancel extends DeadLetterSuppression diff --git a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala index a9a4899d1f..95c19e51bb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala @@ -7,11 +7,11 @@ import java.util.LinkedList import akka.stream.ActorFlowMaterializerSettings import akka.stream.TimerTransformer import scala.util.control.NonFatal -import akka.actor.Props +import akka.actor.{ Deploy, Props } private[akka] object TimerTransformerProcessorsImpl { def props(settings: ActorFlowMaterializerSettings, transformer: TimerTransformer[Any, Any]): Props = - Props(new TimerTransformerProcessorsImpl(settings, transformer)) + Props(new TimerTransformerProcessorsImpl(settings, transformer)).withDeploy(Deploy.local) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 83563e8702..8e906db61e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -154,7 +154,7 @@ private[akka] object ActorOutputBoundary { /** * INTERNAL API. */ - private case object ContinuePulling extends DeadLetterSuppression + private case object ContinuePulling extends DeadLetterSuppression with NoSerializationVerificationNeeded } /** @@ -308,9 +308,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, */ private[akka] object ActorInterpreter { def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer, attributes: OperationAttributes = OperationAttributes.none): Props = - Props(new ActorInterpreter(settings, ops, materializer, attributes)) + Props(new ActorInterpreter(settings, ops, materializer, attributes)).withDeploy(Deploy.local) - case class AsyncInput(op: AsyncStage[Any, Any, Any], ctx: AsyncContext[Any, Any], event: Any) extends DeadLetterSuppression + case class AsyncInput(op: AsyncStage[Any, Any, Any], ctx: AsyncContext[Any, Any], event: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded } /** diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala similarity index 95% rename from akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala index 0681e8ebf7..28bce437d4 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala @@ -1,4 +1,4 @@ -package akka.stream.io.impl +package akka.stream.impl.io import akka.stream.ActorOperationAttributes.Dispatcher import akka.stream.{ ActorFlowMaterializer, MaterializationContext } diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala similarity index 96% rename from akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index aac9015381..1b1b0c8c54 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -1,13 +1,12 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.{ File, OutputStream } import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module -import akka.stream.io.impl.IOSettings._ import akka.stream.{ ActorFlowMaterializer, MaterializationContext, OperationAttributes, SinkShape } import akka.util.ByteString @@ -27,7 +26,7 @@ private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attr val bytesWrittenPromise = Promise[Long]() val props = SynchronousFileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) - val dispatcher = fileIoDispatcher(context) + val dispatcher = IOSettings.fileIoDispatcher(context) val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala similarity index 99% rename from akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 4678c0de61..2ef1ef5ead 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.{ File, InputStream } diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala similarity index 95% rename from akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 33808e9748..f7ec56d08a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -1,11 +1,11 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.InputStream -import akka.actor.{ ActorLogging, DeadLetterSuppression, Props } +import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.io.DirectByteBufferPool import akka.stream.actor.ActorPublisherMessage import akka.util.ByteString @@ -22,7 +22,7 @@ private[akka] object InputStreamPublisher { require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") - Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize, initialBuffer, maxBuffer) + Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize, initialBuffer, maxBuffer).withDeploy(Deploy.local) } private final case object Continue extends DeadLetterSuppression diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala similarity index 93% rename from akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index 75a43faac9..b3507d7430 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -1,11 +1,11 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.OutputStream -import akka.actor.{ ActorLogging, Props } +import akka.actor.{ Deploy, ActorLogging, Props } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString @@ -15,7 +15,7 @@ import scala.concurrent.Promise private[akka] object OutputStreamSubscriber { def props(os: OutputStream, completionPromise: Promise[Long], bufSize: Int) = { require(bufSize > 0, "buffer size must be > 0") - Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize) + Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize).withDeploy(Deploy.local) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala index 2d7043cc6b..dd18289bc7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala @@ -10,7 +10,7 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl._ -import akka.actor.{ Props, Actor, ActorLogging, ActorRef } +import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch @@ -35,7 +35,7 @@ private[akka] object SslTlsCipherActor { tracing: Boolean, role: Role, closing: Closing): Props = - Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing)) + Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala index 8778a3c763..12b1084882 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -10,7 +10,7 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import akka.actor.Actor +import akka.actor.{ NoSerializationVerificationNeeded, Actor, DeadLetterSuppression } import akka.io.Inet.SocketOption import akka.io.Tcp import akka.stream.ActorFlowMaterializerSettings @@ -20,7 +20,6 @@ import akka.stream.scaladsl.{ Tcp ⇒ StreamTcp } import akka.util.ByteString import org.reactivestreams.Processor import org.reactivestreams.Subscriber -import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -37,7 +36,7 @@ private[akka] object StreamTcpManager { options: immutable.Traversable[SocketOption], connectTimeout: Duration, idleTimeout: Duration) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API @@ -50,13 +49,13 @@ private[akka] object StreamTcpManager { backlog: Int, options: immutable.Traversable[SocketOption], idleTimeout: Duration) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ private[akka] final case class ExposedProcessor(processor: Processor[ByteString, ByteString]) - extends DeadLetterSuppression + extends DeadLetterSuppression with NoSerializationVerificationNeeded } diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala similarity index 96% rename from akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala index 967cd045cf..67c0b30eaa 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala @@ -1,13 +1,13 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.{ File, RandomAccessFile } import java.nio.ByteBuffer import java.nio.channels.FileChannel -import akka.actor.{ ActorLogging, DeadLetterSuppression, Props } +import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.stream.actor.ActorPublisherMessage import akka.util.ByteString @@ -22,6 +22,7 @@ private[akka] object SynchronousFilePublisher { require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") Props(classOf[SynchronousFilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer) + .withDeploy(Deploy.local) } private final case object Continue extends DeadLetterSuppression diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala similarity index 94% rename from akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala index e57356f036..94f1d8c0cd 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala @@ -1,12 +1,12 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.stream.io.impl +package akka.stream.impl.io import java.io.{ File, RandomAccessFile } import java.nio.channels.FileChannel -import akka.actor.{ ActorLogging, Props } +import akka.actor.{ Deploy, ActorLogging, Props } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString @@ -16,7 +16,7 @@ import scala.concurrent.Promise private[akka] object SynchronousFileSubscriber { def props(f: File, completionPromise: Promise[Long], bufSize: Int, append: Boolean) = { require(bufSize > 0, "buffer size must be > 0") - Props(classOf[SynchronousFileSubscriber], f, completionPromise, bufSize, append) + Props(classOf[SynchronousFileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index c9d90857d2..cb1ef5d232 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -27,10 +27,10 @@ private[akka] object TcpStreamActor { connectCmd: Connect, materializerSettings: ActorFlowMaterializerSettings): Props = Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, connectCmd, - materializerSettings)).withDispatcher(materializerSettings.dispatcher) + materializerSettings)).withDispatcher(materializerSettings.dispatcher).withDeploy(Deploy.local) def inboundProps(connection: ActorRef, settings: ActorFlowMaterializerSettings): Props = - Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher) + Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 6ad135f912..1e482a7551 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -5,9 +5,7 @@ package akka.stream.impl.io import java.net.InetSocketAddress import scala.concurrent.{ Future, Promise } -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor._ import akka.io.{ IO, Tcp } import akka.io.Tcp._ import akka.stream.{ FlowMaterializer, ActorFlowMaterializerSettings } @@ -18,7 +16,6 @@ import akka.util.ByteString import org.reactivestreams.Subscriber import akka.stream.ConnectionException import akka.stream.BindFailedException -import akka.actor.ActorLogging /** * INTERNAL API @@ -29,6 +26,7 @@ private[akka] object TcpListenStreamActor { flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, materializerSettings: ActorFlowMaterializerSettings): Props = { Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, bindCmd, materializerSettings)) + .withDeploy(Deploy.local) } } diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala index 16bb322843..81ce9bcf47 100644 --- a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala @@ -6,7 +6,7 @@ package akka.stream.io import java.io.InputStream import akka.japi.function.Creator -import akka.stream.io.impl.InputStreamSource +import akka.stream.impl.io.InputStreamSource import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source._ import akka.stream.{ OperationAttributes, javadsl } diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala index 91fa91b3ed..3df27882fd 100644 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala @@ -6,7 +6,7 @@ package akka.stream.io import java.io.OutputStream import akka.japi.function.Creator -import akka.stream.io.impl.OutputStreamSink +import akka.stream.impl.io.OutputStreamSink import akka.stream.scaladsl.Sink import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl } import akka.util.ByteString diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala index 10da78cdce..23e5cf4540 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala @@ -5,8 +5,8 @@ package akka.stream.io import java.io.File +import akka.stream.impl.io.SynchronousFileSink import akka.stream.{ OperationAttributes, javadsl, ActorOperationAttributes } -import akka.stream.io.impl.SynchronousFileSink import akka.stream.scaladsl.Sink import akka.util.ByteString @@ -28,7 +28,7 @@ object SynchronousFileSink { * unless configured otherwise by using [[ActorOperationAttributes]]. */ def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = - new Sink(new impl.SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) + new Sink(new SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) /** * Java API diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index 3764c558ed..6fb71cf21d 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -4,8 +4,7 @@ package akka.stream.io import java.io.File - -import akka.stream.io.impl.SynchronousFileSource +import akka.stream.impl.io.SynchronousFileSource import akka.stream.scaladsl.Source import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl } import akka.util.ByteString diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index a7bae8a5d5..835e60bbf8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -10,13 +10,7 @@ import scala.concurrent.{ Promise, ExecutionContext, Future } import scala.concurrent.duration.Duration import scala.util.{ Failure, Success } import scala.util.control.NoStackTrace -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.Props +import akka.actor._ import akka.io.Inet.SocketOption import akka.io.{ Tcp ⇒ IoTcp } import akka.stream._ @@ -25,7 +19,6 @@ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.scaladsl._ import akka.util.ByteString import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription } -import akka.actor.actorRef2Scala import akka.stream.impl.io.TcpStreamActor import akka.stream.impl.io.TcpListenStreamActor import akka.stream.impl.io.DelayedInitProcessor @@ -77,7 +70,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import Tcp._ private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager] - .withDispatcher(IoTcp(system).Settings.ManagementDispatcher), name = "IO-TCP-STREAM") + .withDispatcher(IoTcp(system).Settings.ManagementDispatcher).withDeploy(Deploy.local), name = "IO-TCP-STREAM") private class BindSource( val endpoint: InetSocketAddress,