=str #16380 Cleaned up Source/Sink/Key overlap

This commit is contained in:
Björn Antonsson 2014-12-02 16:38:14 +01:00
parent fb7fc2054d
commit 3e14cd05a9
23 changed files with 136 additions and 142 deletions

View file

@ -218,4 +218,4 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): HttpExt =
new HttpExt(system.settings.config getConfig "akka.http")(system)
}
}

View file

@ -19,7 +19,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
val sSinkClass = classOf[akka.stream.scaladsl.Sink[_]]
val jSinkClass = classOf[akka.stream.javadsl.Sink[_]]
val sKeyClass = classOf[akka.stream.scaladsl.Key]
val sKeyClass = classOf[akka.stream.scaladsl.Key[_]]
val jKeyClass = classOf[akka.stream.javadsl.Key[_]]
val sMaterializedMapClass = classOf[akka.stream.scaladsl.MaterializedMap]

View file

@ -35,19 +35,19 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
(classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) ::
(classOf[akka.stream.scaladsl.Source[_]], classOf[akka.stream.javadsl.Source[_]]) ::
(classOf[akka.stream.scaladsl.KeyedSource[_]], classOf[akka.stream.javadsl.KeyedSource[_, _]]) ::
(classOf[akka.stream.scaladsl.KeyedSource[_, _]], classOf[akka.stream.javadsl.KeyedSource[_, _]]) ::
(classOf[akka.stream.scaladsl.Sink[_]], classOf[akka.stream.javadsl.Sink[_]]) ::
(classOf[akka.stream.scaladsl.KeyedSink[_]], classOf[akka.stream.javadsl.KeyedSink[_, _]]) ::
(classOf[akka.stream.scaladsl.KeyedSink[_, _]], classOf[akka.stream.javadsl.KeyedSink[_, _]]) ::
(classOf[akka.stream.scaladsl.Flow[_, _]], classOf[akka.stream.javadsl.Flow[_, _]]) ::
(classOf[akka.stream.scaladsl.FlowGraph], classOf[akka.stream.javadsl.FlowGraph]) ::
(classOf[akka.stream.scaladsl.PartialFlowGraph], classOf[akka.stream.javadsl.PartialFlowGraph]) ::
Nil
// format: ON
val sKeyedSource = classOf[scaladsl.KeyedSource[_]]
val sKeyedSource = classOf[scaladsl.KeyedSource[_, _]]
val jKeyedSource = classOf[javadsl.KeyedSource[_, _]]
val sKeyedSink = classOf[scaladsl.KeyedSink[_]]
val sKeyedSink = classOf[scaladsl.KeyedSink[_, _]]
val jKeyedSink = classOf[javadsl.KeyedSink[_, _]]
val sSource = classOf[scaladsl.Source[_]]

View file

