=str - Various internal Akka Stream improvements

* Gives Inlets and Outlets a `carbonCopy` method and switches to allocate them via `apply`
* Removes 4 Array allocations per FanIn and uses a bitmasked array instead
* Makes the FlattenStrategy.concat instance a singleton
This commit is contained in:
Viktor Klang 2015-06-13 16:28:38 -04:00
parent 65a425fe0b
commit 2725bfc044
30 changed files with 159 additions and 134 deletions

View file

@ -131,9 +131,9 @@ class FlowGraphDocSpec extends AkkaSpec {
// A Shape must be able to create a copy of itself. Basically
// it means a new instance with copies of the ports
override def deepCopy() = PriorityWorkerPoolShape(
new Inlet[In](jobsIn.toString),
new Inlet[In](priorityJobsIn.toString),
new Outlet[Out](resultsOut.toString))
jobsIn.carbonCopy(),
priorityJobsIn.carbonCopy(),
resultsOut.carbonCopy())
// A Shape must also be able to create itself from existing ports
override def copyFromPorts(

View file

@ -28,9 +28,9 @@ private object PoolConductor {
override def deepCopy(): Shape =
Ports(
new Inlet(requestIn.toString),
new Inlet(slotEventIn.toString),
slotOuts.map(o new Outlet(o.toString)))
requestIn.carbonCopy(),
slotEventIn.carbonCopy(),
slotOuts.map(_.carbonCopy()))
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape =
Ports(

View file

@ -32,6 +32,6 @@ private[http] object MessageToFrameRenderer {
case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder))
}.flatten(FlattenStrategy.Concat())
}.flatten(FlattenStrategy.concat)
}
}

View file

@ -218,9 +218,9 @@ private[http] object StreamUtils {
}
def oneTimePublisherSink[In](cell: OneTimeWriteCell[Publisher[In]], name: String): Sink[In, Publisher[In]] =
new Sink[In, Publisher[In]](new OneTimePublisherSink(none, SinkShape(new Inlet(name)), cell))
new Sink[In, Publisher[In]](new OneTimePublisherSink(none, SinkShape(Inlet(name)), cell))
def oneTimeSubscriberSource[Out](cell: OneTimeWriteCell[Subscriber[Out]], name: String): Source[Out, Subscriber[Out]] =
new Source[Out, Subscriber[Out]](new OneTimeSubscriberSource(none, SourceShape(new Outlet(name)), cell))
new Source[Out, Subscriber[Out]](new OneTimeSubscriberSource(none, SourceShape(Outlet(name)), cell))
/** A copy of PublisherSink that allows access to the publisher through the cell but can only materialized once */
private class OneTimePublisherSink[In](attributes: Attributes, shape: SinkShape[In], cell: OneTimeWriteCell[Publisher[In]])

View file

@ -17,6 +17,6 @@ object TestSink {
/**
* A Sink that materialized to a [[TestSubscriber.Probe]].
*/
def probe[T]()(implicit system: ActorSystem) = new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(new Inlet("ProbeSink.in"))))
def probe[T]()(implicit system: ActorSystem) = new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))
}

View file

@ -19,6 +19,6 @@ object TestSource {
/**
* A Source that materializes to a [[TestPublisher.Probe]].
*/
def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(new Outlet("ProbeSource.out"))))
def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out"))))
}

View file

@ -12,7 +12,7 @@ class StreamLayoutSpec extends AkkaSpec {
import StreamLayout._
def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module {
override val shape = AmorphousShape(List.fill(inPortCount)(new Inlet("")), List.fill(outPortCount)(new Outlet("")))
override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet("")))
override def replaceShape(s: Shape): Module = ???
override def subModules: Set[Module] = Set.empty

View file

@ -21,8 +21,8 @@ object GraphOpsIntegrationSpec {
override def outlets: immutable.Seq[Outlet[_]] = List(out1, out2)
override def deepCopy() = ShufflePorts(
new Inlet[In](in1.toString), new Inlet[In](in2.toString),
new Outlet[Out](out1.toString), new Outlet[Out](out2.toString))
in1.carbonCopy(), in2.carbonCopy(),
out1.carbonCopy(), out2.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): ShufflePorts[In, Out] = {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)

