diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index c30aceb9b8..84b3d447bd 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -131,9 +131,9 @@ class FlowGraphDocSpec extends AkkaSpec { // A Shape must be able to create a copy of itself. Basically // it means a new instance with copies of the ports override def deepCopy() = PriorityWorkerPoolShape( - new Inlet[In](jobsIn.toString), - new Inlet[In](priorityJobsIn.toString), - new Outlet[Out](resultsOut.toString)) + jobsIn.carbonCopy(), + priorityJobsIn.carbonCopy(), + resultsOut.carbonCopy()) // A Shape must also be able to create itself from existing ports override def copyFromPorts( diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala index 8b54e63b21..3d950a66ee 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala @@ -28,9 +28,9 @@ private object PoolConductor { override def deepCopy(): Shape = Ports( - new Inlet(requestIn.toString), - new Inlet(slotEventIn.toString), - slotOuts.map(o ⇒ new Outlet(o.toString))) + requestIn.carbonCopy(), + slotEventIn.carbonCopy(), + slotOuts.map(_.carbonCopy())) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = Ports( diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 59b1e89b58..97887d5a7f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -32,6 +32,6 @@ private[http] object MessageToFrameRenderer { case bm: BinaryMessage ⇒ streamedFrames(Opcode.Binary, bm.dataStream) case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.transform(() ⇒ new Utf8Encoder)) - }.flatten(FlattenStrategy.Concat()) + }.flatten(FlattenStrategy.concat) } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index b9032eb45c..3007330cd0 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -218,9 +218,9 @@ private[http] object StreamUtils { } def oneTimePublisherSink[In](cell: OneTimeWriteCell[Publisher[In]], name: String): Sink[In, Publisher[In]] = - new Sink[In, Publisher[In]](new OneTimePublisherSink(none, SinkShape(new Inlet(name)), cell)) + new Sink[In, Publisher[In]](new OneTimePublisherSink(none, SinkShape(Inlet(name)), cell)) def oneTimeSubscriberSource[Out](cell: OneTimeWriteCell[Subscriber[Out]], name: String): Source[Out, Subscriber[Out]] = - new Source[Out, Subscriber[Out]](new OneTimeSubscriberSource(none, SourceShape(new Outlet(name)), cell)) + new Source[Out, Subscriber[Out]](new OneTimeSubscriberSource(none, SourceShape(Outlet(name)), cell)) /** A copy of PublisherSink that allows access to the publisher through the cell but can only materialized once */ private class OneTimePublisherSink[In](attributes: Attributes, shape: SinkShape[In], cell: OneTimeWriteCell[Publisher[In]]) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala index ad5d7008d2..4f93e20247 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala @@ -17,6 +17,6 @@ object TestSink { /** * A Sink that materialized to a [[TestSubscriber.Probe]]. */ - def probe[T]()(implicit system: ActorSystem) = new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(new Inlet("ProbeSink.in")))) + def probe[T]()(implicit system: ActorSystem) = new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala index b0de7d2ffa..2fa3b42703 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala @@ -19,6 +19,6 @@ object TestSource { /** * A Source that materializes to a [[TestPublisher.Probe]]. */ - def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(new Outlet("ProbeSource.out")))) + def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index d6b7ebf461..7296b5cfa1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -12,7 +12,7 @@ class StreamLayoutSpec extends AkkaSpec { import StreamLayout._ def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module { - override val shape = AmorphousShape(List.fill(inPortCount)(new Inlet("")), List.fill(outPortCount)(new Outlet(""))) + override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet(""))) override def replaceShape(s: Shape): Module = ??? override def subModules: Set[Module] = Set.empty diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 1cdf49f0b9..dea4520e96 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -21,8 +21,8 @@ object GraphOpsIntegrationSpec { override def outlets: immutable.Seq[Outlet[_]] = List(out1, out2) override def deepCopy() = ShufflePorts( - new Inlet[In](in1.toString), new Inlet[In](in2.toString), - new Outlet[Out](out1.toString), new Outlet[Out](out2.toString)) + in1.carbonCopy(), in2.carbonCopy(), + out1.carbonCopy(), out2.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): ShufflePorts[In, Out] = { assert(inlets.size == this.inlets.size) assert(outlets.size == this.outlets.size) diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template index 21b8cf70c4..9e73c459a6 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template @@ -7,18 +7,24 @@ import scala.collection.immutable import scala.annotation.varargs object FanInShape { - sealed trait Init[+O] - case class Name(name: String) extends Init[Nothing] - case class Ports[O](out: Outlet[O], ins: immutable.Seq[Inlet[_]]) extends Init[O] + sealed trait Init[+O] { + def outlet: Outlet[O] + def inlets: immutable.Seq[Inlet[_]] + def name: String + } + final case class Name(override val name: String) extends Init[Nothing] { + override def outlet: Outlet[Nothing] = Outlet(s"$name.out") + override def inlets: immutable.Seq[Inlet[_]] = Nil + } + final case class Ports[O](override val outlet: Outlet[O], override val inlets: immutable.Seq[Inlet[_]]) extends Init[O] { + override def name: String = "FanIn" + } } -abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape { +abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inlet[_]], _name: String) extends Shape { import FanInShape._ - final private[this] val (_out, _registered, _name) = init match { - case Name(name) => (new Outlet[O](s"$name.out"), Nil.iterator, name) - case Ports(o, it) => (o, it.iterator, "FanIn") - } + def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name) final def out: Outlet[O] = _out final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil @@ -26,14 +32,14 @@ abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape { private var _inlets: Vector[Inlet[_]] = Vector.empty protected def newInlet[T](name: String): Inlet[T] = { - val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else new Inlet[T](s"${_name}.$name") + val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name") _inlets :+= p p } protected def construct(init: Init[O]): FanInShape[O] - def deepCopy(): FanInShape[O] = construct(Ports[O](new Outlet(_out.toString), inlets.map(i => new Inlet(i.toString)))) + def deepCopy(): FanInShape[O] = construct(Ports[O](_out.carbonCopy(), inlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = { require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanInShape") require(inlets.size == _inlets.size, s"proposed inlets [${inlets.mkString(", ")}] do not fit FanInShape") diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template index 24be389f62..0bd0cee686 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template @@ -6,18 +6,24 @@ package akka.stream import scala.collection.immutable object FanOutShape { - sealed trait Init[I] - case class Name[I](name: String) extends Init[I] - case class Ports[I](in: Inlet[I], outs: immutable.Seq[Outlet[_]]) extends Init[I] + sealed trait Init[I] { + def inlet: Inlet[I] + def outlets: immutable.Seq[Outlet[_]] + def name: String + } + final case class Name[I](override val name: String) extends Init[I] { + override def inlet: Inlet[I] = Inlet(s"$name.in") + override def outlets: immutable.Seq[Outlet[_]] = Nil + } + final case class Ports[I](override val inlet: Inlet[I], override val outlets: immutable.Seq[Outlet[_]]) extends Init[I] { + override def name: String = "FanOut" + } } -abstract class FanOutShape[I](init: FanOutShape.Init[I]) extends Shape { +abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outlet[_]], _name: String) extends Shape { import FanOutShape._ - - final private[this] val (_in, _registered, _name) = init match { - case Name(name) => (new Inlet[I](s"$name.in"), Nil.iterator, name) - case Ports(o, it) => (o, it.iterator, "FanOut") - } + + def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name) final def in: Inlet[I] = _in final override def outlets: immutable.Seq[Outlet[_]] = _outlets @@ -25,14 +31,14 @@ abstract class FanOutShape[I](init: FanOutShape.Init[I]) extends Shape { private var _outlets: Vector[Outlet[_]] = Vector.empty protected def newOutlet[T](name: String): Outlet[T] = { - val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else new Outlet[T](s"${_name}.$name") + val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name") _outlets :+= p p } protected def construct(init: Init[I]): FanOutShape[I] - def deepCopy(): FanOutShape[I] = construct(Ports[I](new Inlet(_in.toString), outlets.map(i => new Outlet(i.toString)))) + def deepCopy(): FanOutShape[I] = construct(Ports[I](_in.carbonCopy(), outlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanOutShape[I] = { require(outlets.size == _outlets.size, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanOutShape") require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FanOutShape") diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 38f280ed82..cc123c0714 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -26,13 +26,26 @@ sealed abstract class OutPort * is the InPort (which does not bear an element type because Modules only * express the internal structural hierarchy of stream topologies). */ -final class Inlet[-T](override val toString: String) extends InPort +object Inlet { + def apply[T](toString: String): Inlet[T] = new Inlet[T](toString) +} + +final class Inlet[-T] private (override val toString: String) extends InPort { + def carbonCopy(): Inlet[T] = Inlet(toString) +} + /** * An Outlet is a typed output to a Shape. Its partner in the Module view * is the OutPort (which does not bear an element type because Modules only * express the internal structural hierarchy of stream topologies). */ -final class Outlet[+T](override val toString: String) extends OutPort +object Outlet { + def apply[T](toString: String): Outlet[T] = new Outlet[T](toString) +} + +final class Outlet[+T] private (override val toString: String) extends OutPort { + def carbonCopy(): Outlet[T] = Outlet(toString) +} /** * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the @@ -148,9 +161,7 @@ object ClosedShape extends ClosedShape { * meaningful type of Shape when the building is finished. */ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) extends Shape { - override def deepCopy() = AmorphousShape( - inlets.map(i ⇒ new Inlet[Any](i.toString)), - outlets.map(o ⇒ new Outlet[Any](o.toString))) + override def deepCopy() = AmorphousShape(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = AmorphousShape(inlets, outlets) } @@ -162,7 +173,7 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = Nil override val outlets: immutable.Seq[Outlet[_]] = List(outlet) - override def deepCopy(): SourceShape[T] = SourceShape(new Outlet(outlet.toString)) + override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit SourceShape") require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit SourceShape") @@ -179,7 +190,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) - override def deepCopy(): FlowShape[I, O] = FlowShape(new Inlet(inlet.toString), new Outlet(outlet.toString)) + override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FlowShape") require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FlowShape") @@ -194,7 +205,7 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = Nil - override def deepCopy(): SinkShape[T] = SinkShape(new Inlet(inlet.toString)) + override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit SinkShape") require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit SinkShape") @@ -229,7 +240,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.inlet, top.outlet, bottom.inlet, bottom.outlet) override def deepCopy(): BidiShape[In1, Out1, In2, Out2] = - BidiShape(new Inlet(in1.toString), new Outlet(out1.toString), new Inlet(in2.toString), new Outlet(out2.toString)) + BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 2, s"proposed inlets [${inlets.mkString(", ")}] do not fit BidiShape") require(outlets.size == 2, s"proposed outlets [${outlets.mkString(", ")}] do not fit BidiShape") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 7af228309b..a44840356e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -128,9 +128,8 @@ private[akka] case class ActorMaterializerImpl( case Identity(attr) ⇒ (new VirtualProcessor, ()) case _ ⇒ val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes) - val processor = ActorProcessorFactory[Any, Any]( - actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) - processor -> mat + ActorProcessorFactory[Any, Any]( + actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat } private def materializeJunction(op: JunctionModule, diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 22555807aa..838046b491 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -92,9 +92,7 @@ private[akka] final class CancellingSubscriber[T] extends Subscriber[T] { private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = - try { - ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "Publisher") - } catch { + try rejectAdditionalSubscriber(subscriber, "Publisher") catch { case _: SpecViolation ⇒ // nothing we can do } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index b55de3bfb8..38b56f32a0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -37,6 +37,13 @@ private[akka] object FanIn { } } + type State = Byte + final val Marked = 1 + final val Pending = 2 + final val Depleted = 4 + final val Completed = 8 + final val Cancelled = 16 + abstract class InputBunch(inputCount: Int, bufferSize: Int, pump: Pump) { private var allCancelled = false @@ -46,22 +53,39 @@ private[akka] object FanIn { } } - private val marked = Array.ofDim[Boolean](inputCount) + private[this] final val states = Array.ofDim[State](inputCount) private var markCount = 0 - private val pending = Array.ofDim[Boolean](inputCount) private var markedPending = 0 - private val depleted = Array.ofDim[Boolean](inputCount) - private val completed = Array.ofDim[Boolean](inputCount) private var markedDepleted = 0 - private val cancelled = Array.ofDim[Boolean](inputCount) + + private[this] final def hasState(index: Int, flag: Int): Boolean = + (states(index) & flag) != 0 + private[this] final def setState(index: Int, flag: Int, on: Boolean): Unit = + states(index) = if (on) (states(index) | flag).toByte else (states(index) & ~flag).toByte + + private[this] final def cancelled(index: Int): Boolean = hasState(index, Cancelled) + private[this] final def cancelled(index: Int, on: Boolean): Unit = setState(index, Cancelled, on) + + private[this] final def completed(index: Int): Boolean = hasState(index, Completed) + private[this] final def completed(index: Int, on: Boolean): Unit = setState(index, Completed, on) + + private[this] final def depleted(index: Int): Boolean = hasState(index, Depleted) + private[this] final def depleted(index: Int, on: Boolean): Unit = setState(index, Depleted, on) + + private[this] final def pending(index: Int): Boolean = hasState(index, Pending) + private[this] final def pending(index: Int, on: Boolean): Unit = setState(index, Pending, on) + + private[this] final def marked(index: Int): Boolean = hasState(index, Marked) + private[this] final def marked(index: Int, on: Boolean): Unit = setState(index, Marked, on) override def toString: String = s"""|InputBunch - | marked: ${marked.mkString(", ")} - | pending: ${pending.mkString(", ")} - | depleted: ${depleted.mkString(", ")} - | completed: ${completed.mkString(", ")} - | cancelled: ${cancelled.mkString(", ")} + | marked: ${states.iterator.map(marked(_)).mkString(", ")} + | pending: ${states.iterator.map(pending(_)).mkString(", ")} + | depleted: ${states.iterator.map(depleted(_)).mkString(", ")} + | completed: ${states.iterator.map(completed(_)).mkString(", ")} + | cancelled: ${states.iterator.map(cancelled(_)).mkString(", ")} + | | mark=$markCount pend=$markedPending depl=$markedDepleted pref=$preferredId""".stripMargin private var preferredId = 0 @@ -81,7 +105,7 @@ private[akka] object FanIn { def cancel(input: Int) = if (!cancelled(input)) { inputs(input).cancel() - cancelled(input) = true + cancelled(input, true) unmarkInput(input) } @@ -93,7 +117,7 @@ private[akka] object FanIn { if (!marked(input)) { if (depleted(input)) markedDepleted += 1 if (pending(input)) markedPending += 1 - marked(input) = true + marked(input, true) markCount += 1 } } @@ -102,7 +126,7 @@ private[akka] object FanIn { if (marked(input)) { if (depleted(input)) markedDepleted -= 1 if (pending(input)) markedPending -= 1 - marked(input) = false + marked(input, false) markCount -= 1 } } @@ -147,11 +171,11 @@ private[akka] object FanIn { val elem = input.dequeueInputElement() if (!input.inputsAvailable) { if (marked(id)) markedPending -= 1 - pending(id) = false + pending(id, false) } if (input.inputsDepleted) { if (marked(id)) markedDepleted += 1 - depleted(id) = true + depleted(id, true) onDepleted(id) } elem @@ -198,15 +222,15 @@ private[akka] object FanIn { inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription)) case OnNext(id, elem) ⇒ if (marked(id) && !pending(id)) markedPending += 1 - pending(id) = true + pending(id, true) inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem)) case OnComplete(id) ⇒ if (!pending(id)) { if (marked(id) && !depleted(id)) markedDepleted += 1 - depleted(id) = true + depleted(id, true) onDepleted(id) } - completed(id) = true + completed(id, true) inputs(id).subreceive(ActorSubscriberMessage.OnComplete) case OnError(id, e) ⇒ onError(id, e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Flows.scala b/akka-stream/src/main/scala/akka/stream/impl/Flows.scala index 8ac6c2a129..673c6dd674 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Flows.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Flows.scala @@ -14,8 +14,8 @@ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module { if (s == shape) this else throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") - val inPort = new Inlet[In]("Flow.in") - val outPort = new Outlet[Out]("Flow.out") + val inPort = Inlet[In]("Flow.in") + val outPort = Outlet[Out]("Flow.out") override val shape = new FlowShape(inPort, outPort) override def subModules: Set[Module] = Set.empty diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index fb280a67cd..93bba67484 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -5,13 +5,9 @@ package akka.stream.impl import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.FlexiRoute.RouteLogic -import akka.stream.Attributes -import akka.stream.{ Inlet, Outlet, Shape, InPort, OutPort } import akka.stream.scaladsl.FlexiMerge.MergeLogic -import akka.stream.UniformFanInShape -import akka.stream.UniformFanOutShape -import akka.stream.FanOutShape2 import akka.stream.scaladsl.MergePreferred +import akka.stream.{ Attributes, Inlet, Outlet, Shape, InPort, OutPort, UniformFanInShape, UniformFanOutShape, FanOutShape2 } import akka.event.Logging.simpleName /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index d430f783d2..93000e9dc8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -30,20 +30,16 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out // This is okay since the only caller of this method is right below. protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat] - override def carbonCopy: Module = { - val out = new Outlet[Out](shape.outlet.toString) - newInstance(SourceShape(out)) - } + override def carbonCopy: Module = newInstance(SourceShape(shape.outlet.carbonCopy())) override def subModules: Set[Module] = Set.empty - def amendShape(attr: Attributes): SourceShape[Out] = { + def amendShape(attr: Attributes): SourceShape[Out] = attr.nameOption match { case None ⇒ shape case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(outlet = new Outlet(name + ".out")) + case Some(name) ⇒ shape.copy(outlet = Outlet(name + ".out")) } - } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 924ce74e4b..3649c7e312 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -32,10 +32,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte // This is okay since we the only caller of this method is right below. protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat] - override def carbonCopy: Module = { - val in = new Inlet[In](shape.inlet.toString) - newInstance(SinkShape(in)) - } + override def carbonCopy: Module = newInstance(SinkShape(shape.inlet.carbonCopy())) override def subModules: Set[Module] = Set.empty @@ -43,7 +40,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte attr.nameOption match { case None ⇒ shape case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(inlet = new Inlet(name + ".in")) + case Some(name) ⇒ shape.copy(inlet = Inlet(name + ".in")) } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index e20f1d753b..366a27302f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -300,12 +300,7 @@ private[stream] object VirtualProcessor { case object Completed extends Termination case class Failed(ex: Throwable) extends Termination - private object InertSubscriber extends Subscriber[Any] { - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onNext(elem: Any): Unit = () - override def onError(thr: Throwable): Unit = () - override def onComplete(): Unit = () - } + private val InertSubscriber = new CancellingSubscriber[Any] } private[stream] final class VirtualProcessor[T] extends Processor[T, T] { @@ -403,12 +398,12 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { * INTERNAL API */ private[stream] final case class MaterializedValueSource[M]( - shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")), + shape: SourceShape[M] = SourceShape[M](Outlet[M]("Materialized.out")), attributes: Attributes = Attributes.name("Materialized")) extends StreamLayout.Module { override def subModules: Set[Module] = Set.empty override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr) - override def carbonCopy: Module = this.copy(shape = SourceShape(new Outlet[M]("Materialized.out"))) + override def carbonCopy: Module = this.copy(shape = SourceShape(Outlet[M]("Materialized.out"))) override def replaceShape(s: Shape): Module = if (s == shape) this @@ -418,7 +413,7 @@ private[stream] final case class MaterializedValueSource[M]( attr.nameOption match { case None ⇒ shape case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(outlet = new Outlet(name + ".out")) + case Some(name) ⇒ shape.copy(outlet = Outlet(name + ".out")) } } diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index afd762d159..4121679292 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -124,10 +124,10 @@ object SslTls { private[akka] object TlsModule { def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): TlsModule = { val name = attributes.nameOrDefault(s"StreamTls($role)") - val cipherIn = new Inlet[ByteString](s"$name.cipherIn") - val cipherOut = new Outlet[ByteString](s"$name.cipherOut") - val plainIn = new Inlet[SslTlsOutbound](s"$name.transportIn") - val plainOut = new Outlet[SslTlsInbound](s"$name.transportOut") + val cipherIn = Inlet[ByteString](s"$name.cipherIn") + val cipherOut = Outlet[ByteString](s"$name.cipherOut") + val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") + val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing) } diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index c66f9b124f..ab549a5b5c 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -4,15 +4,13 @@ package akka.stream.io import java.io.File -import akka.stream.impl.io.SynchronousFileSource import akka.stream.scaladsl.Source import akka.stream.{ ActorAttributes, Attributes, javadsl } import akka.util.ByteString - import scala.concurrent.Future object SynchronousFileSource { - + import akka.stream.impl.io.SynchronousFileSource final val DefaultChunkSize = 8192 final val DefaultAttributes = Attributes.name("synchronousFileSource") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala index 4661e9484a..5bcd43588c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala @@ -17,11 +17,9 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat[T, U]() - + def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, U], T]] /** * INTERNAL API */ - private[akka] final case class Concat[T, U]() extends FlattenStrategy[Source[T, U], T] - + private[akka] final case object Concat extends FlattenStrategy[Any, Nothing] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala index 3d587c5635..8ff8a1ca67 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala @@ -15,7 +15,7 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat[T]() + def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, Any], T]] - private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, Any], T] + private[akka] final case object Concat extends FlattenStrategy[Any, Nothing] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index dac2cc548b..5acd5f817e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -245,7 +245,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** INTERNAL API */ override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = { //No need to copy here, op is a fresh instance - if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]] + if (op.isInstanceOf[Stages.Identity]) this.asInstanceOf[Repr[U, Mat]] + else if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]] else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort))) } @@ -288,7 +289,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) object Flow extends FlowApply { - private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(new Inlet(name + ".in"), new Outlet(name + ".out")) + private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(Inlet(name + ".in"), Outlet(name + ".out")) /** * Helper to create `Flow` without a [[Source]] or a [[Sink]]. @@ -311,6 +312,7 @@ object Flow extends FlowApply { */ def wrap[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] = Flow(sink, source)(f) { implicit b ⇒ (in, out) ⇒ (in.inlet, out.outlet) } + } /** @@ -922,7 +924,7 @@ trait FlowOps[+Out, +Mat] { * */ def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { - case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out, _] ⇒ andThen(ConcatAll()) + case scaladsl.FlattenStrategy.Concat | javadsl.FlattenStrategy.Concat ⇒ andThen(ConcatAll()) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 2a5e05cad7..dba25efe9c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -693,7 +693,5 @@ object FlowGraph extends GraphApply { implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] { override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet } - } - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala index 8537ae93cc..873d59e687 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala @@ -3,8 +3,6 @@ */ package akka.stream.scaladsl -import akka.stream.impl.{ PublisherSink, SubscriberSource, StreamLayout } - /** * Convenience functions for often-encountered purposes like keeping only the * left (first) or only the right (second) of two input values. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 6a64662cf3..3804ceb495 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -47,7 +47,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) object Sink extends SinkApply { /** INTERNAL API */ - private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in")) + private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in")) /** * A graph with the shape of a sink logically is a sink, this method makes diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 4fbf948f47..ee1ffbfd16 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -172,7 +172,7 @@ object Source extends SourceApply { new Source(module) /** INTERNAL API */ - private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(new Outlet(name + ".out")) + private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(Outlet(name + ".out")) /** * Helper to create [[Source]] from `Publisher`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 622288005f..fdc30ea3c4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -140,7 +140,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { halfClose: Boolean = false, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, halfClose, idleTimeout, - Attributes.none, SourceShape(new Outlet("BindSource.out")))) + Attributes.none, SourceShape(Outlet("BindSource.out")))) } /** diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 2306a11b0e..e29cc8ca0d 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -60,36 +60,40 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C * INTERNAL API */ private[stream] def enterAndPush(elem: Out): Unit = { - context.enter() - context.push(elem) - context.execute() + val c = context + c.enter() + c.push(elem) + c.execute() } /** * INTERNAL API */ private[stream] def enterAndPull(): Unit = { - context.enter() - context.pull() - context.execute() + val c = context + c.enter() + c.pull() + c.execute() } /** * INTERNAL API */ private[stream] def enterAndFinish(): Unit = { - context.enter() - context.finish() - context.execute() + val c = context + c.enter() + c.finish() + c.execute() } /** * INTERNAL API */ private[stream] def enterAndFail(e: Throwable): Unit = { - context.enter() - context.fail(e) - context.execute() + val c = context + c.enter() + c.fail(e) + c.execute() } /** @@ -186,7 +190,6 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C * if there are any state that should be cleared before restarting, e.g. by returning a new instance. */ def restart(): Stage[In, Out] = this - } /** @@ -341,8 +344,8 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { */ abstract class State extends StageState[In, Out] - private var emitting = false - private var _current: StageState[In, Out] = _ + private[this] var emitting = false + private[this] var _current: StageState[In, Out] = _ become(initial) /**