@ -45,8 +45,8 @@ class FlowGraphInitSpec extends AkkaSpec {
val s = Source(1 to 5)
val b = Broadcast[Int]
val sink: KeyedSink[Int] = Sink.foreach[Int](_ ())
val otherSink: KeyedSink[Int] = Sink.foreach[Int](i 2 * i)
val sink = Sink.foreach[Int](_ ())
val otherSink = Sink.foreach[Int](i 2 * i)
FlowGraph { implicit builder
import FlowGraphImplicits._
@ -84,8 +84,8 @@ class FlowGraphInitSpec extends AkkaSpec {
val s = Sink.ignore
val m = Merge[Int]
val source1: KeyedSource[Int] = Source.subscriber
val source2: KeyedSource[Int] = Source.subscriber
val source1 = Source.subscriber[Int]
val source2 = Source.subscriber[Int]
FlowGraph { implicit builder
import FlowGraphImplicits._

View file

@ -74,12 +74,10 @@ class SourceSpec extends AkkaSpec {
"Source with additional keys" must {
"materialize keys properly" in {
val ks = Source.subscriber[Int]
val mk1 = new Key {
override type MaterializedType = String
val mk1 = new Key[String] {
override def materialize(map: MaterializedMap) = map.get(ks).toString
}
val mk2 = new Key {
override type MaterializedType = String
val mk2 = new Key[String] {
override def materialize(map: MaterializedMap) = map.get(mk1).toUpperCase
}
val sp = StreamTestKit.SubscriberProbe[Int]()
@ -97,12 +95,10 @@ class SourceSpec extends AkkaSpec {
"materialize keys properly when used in a graph" in {
val ks = Source.subscriber[Int]
val mk1 = new Key {
override type MaterializedType = String
val mk1 = new Key[String] {
override def materialize(map: MaterializedMap) = map.get(ks).toString
}
val mk2 = new Key {
override type MaterializedType = String
val mk2 = new Key[String] {
override def materialize(map: MaterializedMap) = map.get(mk1).toUpperCase
}
val sp = StreamTestKit.SubscriberProbe[Int]()

View file

@ -39,8 +39,7 @@ import scala.util.control.NonFatal
*
* @see [[akka.persistence.stream.PersistentSourceSettings]]
*/
final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out] {
override type MaterializedType = ActorRef
final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out, ActorRef] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (publisher, publisherRef) = create(materializer, flowName)

View file

@ -152,7 +152,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
* stream. The result can be highly implementation specific, ranging from
* local actor chains to remote-deployed processing networks.
*/
def materialize[In, Out](source: scaladsl.Source[In], sink: scaladsl.Sink[Out], ops: List[Ast.AstNode], keys: List[Key]): scaladsl.MaterializedMap
def materialize[In, Out](source: scaladsl.Source[In], sink: scaladsl.Sink[Out], ops: List[Ast.AstNode], keys: List[Key[_]]): scaladsl.MaterializedMap
/**
* Create publishers and subscribers for fan-in and fan-out operations.

View file

@ -184,7 +184,7 @@ private[akka] object Ast {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key, attributes: OperationAttributes = processorWithKey) extends AstNode {
case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
@ -360,7 +360,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
}
// Ops come in reverse order
override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode], keys: List[Key]): MaterializedMap = {
override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode], keys: List[Key[_]]): MaterializedMap = {
val flowName = createFlowName() //FIXME: Creates Id even when it is not used in all branches below
def throwUnknownType(typeName: String, s: AnyRef): Nothing =
@ -388,6 +388,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
case s: Source[_] throwUnknownType("Source", s)
case s: Sink[_] throwUnknownType("Sink", s)
}
def addIfKeyed(m: Materializable, v: Any, map: MaterializedMap) = m match {
case km: KeyedMaterializable[_] map.updated(km, v)
case _ map
}
val mmPromise = Promise[MaterializedMap]
val mmFuture = mmPromise.future
@ -410,8 +414,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
val (first, map) = processorChain(last, ops.tail, flowName, opsSize - 1, lastMap)
(attachSource(first.asInstanceOf[Processor[In, Any]], flowName), attachSink(last, flowName), map)
}
val sourceMap = if (source.isInstanceOf[KeyedSource[_]]) pipeMap.updated(source, sourceValue) else pipeMap
val sourceSinkMap = if (sink.isInstanceOf[KeyedSink[_]]) sourceMap.updated(sink, sinkValue) else sourceMap
val sourceSinkMap = addIfKeyed(sink, sinkValue, addIfKeyed(source, sourceValue, pipeMap))
if (keys.isEmpty) sourceSinkMap
else (sourceSinkMap /: keys) {

View file

@ -141,8 +141,7 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension {
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf): ServerBinding = {
val connectionSource = new KeyedActorFlowSource[IncomingConnection] {
override type MaterializedType = (Future[InetSocketAddress], Future[() Future[Unit]])
val connectionSource = new KeyedActorFlowSource[IncomingConnection, (Future[InetSocketAddress], Future[() Future[Unit]])] {
override def attach(flowSubscriber: Subscriber[IncomingConnection],
materializer: ActorBasedFlowMaterializer,
flowName: String): MaterializedType = {
@ -194,9 +193,7 @@ private[akka] object StreamTcpExt {
/**
* INTERNAL API
*/
class PreMaterializedOutgoingKey extends Key {
type MaterializedType = Future[InetSocketAddress]
class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] {
override def materialize(map: MaterializedMap) =
throw new IllegalStateException("This key has already been materialized by the TCP Processor")
}

View file

@ -381,11 +381,22 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* Flow with attached input and output, can be executed.
*/
trait RunnableFlow {
/**
* Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow.
*/
def run(materializer: FlowMaterializer): javadsl.MaterializedMap
/**
* Run this flow and return the value of the [[KeyedMaterializable]].
*/
def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M
}
/** INTERNAL API */
private[akka] class RunnableFlowAdapter(runnable: scaladsl.RunnableFlow) extends RunnableFlow {
override def run(materializer: FlowMaterializer): MaterializedMap =
new MaterializedMap(runnable.run()(materializer))
def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M =
runnable.runWith(key.asScala)(materializer)
}

View file

@ -652,5 +652,7 @@ class FlowGraph(delegate: scaladsl.FlowGraph) extends RunnableFlow {
override def run(materializer: FlowMaterializer): javadsl.MaterializedMap =
new MaterializedMap(delegate.run()(materializer))
def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M =
delegate.runWith(key.asScala)(materializer)
}

View file

@ -18,22 +18,11 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) {
def asScala: scaladsl.MaterializedMap = delegate
/**
* Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[akka.stream.javadsl.Source#subscriber]].
* Retrieve a materialized key, `Source`, `Sink` or `Key`, e.g. the `Subscriber` of a
* [[akka.stream.javadsl.Source#subscriber]].
*/
def get[T](key: javadsl.KeyedSource[_, T]): T =
delegate.get(key.asScala).asInstanceOf[T]
/**
* Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[akka.stream.javadsl.Sink#publisher]].
*/
def get[D](key: javadsl.KeyedSink[_, D]): D =
delegate.get(key.asScala).asInstanceOf[D]
/**
* Retrieve a materialized `Key`.
*/
def get[T](key: Key[T]): T =
delegate.get(key.asScala).asInstanceOf[T]
def get[T](key: javadsl.KeyedMaterializable[T]): T =
delegate.get(key.asScala)
/**
* Merge two materialized maps.
@ -46,8 +35,8 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) {
/**
* Update the materialized map with a new value.
*/
def updated(key: Object, value: Object): MaterializedMap =
new MaterializedMap(delegate.updated(key, value))
def updated(key: KeyedMaterializable[_], value: Object): MaterializedMap =
new MaterializedMap(delegate.updated(key.asScala, value))
/**
* Check if this map is empty.
@ -62,13 +51,22 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) {
}
}
/**
* Java API
*
* Common interface for keyed things that can be materialized.
*/
trait KeyedMaterializable[M] {
def asScala: scaladsl.KeyedMaterializable[M]
}
/**
* Java API
*
* A key that is not directly tied to a sink or source instance.
*/
class Key[T](delegate: scaladsl.Key) {
def asScala: scaladsl.Key = delegate
class Key[M](delegate: scaladsl.Key[M]) extends KeyedMaterializable[M] {
def asScala: scaladsl.Key[M] = delegate
/**
* Materialize the value for this key. All Sink and Source keys have been materialized and exist in the map.

View file

@ -148,6 +148,6 @@ class Sink[-In](delegate: scaladsl.Sink[In]) {
* to retrieve in order to access aspects of this sink (could be a completion Future
* or a cancellation handle, etc.)
*/
final class KeyedSink[-In, M](delegate: scaladsl.KeyedSink[In]) extends javadsl.Sink[In](delegate) {
override def asScala: scaladsl.KeyedSink[In] = super.asScala.asInstanceOf[scaladsl.KeyedSink[In]]
final class KeyedSink[-In, M](delegate: scaladsl.KeyedSink[In, M]) extends javadsl.Sink[In](delegate) with KeyedMaterializable[M] {
override def asScala: scaladsl.KeyedSink[In, M] = super.asScala.asInstanceOf[scaladsl.KeyedSink[In, M]]
}

View file

@ -136,7 +136,7 @@ object Source {
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T](): KeyedSource[Subscriber[T], T] =
def subscriber[T](): KeyedSource[T, Subscriber[T]] =
new KeyedSource(scaladsl.Source.subscriber)
/**
@ -145,7 +145,7 @@ object Source {
* source.
*/
def concat[T](first: Source[T], second: Source[T]): Source[T] =
new KeyedSource(scaladsl.Source.concat(first.asScala, second.asScala))
new Source(scaladsl.Source.concat(first.asScala, second.asScala))
}
/**
@ -463,6 +463,6 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* A `Source` that will create an object during materialization that the user will need
* to retrieve in order to access aspects of this source (could be a Subscriber, a Future/Promise, etc.).
*/
final class KeyedSource[+Out, T](delegate: scaladsl.Source[Out]) extends Source[Out](delegate) {
override def asScala: scaladsl.KeyedActorFlowSource[Out] = super.asScala.asInstanceOf[scaladsl.KeyedActorFlowSource[Out]]
final class KeyedSource[+Out, M](delegate: scaladsl.KeyedSource[Out, M]) extends Source[Out](delegate) with KeyedMaterializable[M] {
override def asScala: scaladsl.KeyedSource[Out, M] = super.asScala.asInstanceOf[scaladsl.KeyedSource[Out, M]]
}

View file

@ -61,7 +61,7 @@ trait SimpleActorFlowSink[-In] extends ActorFlowSink[In] {
* to retrieve in order to access aspects of this sink (could be a completion Future
* or a cancellation handle, etc.)
*/
trait KeyedActorFlowSink[-In] extends ActorFlowSink[In] with KeyedSink[In]
trait KeyedActorFlowSink[-In, M] extends ActorFlowSink[In] with KeyedSink[In, M]
object PublisherSink {
def apply[T](): PublisherSink[T] = new PublisherSink[T]
@ -75,16 +75,14 @@ object PublisherSink {
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
class PublisherSink[In] extends KeyedActorFlowSink[In] {
type MaterializedType = Publisher[In]
class PublisherSink[In] extends KeyedActorFlowSink[In, Publisher[In]] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = flowPublisher
override def toString: String = "PublisherSink"
}
final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] {
type MaterializedType = Publisher[In]
final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In, Publisher[In]] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val fanoutActor = materializer.actorOf(
@ -119,9 +117,7 @@ object HeadSink {
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
class HeadSink[In] extends KeyedActorFlowSink[In] {
type MaterializedType = Future[In]
class HeadSink[In] extends KeyedActorFlowSink[In, Future[In]] {
def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (sub, f) = create(materializer, flowName)
@ -196,9 +192,7 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
*/
final case class ForeachSink[In](f: In Unit) extends KeyedActorFlowSink[In] {
override type MaterializedType = Future[Unit]
final case class ForeachSink[In](f: In Unit) extends KeyedActorFlowSink[In, Future[Unit]] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val promise = Promise[Unit]()
@ -232,9 +226,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In]
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
*/
final case class FoldSink[U, In](zero: U)(f: (U, In) U) extends KeyedActorFlowSink[In] {
type MaterializedType = Future[U]
final case class FoldSink[U, In](zero: U)(f: (U, In) U) extends KeyedActorFlowSink[In, Future[U]] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val promise = Promise[U]()
@ -284,9 +276,7 @@ final case object CancelSink extends SimpleActorFlowSink[Any] {
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] {
type MaterializedType = ActorRef
final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In, ActorRef] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): ActorRef = {
val (subscriber, subscriberRef) = create(materializer, flowName)

View file

@ -58,7 +58,7 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] {
override def to(sink: Sink[Out]): RunnableFlow = Pipe.empty[Out].withSource(this).to(sink)
override def withKey(key: Key): Source[Out] = Pipe.empty[Out].withSource(this).withKey(key)
override def withKey(key: Key[_]): Source[Out] = Pipe.empty[Out].withSource(this).withKey(key)
/** INTERNAL API */
override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op), Nil) //FIXME raw addition of AstNodes
@ -78,15 +78,13 @@ trait SimpleActorFlowSource[+Out] extends ActorFlowSource[Out] { // FIXME Tightl
* to retrieve in order to access aspects of this source (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait KeyedActorFlowSource[+Out] extends ActorFlowSource[Out] with KeyedSource[Out]
trait KeyedActorFlowSource[+Out, M] extends ActorFlowSource[Out] with KeyedSource[Out, M]
/**
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
override type MaterializedType = Subscriber[Out]
final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out, Subscriber[Out]] { // FIXME Why does this have anything to do with Actors?
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] =
flowSubscriber
@ -159,9 +157,7 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
override type MaterializedType = Cancellable
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors?
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (pub, cancellable) = create(materializer, flowName)
pub.subscribe(flowSubscriber)
@ -188,8 +184,7 @@ final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteD
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] {
override type MaterializedType = ActorRef
final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out, ActorRef] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (publisher, publisherRef) = create(materializer, flowName)

View file

@ -70,7 +70,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* The key can only use other keys if they have been added to the flow
* before this key.
*/
def withKey(key: Key): Flow[In, Out]
def withKey(key: Key[_]): Flow[In, Out]
/**
* Applies given [[OperationAttributes]] to a given section.
@ -122,7 +122,16 @@ object Flow {
* Flow with attached input and output, can be executed.
*/
trait RunnableFlow {
/**
* Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow.
*/
def run()(implicit materializer: FlowMaterializer): MaterializedMap
/**
* Run this flow and return the value of the [[KeyedMaterializable]].
*/
def runWith(key: KeyedMaterializable[_])(implicit materializer: FlowMaterializer): key.MaterializedType =
this.run().get(key)
}
/**

View file

@ -498,14 +498,14 @@ private[akka] object FlowGraphInternal {
*/
final override def equals(other: Any): Boolean = other match {
case v: SourceVertex (source, v.source) match {
case (k1: KeyedSource[_], k2: KeyedSource[_]) k1 == k2
case _ super.equals(other)
case (k1: KeyedSource[_, _], k2: KeyedSource[_, _]) k1 == k2
case _ super.equals(other)
}
case _ false
}
final override def hashCode: Int = source match {
case k: KeyedSource[_] k.hashCode
case _ super.hashCode
case k: KeyedSource[_, _] k.hashCode
case _ super.hashCode
}
final override private[scaladsl] def newInstance() = this.copy()
@ -520,14 +520,14 @@ private[akka] object FlowGraphInternal {
*/
final override def equals(other: Any): Boolean = other match {
case v: SinkVertex (sink, v.sink) match {
case (k1: KeyedSink[_], k2: KeyedSink[_]) k1 == k2
case _ super.equals(other)
case (k1: KeyedSink[_, _], k2: KeyedSink[_, _]) k1 == k2
case _ super.equals(other)
}
case _ false
}
final override def hashCode: Int = sink match {
case k: KeyedSink[_] k.hashCode
case _ super.hashCode
case k: KeyedSink[_, _] k.hashCode
case _ super.hashCode
}
final override private[scaladsl] def newInstance() = this.copy()
@ -1056,8 +1056,8 @@ class FlowGraphBuilder private[akka] (
s"Use individual instances instead of the same one multiple times. Nodes are: ${graph.nodes}"
vertex match {
case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] require(!graph.contains(v), warningMessage(v.source))
case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_]] require(!graph.contains(v), warningMessage(v.sink))
case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_, _]] require(!graph.contains(v), warningMessage(v.source))
case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_, _]] require(!graph.contains(v), warningMessage(v.sink))
case _ // ok
}
}

View file

@ -116,7 +116,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
}
// FIXME #16379 This key will be materalized to early
override def withKey(key: Key): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key))
override def withKey(key: Key[_]): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key))
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
@ -163,7 +163,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou
}
// FIXME #16379 This key will be materalized to early
override def withKey(key: Key): Source[Out] = this.copy(outPipe = outPipe.withKey(key))
override def withKey(key: Key[_]): Source[Out] = this.copy(outPipe = outPipe.withKey(key))
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))

View file

@ -10,19 +10,9 @@ package akka.stream.scaladsl
trait MaterializedMap {
/**
* Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
* Retrieve a materialized key, `Source`, `Sink` or `Key`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/
def get(key: Source[_]): key.MaterializedType
/**
* Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[PublisherSink]].
*/
def get(key: Sink[_]): key.MaterializedType
/**
* Retrieve a materialized `Key`.
*/
def get(key: Key): key.MaterializedType
def get(key: Materializable): key.MaterializedType
/**
* Merge two materialized maps.
@ -32,7 +22,7 @@ trait MaterializedMap {
/**
* Update the materialized map with a new value.
*/
def updated(key: AnyRef, value: Any): MaterializedMap
def updated(key: KeyedMaterializable[_], value: Any): MaterializedMap
/**
* Check if this map is empty.
@ -52,12 +42,23 @@ object MaterializedMap {
}
/**
* A key that is not directly tied to a sink or source instance.
*
* FIXME #16380 Clean up the overlap between Keys/Sinks/Sources
* Common trait for things that have a MaterializedType.
*/
trait Key {
trait Materializable {
type MaterializedType
}
/**
* Common trait for keyed things that have a MaterializedType.
*/
trait KeyedMaterializable[M] extends Materializable {
override type MaterializedType = M
}
/**
* A key that is not directly tied to a sink or source instance.
*/
trait Key[M] extends KeyedMaterializable[M] {
/**
* Materialize the value for this key. All Sink and Source keys have been materialized and exist in the map.
@ -66,35 +67,30 @@ trait Key {
}
private[stream] case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap {
private def failure(keyType: String, key: AnyRef) = new IllegalArgumentException(s"$keyType [$key] doesn't exist in this flow")
override def get(key: Source[_]): key.MaterializedType = key match {
case _: KeyedSource[_] map.get(key) match {
case Some(v) v.asInstanceOf[key.MaterializedType]
case None throw failure("Source", key)
private def failure(key: KeyedMaterializable[_]) = {
val keyType = key match {
case _: KeyedSource[_, _] "Source"
case _: KeyedSink[_, _] "Sink"
case _: Key[_] "Key"
case _ "Unknown"
}
case _ ().asInstanceOf[key.MaterializedType]
new IllegalArgumentException(s"$keyType key [$key] doesn't exist in this flow")
}
override def get(key: Sink[_]): key.MaterializedType = key match {
case _: KeyedSink[_] map.get(key) match {
override def get(key: Materializable): key.MaterializedType = key match {
case km: KeyedMaterializable[_] map.get(key) match {
case Some(v) v.asInstanceOf[key.MaterializedType]
case None throw failure("Sink", key)
case None throw failure(km)
}
case _ ().asInstanceOf[key.MaterializedType]
}
override def get(key: Key): key.MaterializedType = map.get(key) match {
case Some(v) v.asInstanceOf[key.MaterializedType]
case None throw failure("Key", key)
}
override def merge(otherMap: MaterializedMap) =
if (map.isEmpty) otherMap
else if (otherMap.isEmpty) this
else MaterializedMapImpl(map ++ otherMap.iterator)
override def updated(key: AnyRef, value: Any) = MaterializedMapImpl(map.updated(key, value))
override def updated(key: KeyedMaterializable[_], value: Any) = MaterializedMapImpl(map.updated(key, value))
override def isEmpty = map.isEmpty

View file

@ -20,7 +20,7 @@ private[akka] object Pipe {
Pipe(List(Ast.DirectProcessor(() p().asInstanceOf[Processor[Any, Any]])), Nil)
// FIXME #16376 should probably be replaced with an ActorFlowProcessor similar to ActorFlowSource/Sink
private[stream] def apply[In, Out](key: Key)(p: () (Processor[In, Out], Any)): Pipe[In, Out] =
private[stream] def apply[In, Out](key: Key[_])(p: () (Processor[In, Out], Any)): Pipe[In, Out] =
Pipe(List(Ast.DirectProcessorWithKey(() p().asInstanceOf[(Processor[Any, Any], Any)], key)), Nil)
private[stream] def apply[In, Out](source: SourcePipe[_]): Pipe[In, Out] =
@ -33,7 +33,7 @@ private[akka] object Pipe {
/**
* Flow with one open input and one open output.
*/
private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] {
private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key[_]], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes
@ -62,7 +62,7 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def withKey(key: Key): Pipe[In, Out] = Pipe(ops, keys :+ key)
override def withKey(key: Key[_]): Pipe[In, Out] = Pipe(ops, keys :+ key)
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ::: ops, keys ::: pipe.keys) // FIXME raw addition of AstNodes
}
@ -70,7 +70,7 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke
/**
* Pipe with open input and attached output. Can be used as a `Subscriber`.
*/
private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNode], keys: List[Key]) extends Sink[In] {
private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNode], keys: List[Key[_]]) extends Sink[In] {
private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops, keys)
@ -81,7 +81,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
/**
* Pipe with open output and attached input. Can be used as a `Publisher`.
*/
private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] {
private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key[_]], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes
@ -104,13 +104,13 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As
case d: Sink[Out] this.withSink(d)
}
override def withKey(key: Key): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key)
override def withKey(key: Key[_]): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key)
}
/**
* Pipe with attached input and output, can be executed.
*/
private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode], keys: List[Key]) extends RunnableFlow {
private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode], keys: List[Key[_]]) extends RunnableFlow {
def run()(implicit materializer: FlowMaterializer): MaterializedMap =
materializer.materialize(input, output, ops, keys)
}