View file

@ -7,18 +7,24 @@ import scala.collection.immutable
import scala.annotation.varargs
object FanInShape {
sealed trait Init[+O]
case class Name(name: String) extends Init[Nothing]
case class Ports[O](out: Outlet[O], ins: immutable.Seq[Inlet[_]]) extends Init[O]
sealed trait Init[+O] {
def outlet: Outlet[O]
def inlets: immutable.Seq[Inlet[_]]
def name: String
}
final case class Name(override val name: String) extends Init[Nothing] {
override def outlet: Outlet[Nothing] = Outlet(s"$name.out")
override def inlets: immutable.Seq[Inlet[_]] = Nil
}
final case class Ports[O](override val outlet: Outlet[O], override val inlets: immutable.Seq[Inlet[_]]) extends Init[O] {
override def name: String = "FanIn"
}
}
abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape {
abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inlet[_]], _name: String) extends Shape {
import FanInShape._
final private[this] val (_out, _registered, _name) = init match {
case Name(name) => (new Outlet[O](s"$name.out"), Nil.iterator, name)
case Ports(o, it) => (o, it.iterator, "FanIn")
}
def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name)
final def out: Outlet[O] = _out
final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil
@ -26,14 +32,14 @@ abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape {
private var _inlets: Vector[Inlet[_]] = Vector.empty
protected def newInlet[T](name: String): Inlet[T] = {
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else new Inlet[T](s"${_name}.$name")
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name")
_inlets :+= p
p
}
protected def construct(init: Init[O]): FanInShape[O]
def deepCopy(): FanInShape[O] = construct(Ports[O](new Outlet(_out.toString), inlets.map(i => new Inlet(i.toString))))
def deepCopy(): FanInShape[O] = construct(Ports[O](_out.carbonCopy(), inlets.map(_.carbonCopy())))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = {
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanInShape")
require(inlets.size == _inlets.size, s"proposed inlets [${inlets.mkString(", ")}] do not fit FanInShape")

View file

@ -6,18 +6,24 @@ package akka.stream
import scala.collection.immutable
object FanOutShape {
sealed trait Init[I]
case class Name[I](name: String) extends Init[I]
case class Ports[I](in: Inlet[I], outs: immutable.Seq[Outlet[_]]) extends Init[I]
sealed trait Init[I] {
def inlet: Inlet[I]
def outlets: immutable.Seq[Outlet[_]]
def name: String
}
final case class Name[I](override val name: String) extends Init[I] {
override def inlet: Inlet[I] = Inlet(s"$name.in")
override def outlets: immutable.Seq[Outlet[_]] = Nil
}
final case class Ports[I](override val inlet: Inlet[I], override val outlets: immutable.Seq[Outlet[_]]) extends Init[I] {
override def name: String = "FanOut"
}
}
abstract class FanOutShape[I](init: FanOutShape.Init[I]) extends Shape {
abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outlet[_]], _name: String) extends Shape {
import FanOutShape._
final private[this] val (_in, _registered, _name) = init match {
case Name(name) => (new Inlet[I](s"$name.in"), Nil.iterator, name)
case Ports(o, it) => (o, it.iterator, "FanOut")
}
def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name)
final def in: Inlet[I] = _in
final override def outlets: immutable.Seq[Outlet[_]] = _outlets
@ -25,14 +31,14 @@ abstract class FanOutShape[I](init: FanOutShape.Init[I]) extends Shape {
private var _outlets: Vector[Outlet[_]] = Vector.empty
protected def newOutlet[T](name: String): Outlet[T] = {
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else new Outlet[T](s"${_name}.$name")
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name")
_outlets :+= p
p
}
protected def construct(init: Init[I]): FanOutShape[I]
def deepCopy(): FanOutShape[I] = construct(Ports[I](new Inlet(_in.toString), outlets.map(i => new Outlet(i.toString))))
def deepCopy(): FanOutShape[I] = construct(Ports[I](_in.carbonCopy(), outlets.map(_.carbonCopy())))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanOutShape[I] = {
require(outlets.size == _outlets.size, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanOutShape")
require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FanOutShape")

View file

@ -26,13 +26,26 @@ sealed abstract class OutPort
* is the InPort (which does not bear an element type because Modules only
* express the internal structural hierarchy of stream topologies).
*/
final class Inlet[-T](override val toString: String) extends InPort
object Inlet {
def apply[T](toString: String): Inlet[T] = new Inlet[T](toString)
}
final class Inlet[-T] private (override val toString: String) extends InPort {
def carbonCopy(): Inlet[T] = Inlet(toString)
}
/**
* An Outlet is a typed output to a Shape. Its partner in the Module view
* is the OutPort (which does not bear an element type because Modules only
* express the internal structural hierarchy of stream topologies).
*/
final class Outlet[+T](override val toString: String) extends OutPort
object Outlet {
def apply[T](toString: String): Outlet[T] = new Outlet[T](toString)
}
final class Outlet[+T] private (override val toString: String) extends OutPort {
def carbonCopy(): Outlet[T] = Outlet(toString)
}
/**
* A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
@ -148,9 +161,7 @@ object ClosedShape extends ClosedShape {
* meaningful type of Shape when the building is finished.
*/
case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) extends Shape {
override def deepCopy() = AmorphousShape(
inlets.map(i new Inlet[Any](i.toString)),
outlets.map(o new Outlet[Any](o.toString)))
override def deepCopy() = AmorphousShape(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy()))
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = AmorphousShape(inlets, outlets)
}
@ -162,7 +173,7 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = Nil
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
override def deepCopy(): SourceShape[T] = SourceShape(new Outlet(outlet.toString))
override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit SourceShape")
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit SourceShape")
@ -179,7 +190,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
override def deepCopy(): FlowShape[I, O] = FlowShape(new Inlet(inlet.toString), new Outlet(outlet.toString))
override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FlowShape")
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FlowShape")
@ -194,7 +205,7 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = Nil
override def deepCopy(): SinkShape[T] = SinkShape(new Inlet(inlet.toString))
override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit SinkShape")
require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit SinkShape")
@ -229,7 +240,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1],
def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
BidiShape(new Inlet(in1.toString), new Outlet(out1.toString), new Inlet(in2.toString), new Outlet(out2.toString))
BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.size == 2, s"proposed inlets [${inlets.mkString(", ")}] do not fit BidiShape")
require(outlets.size == 2, s"proposed outlets [${outlets.mkString(", ")}] do not fit BidiShape")