View file

@ -12,8 +12,7 @@ import akka.stream.FlowMaterializer
* A `Sink` is a set of stream processing steps that has one open input and an attached output.
* Can be used as a `Subscriber`
*/
trait Sink[-In] {
type MaterializedType
trait Sink[-In] extends Materializable {
/**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
@ -114,4 +113,4 @@ object Sink {
* to retrieve in order to access aspects of this sink (could be a completion Future
* or a cancellation handle, etc.)
*/
trait KeyedSink[-In] extends Sink[In]
trait KeyedSink[-In, M] extends Sink[In] with KeyedMaterializable[M]

View file

@ -17,8 +17,7 @@ import akka.stream.FlowMaterializer
* A `Source` is a set of stream processing steps that has one open output and an attached input.
* Can be used as a `Publisher`
*/
trait Source[+Out] extends FlowOps[Out] {
type MaterializedType
trait Source[+Out] extends FlowOps[Out] with Materializable {
override type Repr[+O] <: Source[O]
/**
@ -78,7 +77,7 @@ trait Source[+Out] extends FlowOps[Out] {
* The key can only use other keys if they have been added to the source
* before this key. This also includes the keyed source if applicable.
*/
def withKey(key: Key): Source[Out]
def withKey(key: Key[_]): Source[Out]
/**
* Applies given [[OperationAttributes]] to a given section.
@ -210,4 +209,4 @@ object Source {
* to retrieve in order to access aspects of this source (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait KeyedSource[+Out] extends Source[Out]
trait KeyedSource[+Out, M] extends Source[Out] with KeyedMaterializable[M]