View file

@ -128,9 +128,8 @@ private[akka] case class ActorMaterializerImpl(
case Identity(attr) (new VirtualProcessor, ())
case _
val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes)
val processor = ActorProcessorFactory[Any, Any](
actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher))
processor -> mat
ActorProcessorFactory[Any, Any](
actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
}
private def materializeJunction(op: JunctionModule,

View file

@ -92,9 +92,7 @@ private[akka] final class CancellingSubscriber[T] extends Subscriber[T] {
private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try {
ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "Publisher")
} catch {
try rejectAdditionalSubscriber(subscriber, "Publisher") catch {
case _: SpecViolation // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]

View file

@ -37,6 +37,13 @@ private[akka] object FanIn {
}
}
type State = Byte
final val Marked = 1
final val Pending = 2
final val Depleted = 4
final val Completed = 8
final val Cancelled = 16
abstract class InputBunch(inputCount: Int, bufferSize: Int, pump: Pump) {
private var allCancelled = false
@ -46,22 +53,39 @@ private[akka] object FanIn {
}
}
private val marked = Array.ofDim[Boolean](inputCount)
private[this] final val states = Array.ofDim[State](inputCount)
private var markCount = 0
private val pending = Array.ofDim[Boolean](inputCount)
private var markedPending = 0
private val depleted = Array.ofDim[Boolean](inputCount)
private val completed = Array.ofDim[Boolean](inputCount)
private var markedDepleted = 0
private val cancelled = Array.ofDim[Boolean](inputCount)
private[this] final def hasState(index: Int, flag: Int): Boolean =
(states(index) & flag) != 0
private[this] final def setState(index: Int, flag: Int, on: Boolean): Unit =
states(index) = if (on) (states(index) | flag).toByte else (states(index) & ~flag).toByte
private[this] final def cancelled(index: Int): Boolean = hasState(index, Cancelled)
private[this] final def cancelled(index: Int, on: Boolean): Unit = setState(index, Cancelled, on)
private[this] final def completed(index: Int): Boolean = hasState(index, Completed)
private[this] final def completed(index: Int, on: Boolean): Unit = setState(index, Completed, on)
private[this] final def depleted(index: Int): Boolean = hasState(index, Depleted)
private[this] final def depleted(index: Int, on: Boolean): Unit = setState(index, Depleted, on)
private[this] final def pending(index: Int): Boolean = hasState(index, Pending)
private[this] final def pending(index: Int, on: Boolean): Unit = setState(index, Pending, on)
private[this] final def marked(index: Int): Boolean = hasState(index, Marked)
private[this] final def marked(index: Int, on: Boolean): Unit = setState(index, Marked, on)
override def toString: String =
s"""|InputBunch
| marked: ${marked.mkString(", ")}
| pending: ${pending.mkString(", ")}
| depleted: ${depleted.mkString(", ")}
| completed: ${completed.mkString(", ")}
| cancelled: ${cancelled.mkString(", ")}
| marked: ${states.iterator.map(marked(_)).mkString(", ")}
| pending: ${states.iterator.map(pending(_)).mkString(", ")}
| depleted: ${states.iterator.map(depleted(_)).mkString(", ")}
| completed: ${states.iterator.map(completed(_)).mkString(", ")}
| cancelled: ${states.iterator.map(cancelled(_)).mkString(", ")}
|
| mark=$markCount pend=$markedPending depl=$markedDepleted pref=$preferredId""".stripMargin
private var preferredId = 0
@ -81,7 +105,7 @@ private[akka] object FanIn {
def cancel(input: Int) =
if (!cancelled(input)) {
inputs(input).cancel()
cancelled(input) = true
cancelled(input, true)
unmarkInput(input)
}
@ -93,7 +117,7 @@ private[akka] object FanIn {
if (!marked(input)) {
if (depleted(input)) markedDepleted += 1
if (pending(input)) markedPending += 1
marked(input) = true
marked(input, true)
markCount += 1
}
}
@ -102,7 +126,7 @@ private[akka] object FanIn {
if (marked(input)) {
if (depleted(input)) markedDepleted -= 1
if (pending(input)) markedPending -= 1
marked(input) = false
marked(input, false)
markCount -= 1
}
}
@ -147,11 +171,11 @@ private[akka] object FanIn {
val elem = input.dequeueInputElement()
if (!input.inputsAvailable) {
if (marked(id)) markedPending -= 1
pending(id) = false
pending(id, false)
}
if (input.inputsDepleted) {
if (marked(id)) markedDepleted += 1
depleted(id) = true
depleted(id, true)
onDepleted(id)
}
elem
@ -198,15 +222,15 @@ private[akka] object FanIn {
inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription))
case OnNext(id, elem)
if (marked(id) && !pending(id)) markedPending += 1
pending(id) = true
pending(id, true)
inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem))
case OnComplete(id)
if (!pending(id)) {
if (marked(id) && !depleted(id)) markedDepleted += 1
depleted(id) = true
depleted(id, true)
onDepleted(id)
}
completed(id) = true
completed(id, true)
inputs(id).subreceive(ActorSubscriberMessage.OnComplete)
case OnError(id, e)
onError(id, e)

View file

@ -14,8 +14,8 @@ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module {
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
val inPort = new Inlet[In]("Flow.in")
val outPort = new Outlet[Out]("Flow.out")
val inPort = Inlet[In]("Flow.in")
val outPort = Outlet[Out]("Flow.out")
override val shape = new FlowShape(inPort, outPort)
override def subModules: Set[Module] = Set.empty

View file

@ -5,13 +5,9 @@ package akka.stream.impl
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.FlexiRoute.RouteLogic
import akka.stream.Attributes
import akka.stream.{ Inlet, Outlet, Shape, InPort, OutPort }
import akka.stream.scaladsl.FlexiMerge.MergeLogic
import akka.stream.UniformFanInShape
import akka.stream.UniformFanOutShape
import akka.stream.FanOutShape2
import akka.stream.scaladsl.MergePreferred
import akka.stream.{ Attributes, Inlet, Outlet, Shape, InPort, OutPort, UniformFanInShape, UniformFanOutShape, FanOutShape2 }
import akka.event.Logging.simpleName
/**

View file

@ -30,20 +30,16 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out
// This is okay since the only caller of this method is right below.
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
override def carbonCopy: Module = {
val out = new Outlet[Out](shape.outlet.toString)
newInstance(SourceShape(out))
}
override def carbonCopy: Module = newInstance(SourceShape(shape.outlet.carbonCopy()))
override def subModules: Set[Module] = Set.empty
def amendShape(attr: Attributes): SourceShape[Out] = {
def amendShape(attr: Attributes): SourceShape[Out] =
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = new Outlet(name + ".out"))
case Some(name) shape.copy(outlet = Outlet(name + ".out"))
}
}
}

View file

@ -32,10 +32,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
// This is okay since we the only caller of this method is right below.
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
override def carbonCopy: Module = {
val in = new Inlet[In](shape.inlet.toString)
newInstance(SinkShape(in))
}
override def carbonCopy: Module = newInstance(SinkShape(shape.inlet.carbonCopy()))
override def subModules: Set[Module] = Set.empty
@ -43,7 +40,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(inlet = new Inlet(name + ".in"))
case Some(name) shape.copy(inlet = Inlet(name + ".in"))
}
}
}

View file

@ -300,12 +300,7 @@ private[stream] object VirtualProcessor {
case object Completed extends Termination
case class Failed(ex: Throwable) extends Termination
private object InertSubscriber extends Subscriber[Any] {
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onNext(elem: Any): Unit = ()
override def onError(thr: Throwable): Unit = ()
override def onComplete(): Unit = ()
}
private val InertSubscriber = new CancellingSubscriber[Any]
}
private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
@ -403,12 +398,12 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
* INTERNAL API
*/
private[stream] final case class MaterializedValueSource[M](
shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")),
shape: SourceShape[M] = SourceShape[M](Outlet[M]("Materialized.out")),
attributes: Attributes = Attributes.name("Materialized")) extends StreamLayout.Module {
override def subModules: Set[Module] = Set.empty
override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr)
override def carbonCopy: Module = this.copy(shape = SourceShape(new Outlet[M]("Materialized.out")))
override def carbonCopy: Module = this.copy(shape = SourceShape(Outlet[M]("Materialized.out")))
override def replaceShape(s: Shape): Module =
if (s == shape) this
@ -418,7 +413,7 @@ private[stream] final case class MaterializedValueSource[M](
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = new Outlet(name + ".out"))
case Some(name) shape.copy(outlet = Outlet(name + ".out"))
}
}

View file

@ -124,10 +124,10 @@ object SslTls {
private[akka] object TlsModule {
def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): TlsModule = {
val name = attributes.nameOrDefault(s"StreamTls($role)")
val cipherIn = new Inlet[ByteString](s"$name.cipherIn")
val cipherOut = new Outlet[ByteString](s"$name.cipherOut")
val plainIn = new Inlet[SslTlsOutbound](s"$name.transportIn")
val plainOut = new Outlet[SslTlsInbound](s"$name.transportOut")
val cipherIn = Inlet[ByteString](s"$name.cipherIn")
val cipherOut = Outlet[ByteString](s"$name.cipherOut")
val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn")
val plainOut = Outlet[SslTlsInbound](s"$name.transportOut")
val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut)
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing)
}

View file

@ -4,15 +4,13 @@
package akka.stream.io
import java.io.File
import akka.stream.impl.io.SynchronousFileSource
import akka.stream.scaladsl.Source
import akka.stream.{ ActorAttributes, Attributes, javadsl }
import akka.util.ByteString
import scala.concurrent.Future
object SynchronousFileSource {
import akka.stream.impl.io.SynchronousFileSource
final val DefaultChunkSize = 8192
final val DefaultAttributes = Attributes.name("synchronousFileSource")

View file

@ -17,11 +17,9 @@ object FlattenStrategy {
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/
def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat[T, U]()
def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, U], T]]
/**
* INTERNAL API
*/
private[akka] final case class Concat[T, U]() extends FlattenStrategy[Source[T, U], T]
private[akka] final case object Concat extends FlattenStrategy[Any, Nothing]
}

View file

@ -15,7 +15,7 @@ object FlattenStrategy {
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/
def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat[T]()
def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, Any], T]]
private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, Any], T]
private[akka] final case object Concat extends FlattenStrategy[Any, Nothing]
}

View file

@ -245,7 +245,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** INTERNAL API */
override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = {
//No need to copy here, op is a fresh instance
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]]
if (op.isInstanceOf[Stages.Identity]) this.asInstanceOf[Repr[U, Mat]]
else if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]]
else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
}
@ -288,7 +289,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
object Flow extends FlowApply {
private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(new Inlet(name + ".in"), new Outlet(name + ".out"))
private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(Inlet(name + ".in"), Outlet(name + ".out"))
/**
* Helper to create `Flow` without a [[Source]] or a [[Sink]].
@ -311,6 +312,7 @@ object Flow extends FlowApply {
*/
def wrap[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) M): Flow[I, O, M] =
Flow(sink, source)(f) { implicit b (in, out) (in.inlet, out.outlet) }
}
/**
@ -922,7 +924,7 @@ trait FlowOps[+Out, +Mat] {
*
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match {
case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out, _] andThen(ConcatAll())
case scaladsl.FlattenStrategy.Concat | javadsl.FlattenStrategy.Concat andThen(ConcatAll())
case _
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
}

View file

@ -693,7 +693,5 @@ object FlowGraph extends GraphApply {
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet
}
}
}

View file

@ -3,8 +3,6 @@
*/
package akka.stream.scaladsl
import akka.stream.impl.{ PublisherSink, SubscriberSource, StreamLayout }
/**
* Convenience functions for often-encountered purposes like keeping only the
* left (first) or only the right (second) of two input values.

View file

@ -47,7 +47,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
object Sink extends SinkApply {
/** INTERNAL API */
private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in"))
private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in"))
/**
* A graph with the shape of a sink logically is a sink, this method makes

View file

@ -172,7 +172,7 @@ object Source extends SourceApply {
new Source(module)
/** INTERNAL API */
private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(new Outlet(name + ".out"))
private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(Outlet(name + ".out"))
/**
* Helper to create [[Source]] from `Publisher`.

View file

@ -140,7 +140,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
halfClose: Boolean = false,
idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = {
new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, halfClose, idleTimeout,
Attributes.none, SourceShape(new Outlet("BindSource.out"))))
Attributes.none, SourceShape(Outlet("BindSource.out"))))
}
/**

View file

@ -60,36 +60,40 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
* INTERNAL API
*/
private[stream] def enterAndPush(elem: Out): Unit = {
context.enter()
context.push(elem)
context.execute()
val c = context
c.enter()
c.push(elem)
c.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndPull(): Unit = {
context.enter()
context.pull()
context.execute()
val c = context
c.enter()
c.pull()
c.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndFinish(): Unit = {
context.enter()
context.finish()
context.execute()
val c = context
c.enter()
c.finish()
c.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndFail(e: Throwable): Unit = {
context.enter()
context.fail(e)
context.execute()
val c = context
c.enter()
c.fail(e)
c.execute()
}
/**
@ -186,7 +190,6 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
* if there are any state that should be cleared before restarting, e.g. by returning a new instance.
*/
def restart(): Stage[In, Out] = this
}
/**
@ -341,8 +344,8 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
*/
abstract class State extends StageState[In, Out]
private var emitting = false
private var _current: StageState[In, Out] = _
private[this] var emitting = false
private[this] var _current: StageState[In, Out] = _
become(initial)
/**