diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 7ba141a2d5..e12d6d79f8 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -218,4 +218,4 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { def createExtension(system: ExtendedActorSystem): HttpExt = new HttpExt(system.settings.config getConfig "akka.http")(system) -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index d7bb4bc4bc..bdcdd91a39 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -19,7 +19,7 @@ class DslConsistencySpec extends WordSpec with Matchers { val sSinkClass = classOf[akka.stream.scaladsl.Sink[_]] val jSinkClass = classOf[akka.stream.javadsl.Sink[_]] - val sKeyClass = classOf[akka.stream.scaladsl.Key] + val sKeyClass = classOf[akka.stream.scaladsl.Key[_]] val jKeyClass = classOf[akka.stream.javadsl.Key[_]] val sMaterializedMapClass = classOf[akka.stream.scaladsl.MaterializedMap] diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 6c590e2899..375240184c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -35,19 +35,19 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) :: (classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) :: (classOf[akka.stream.scaladsl.Source[_]], classOf[akka.stream.javadsl.Source[_]]) :: - (classOf[akka.stream.scaladsl.KeyedSource[_]], classOf[akka.stream.javadsl.KeyedSource[_, _]]) :: + (classOf[akka.stream.scaladsl.KeyedSource[_, _]], classOf[akka.stream.javadsl.KeyedSource[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_]], classOf[akka.stream.javadsl.Sink[_]]) :: - (classOf[akka.stream.scaladsl.KeyedSink[_]], classOf[akka.stream.javadsl.KeyedSink[_, _]]) :: + (classOf[akka.stream.scaladsl.KeyedSink[_, _]], classOf[akka.stream.javadsl.KeyedSink[_, _]]) :: (classOf[akka.stream.scaladsl.Flow[_, _]], classOf[akka.stream.javadsl.Flow[_, _]]) :: (classOf[akka.stream.scaladsl.FlowGraph], classOf[akka.stream.javadsl.FlowGraph]) :: (classOf[akka.stream.scaladsl.PartialFlowGraph], classOf[akka.stream.javadsl.PartialFlowGraph]) :: Nil // format: ON - val sKeyedSource = classOf[scaladsl.KeyedSource[_]] + val sKeyedSource = classOf[scaladsl.KeyedSource[_, _]] val jKeyedSource = classOf[javadsl.KeyedSource[_, _]] - val sKeyedSink = classOf[scaladsl.KeyedSink[_]] + val sKeyedSink = classOf[scaladsl.KeyedSink[_, _]] val jKeyedSink = classOf[javadsl.KeyedSink[_, _]] val sSource = classOf[scaladsl.Source[_]] diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala index 3f149407ac..2888c416ff 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala @@ -45,8 +45,8 @@ class FlowGraphInitSpec extends AkkaSpec { val s = Source(1 to 5) val b = Broadcast[Int] - val sink: KeyedSink[Int] = Sink.foreach[Int](_ ⇒ ()) - val otherSink: KeyedSink[Int] = Sink.foreach[Int](i ⇒ 2 * i) + val sink = Sink.foreach[Int](_ ⇒ ()) + val otherSink = Sink.foreach[Int](i ⇒ 2 * i) FlowGraph { implicit builder ⇒ import FlowGraphImplicits._ @@ -84,8 +84,8 @@ class FlowGraphInitSpec extends AkkaSpec { val s = Sink.ignore val m = Merge[Int] - val source1: KeyedSource[Int] = Source.subscriber - val source2: KeyedSource[Int] = Source.subscriber + val source1 = Source.subscriber[Int] + val source2 = Source.subscriber[Int] FlowGraph { implicit builder ⇒ import FlowGraphImplicits._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 99cde03dd3..8fbc2e4f2f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -74,12 +74,10 @@ class SourceSpec extends AkkaSpec { "Source with additional keys" must { "materialize keys properly" in { val ks = Source.subscriber[Int] - val mk1 = new Key { - override type MaterializedType = String + val mk1 = new Key[String] { override def materialize(map: MaterializedMap) = map.get(ks).toString } - val mk2 = new Key { - override type MaterializedType = String + val mk2 = new Key[String] { override def materialize(map: MaterializedMap) = map.get(mk1).toUpperCase } val sp = StreamTestKit.SubscriberProbe[Int]() @@ -97,12 +95,10 @@ class SourceSpec extends AkkaSpec { "materialize keys properly when used in a graph" in { val ks = Source.subscriber[Int] - val mk1 = new Key { - override type MaterializedType = String + val mk1 = new Key[String] { override def materialize(map: MaterializedMap) = map.get(ks).toString } - val mk2 = new Key { - override type MaterializedType = String + val mk2 = new Key[String] { override def materialize(map: MaterializedMap) = map.get(mk1).toUpperCase } val sp = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala index 7f0053dabd..edfa398da3 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala @@ -39,8 +39,7 @@ import scala.util.control.NonFatal * * @see [[akka.persistence.stream.PersistentSourceSettings]] */ -final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out] { - override type MaterializedType = ActorRef +final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out, ActorRef] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { val (publisher, publisherRef) = create(materializer, flowName) diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 5a4ead82b3..66c8d52fc7 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -152,7 +152,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { * stream. The result can be highly implementation specific, ranging from * local actor chains to remote-deployed processing networks. */ - def materialize[In, Out](source: scaladsl.Source[In], sink: scaladsl.Sink[Out], ops: List[Ast.AstNode], keys: List[Key]): scaladsl.MaterializedMap + def materialize[In, Out](source: scaladsl.Source[In], sink: scaladsl.Sink[Out], ops: List[Ast.AstNode], keys: List[Key[_]]): scaladsl.MaterializedMap /** * Create publishers and subscribers for fan-in and fan-out operations. diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 47b4858ce4..f02d78da1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -184,7 +184,7 @@ private[akka] object Ast { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } - case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key, attributes: OperationAttributes = processorWithKey) extends AstNode { + case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } @@ -360,7 +360,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } // Ops come in reverse order - override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode], keys: List[Key]): MaterializedMap = { + override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode], keys: List[Key[_]]): MaterializedMap = { val flowName = createFlowName() //FIXME: Creates Id even when it is not used in all branches below def throwUnknownType(typeName: String, s: AnyRef): Nothing = @@ -388,6 +388,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting case s: Source[_] ⇒ throwUnknownType("Source", s) case s: Sink[_] ⇒ throwUnknownType("Sink", s) } + def addIfKeyed(m: Materializable, v: Any, map: MaterializedMap) = m match { + case km: KeyedMaterializable[_] ⇒ map.updated(km, v) + case _ ⇒ map + } val mmPromise = Promise[MaterializedMap] val mmFuture = mmPromise.future @@ -410,8 +414,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val (first, map) = processorChain(last, ops.tail, flowName, opsSize - 1, lastMap) (attachSource(first.asInstanceOf[Processor[In, Any]], flowName), attachSink(last, flowName), map) } - val sourceMap = if (source.isInstanceOf[KeyedSource[_]]) pipeMap.updated(source, sourceValue) else pipeMap - val sourceSinkMap = if (sink.isInstanceOf[KeyedSink[_]]) sourceMap.updated(sink, sinkValue) else sourceMap + val sourceSinkMap = addIfKeyed(sink, sinkValue, addIfKeyed(source, sourceValue, pipeMap)) if (keys.isEmpty) sourceSinkMap else (sourceSinkMap /: keys) { diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala index 3140d60e4f..eef6cb0597 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala @@ -141,8 +141,7 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf): ServerBinding = { - val connectionSource = new KeyedActorFlowSource[IncomingConnection] { - override type MaterializedType = (Future[InetSocketAddress], Future[() ⇒ Future[Unit]]) + val connectionSource = new KeyedActorFlowSource[IncomingConnection, (Future[InetSocketAddress], Future[() ⇒ Future[Unit]])] { override def attach(flowSubscriber: Subscriber[IncomingConnection], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType = { @@ -194,9 +193,7 @@ private[akka] object StreamTcpExt { /** * INTERNAL API */ - class PreMaterializedOutgoingKey extends Key { - type MaterializedType = Future[InetSocketAddress] - + class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] { override def materialize(map: MaterializedMap) = throw new IllegalStateException("This key has already been materialized by the TCP Processor") } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6c2e05bdf5..e792cb2c2a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -381,11 +381,22 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * Flow with attached input and output, can be executed. */ trait RunnableFlow { + /** + * Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow. + */ def run(materializer: FlowMaterializer): javadsl.MaterializedMap + + /** + * Run this flow and return the value of the [[KeyedMaterializable]]. + */ + def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M } /** INTERNAL API */ private[akka] class RunnableFlowAdapter(runnable: scaladsl.RunnableFlow) extends RunnableFlow { override def run(materializer: FlowMaterializer): MaterializedMap = new MaterializedMap(runnable.run()(materializer)) + + def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M = + runnable.runWith(key.asScala)(materializer) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index 1aa5351110..b787daf453 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -652,5 +652,7 @@ class FlowGraph(delegate: scaladsl.FlowGraph) extends RunnableFlow { override def run(materializer: FlowMaterializer): javadsl.MaterializedMap = new MaterializedMap(delegate.run()(materializer)) + def runWith[M](key: KeyedMaterializable[M], materializer: FlowMaterializer): M = + delegate.runWith(key.asScala)(materializer) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala index badb3d8f7d..be672f5d55 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala @@ -18,22 +18,11 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) { def asScala: scaladsl.MaterializedMap = delegate /** - * Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[akka.stream.javadsl.Source#subscriber]]. + * Retrieve a materialized key, `Source`, `Sink` or `Key`, e.g. the `Subscriber` of a + * [[akka.stream.javadsl.Source#subscriber]]. */ - def get[T](key: javadsl.KeyedSource[_, T]): T = - delegate.get(key.asScala).asInstanceOf[T] - - /** - * Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[akka.stream.javadsl.Sink#publisher]]. - */ - def get[D](key: javadsl.KeyedSink[_, D]): D = - delegate.get(key.asScala).asInstanceOf[D] - - /** - * Retrieve a materialized `Key`. - */ - def get[T](key: Key[T]): T = - delegate.get(key.asScala).asInstanceOf[T] + def get[T](key: javadsl.KeyedMaterializable[T]): T = + delegate.get(key.asScala) /** * Merge two materialized maps. @@ -46,8 +35,8 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) { /** * Update the materialized map with a new value. */ - def updated(key: Object, value: Object): MaterializedMap = - new MaterializedMap(delegate.updated(key, value)) + def updated(key: KeyedMaterializable[_], value: Object): MaterializedMap = + new MaterializedMap(delegate.updated(key.asScala, value)) /** * Check if this map is empty. @@ -62,13 +51,22 @@ class MaterializedMap(delegate: scaladsl.MaterializedMap) { } } +/** + * Java API + * + * Common interface for keyed things that can be materialized. + */ +trait KeyedMaterializable[M] { + def asScala: scaladsl.KeyedMaterializable[M] +} + /** * Java API * * A key that is not directly tied to a sink or source instance. */ -class Key[T](delegate: scaladsl.Key) { - def asScala: scaladsl.Key = delegate +class Key[M](delegate: scaladsl.Key[M]) extends KeyedMaterializable[M] { + def asScala: scaladsl.Key[M] = delegate /** * Materialize the value for this key. All Sink and Source keys have been materialized and exist in the map. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index bd68d2907b..2fc8f77157 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -148,6 +148,6 @@ class Sink[-In](delegate: scaladsl.Sink[In]) { * to retrieve in order to access aspects of this sink (could be a completion Future * or a cancellation handle, etc.) */ -final class KeyedSink[-In, M](delegate: scaladsl.KeyedSink[In]) extends javadsl.Sink[In](delegate) { - override def asScala: scaladsl.KeyedSink[In] = super.asScala.asInstanceOf[scaladsl.KeyedSink[In]] +final class KeyedSink[-In, M](delegate: scaladsl.KeyedSink[In, M]) extends javadsl.Sink[In](delegate) with KeyedMaterializable[M] { + override def asScala: scaladsl.KeyedSink[In, M] = super.asScala.asInstanceOf[scaladsl.KeyedSink[In, M]] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 33760f9a39..59c5530fa7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -136,7 +136,7 @@ object Source { /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ - def subscriber[T](): KeyedSource[Subscriber[T], T] = + def subscriber[T](): KeyedSource[T, Subscriber[T]] = new KeyedSource(scaladsl.Source.subscriber) /** @@ -145,7 +145,7 @@ object Source { * source. */ def concat[T](first: Source[T], second: Source[T]): Source[T] = - new KeyedSource(scaladsl.Source.concat(first.asScala, second.asScala)) + new Source(scaladsl.Source.concat(first.asScala, second.asScala)) } /** @@ -463,6 +463,6 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * A `Source` that will create an object during materialization that the user will need * to retrieve in order to access aspects of this source (could be a Subscriber, a Future/Promise, etc.). */ -final class KeyedSource[+Out, T](delegate: scaladsl.Source[Out]) extends Source[Out](delegate) { - override def asScala: scaladsl.KeyedActorFlowSource[Out] = super.asScala.asInstanceOf[scaladsl.KeyedActorFlowSource[Out]] +final class KeyedSource[+Out, M](delegate: scaladsl.KeyedSource[Out, M]) extends Source[Out](delegate) with KeyedMaterializable[M] { + override def asScala: scaladsl.KeyedSource[Out, M] = super.asScala.asInstanceOf[scaladsl.KeyedSource[Out, M]] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index 7d9aaa0b13..db1ee1864d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -61,7 +61,7 @@ trait SimpleActorFlowSink[-In] extends ActorFlowSink[In] { * to retrieve in order to access aspects of this sink (could be a completion Future * or a cancellation handle, etc.) */ -trait KeyedActorFlowSink[-In] extends ActorFlowSink[In] with KeyedSink[In] +trait KeyedActorFlowSink[-In, M] extends ActorFlowSink[In] with KeyedSink[In, M] object PublisherSink { def apply[T](): PublisherSink[T] = new PublisherSink[T] @@ -75,16 +75,14 @@ object PublisherSink { * elements to fill the internal buffers it will assert back-pressure until * a subscriber connects and creates demand for elements to be emitted. */ -class PublisherSink[In] extends KeyedActorFlowSink[In] { - type MaterializedType = Publisher[In] +class PublisherSink[In] extends KeyedActorFlowSink[In, Publisher[In]] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = flowPublisher override def toString: String = "PublisherSink" } -final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] { - type MaterializedType = Publisher[In] +final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In, Publisher[In]] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val fanoutActor = materializer.actorOf( @@ -119,9 +117,7 @@ object HeadSink { * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -class HeadSink[In] extends KeyedActorFlowSink[In] { - - type MaterializedType = Future[In] +class HeadSink[In] extends KeyedActorFlowSink[In, Future[In]] { def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val (sub, f) = create(materializer, flowName) @@ -196,9 +192,7 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple * that will be completed with `Success` when reaching the normal end of the stream, or completed * with `Failure` if there is an error is signaled in the stream. */ -final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] { - - override type MaterializedType = Future[Unit] +final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In, Future[Unit]] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val promise = Promise[Unit]() @@ -232,9 +226,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ -final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFlowSink[In] { - - type MaterializedType = Future[U] +final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFlowSink[In, Future[U]] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val promise = Promise[U]() @@ -284,9 +276,7 @@ final case object CancelSink extends SimpleActorFlowSink[Any] { * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] { - - type MaterializedType = ActorRef +final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In, ActorRef] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): ActorRef = { val (subscriber, subscriberRef) = create(materializer, flowName) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 892df12f78..399aa8179a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -58,7 +58,7 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] { override def to(sink: Sink[Out]): RunnableFlow = Pipe.empty[Out].withSource(this).to(sink) - override def withKey(key: Key): Source[Out] = Pipe.empty[Out].withSource(this).withKey(key) + override def withKey(key: Key[_]): Source[Out] = Pipe.empty[Out].withSource(this).withKey(key) /** INTERNAL API */ override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op), Nil) //FIXME raw addition of AstNodes @@ -78,15 +78,13 @@ trait SimpleActorFlowSource[+Out] extends ActorFlowSource[Out] { // FIXME Tightl * to retrieve in order to access aspects of this source (could be a Subscriber, a * Future/Promise, etc.). */ -trait KeyedActorFlowSource[+Out] extends ActorFlowSource[Out] with KeyedSource[Out] +trait KeyedActorFlowSource[+Out, M] extends ActorFlowSource[Out] with KeyedSource[Out, M] /** * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? - override type MaterializedType = Subscriber[Out] - +final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out, Subscriber[Out]] { // FIXME Why does this have anything to do with Actors? override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = flowSubscriber @@ -159,9 +157,7 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? - override type MaterializedType = Cancellable - +final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors? override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { val (pub, cancellable) = create(materializer, flowName) pub.subscribe(flowSubscriber) @@ -188,8 +184,7 @@ final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteD * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] { - override type MaterializedType = ActorRef +final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out, ActorRef] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { val (publisher, publisherRef) = create(materializer, flowName) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 6e017601f1..7ddfd323c4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -70,7 +70,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * The key can only use other keys if they have been added to the flow * before this key. */ - def withKey(key: Key): Flow[In, Out] + def withKey(key: Key[_]): Flow[In, Out] /** * Applies given [[OperationAttributes]] to a given section. @@ -122,7 +122,16 @@ object Flow { * Flow with attached input and output, can be executed. */ trait RunnableFlow { + /** + * Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow. + */ def run()(implicit materializer: FlowMaterializer): MaterializedMap + + /** + * Run this flow and return the value of the [[KeyedMaterializable]]. + */ + def runWith(key: KeyedMaterializable[_])(implicit materializer: FlowMaterializer): key.MaterializedType = + this.run().get(key) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index a6118f350a..158832a6f0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -498,14 +498,14 @@ private[akka] object FlowGraphInternal { */ final override def equals(other: Any): Boolean = other match { case v: SourceVertex ⇒ (source, v.source) match { - case (k1: KeyedSource[_], k2: KeyedSource[_]) ⇒ k1 == k2 - case _ ⇒ super.equals(other) + case (k1: KeyedSource[_, _], k2: KeyedSource[_, _]) ⇒ k1 == k2 + case _ ⇒ super.equals(other) } case _ ⇒ false } final override def hashCode: Int = source match { - case k: KeyedSource[_] ⇒ k.hashCode - case _ ⇒ super.hashCode + case k: KeyedSource[_, _] ⇒ k.hashCode + case _ ⇒ super.hashCode } final override private[scaladsl] def newInstance() = this.copy() @@ -520,14 +520,14 @@ private[akka] object FlowGraphInternal { */ final override def equals(other: Any): Boolean = other match { case v: SinkVertex ⇒ (sink, v.sink) match { - case (k1: KeyedSink[_], k2: KeyedSink[_]) ⇒ k1 == k2 - case _ ⇒ super.equals(other) + case (k1: KeyedSink[_, _], k2: KeyedSink[_, _]) ⇒ k1 == k2 + case _ ⇒ super.equals(other) } case _ ⇒ false } final override def hashCode: Int = sink match { - case k: KeyedSink[_] ⇒ k.hashCode - case _ ⇒ super.hashCode + case k: KeyedSink[_, _] ⇒ k.hashCode + case _ ⇒ super.hashCode } final override private[scaladsl] def newInstance() = this.copy() @@ -1056,8 +1056,8 @@ class FlowGraphBuilder private[akka] ( s"Use individual instances instead of the same one multiple times. Nodes are: ${graph.nodes}" vertex match { - case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] ⇒ require(!graph.contains(v), warningMessage(v.source)) - case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_]] ⇒ require(!graph.contains(v), warningMessage(v.sink)) + case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_, _]] ⇒ require(!graph.contains(v), warningMessage(v.source)) + case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_, _]] ⇒ require(!graph.contains(v), warningMessage(v.sink)) case _ ⇒ // ok } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala index 1062d4c42d..259d35864f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala @@ -116,7 +116,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( } // FIXME #16379 This key will be materalized to early - override def withKey(key: Key): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key)) + override def withKey(key: Key[_]): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) @@ -163,7 +163,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou } // FIXME #16379 This key will be materalized to early - override def withKey(key: Key): Source[Out] = this.copy(outPipe = outPipe.withKey(key)) + override def withKey(key: Key[_]): Source[Out] = this.copy(outPipe = outPipe.withKey(key)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala index e90ff0f3d9..3d2702eefb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala @@ -10,19 +10,9 @@ package akka.stream.scaladsl trait MaterializedMap { /** - * Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. + * Retrieve a materialized key, `Source`, `Sink` or `Key`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ - def get(key: Source[_]): key.MaterializedType - - /** - * Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[PublisherSink]]. - */ - def get(key: Sink[_]): key.MaterializedType - - /** - * Retrieve a materialized `Key`. - */ - def get(key: Key): key.MaterializedType + def get(key: Materializable): key.MaterializedType /** * Merge two materialized maps. @@ -32,7 +22,7 @@ trait MaterializedMap { /** * Update the materialized map with a new value. */ - def updated(key: AnyRef, value: Any): MaterializedMap + def updated(key: KeyedMaterializable[_], value: Any): MaterializedMap /** * Check if this map is empty. @@ -52,12 +42,23 @@ object MaterializedMap { } /** - * A key that is not directly tied to a sink or source instance. - * - * FIXME #16380 Clean up the overlap between Keys/Sinks/Sources + * Common trait for things that have a MaterializedType. */ -trait Key { +trait Materializable { type MaterializedType +} + +/** + * Common trait for keyed things that have a MaterializedType. + */ +trait KeyedMaterializable[M] extends Materializable { + override type MaterializedType = M +} + +/** + * A key that is not directly tied to a sink or source instance. + */ +trait Key[M] extends KeyedMaterializable[M] { /** * Materialize the value for this key. All Sink and Source keys have been materialized and exist in the map. @@ -66,35 +67,30 @@ trait Key { } private[stream] case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap { - private def failure(keyType: String, key: AnyRef) = new IllegalArgumentException(s"$keyType [$key] doesn't exist in this flow") - - override def get(key: Source[_]): key.MaterializedType = key match { - case _: KeyedSource[_] ⇒ map.get(key) match { - case Some(v) ⇒ v.asInstanceOf[key.MaterializedType] - case None ⇒ throw failure("Source", key) + private def failure(key: KeyedMaterializable[_]) = { + val keyType = key match { + case _: KeyedSource[_, _] ⇒ "Source" + case _: KeyedSink[_, _] ⇒ "Sink" + case _: Key[_] ⇒ "Key" + case _ ⇒ "Unknown" } - case _ ⇒ ().asInstanceOf[key.MaterializedType] + new IllegalArgumentException(s"$keyType key [$key] doesn't exist in this flow") } - override def get(key: Sink[_]): key.MaterializedType = key match { - case _: KeyedSink[_] ⇒ map.get(key) match { + override def get(key: Materializable): key.MaterializedType = key match { + case km: KeyedMaterializable[_] ⇒ map.get(key) match { case Some(v) ⇒ v.asInstanceOf[key.MaterializedType] - case None ⇒ throw failure("Sink", key) + case None ⇒ throw failure(km) } case _ ⇒ ().asInstanceOf[key.MaterializedType] } - override def get(key: Key): key.MaterializedType = map.get(key) match { - case Some(v) ⇒ v.asInstanceOf[key.MaterializedType] - case None ⇒ throw failure("Key", key) - } - override def merge(otherMap: MaterializedMap) = if (map.isEmpty) otherMap else if (otherMap.isEmpty) this else MaterializedMapImpl(map ++ otherMap.iterator) - override def updated(key: AnyRef, value: Any) = MaterializedMapImpl(map.updated(key, value)) + override def updated(key: KeyedMaterializable[_], value: Any) = MaterializedMapImpl(map.updated(key, value)) override def isEmpty = map.isEmpty diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index 652a27f714..de0a2c2f98 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -20,7 +20,7 @@ private[akka] object Pipe { Pipe(List(Ast.DirectProcessor(() ⇒ p().asInstanceOf[Processor[Any, Any]])), Nil) // FIXME #16376 should probably be replaced with an ActorFlowProcessor similar to ActorFlowSource/Sink - private[stream] def apply[In, Out](key: Key)(p: () ⇒ (Processor[In, Out], Any)): Pipe[In, Out] = + private[stream] def apply[In, Out](key: Key[_])(p: () ⇒ (Processor[In, Out], Any)): Pipe[In, Out] = Pipe(List(Ast.DirectProcessorWithKey(() ⇒ p().asInstanceOf[(Processor[Any, Any], Any)], key)), Nil) private[stream] def apply[In, Out](source: SourcePipe[_]): Pipe[In, Out] = @@ -33,7 +33,7 @@ private[akka] object Pipe { /** * Flow with one open input and one open output. */ -private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] { +private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key[_]], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes @@ -62,7 +62,7 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } - override def withKey(key: Key): Pipe[In, Out] = Pipe(ops, keys :+ key) + override def withKey(key: Key[_]): Pipe[In, Out] = Pipe(ops, keys :+ key) private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ::: ops, keys ::: pipe.keys) // FIXME raw addition of AstNodes } @@ -70,7 +70,7 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke /** * Pipe with open input and attached output. Can be used as a `Subscriber`. */ -private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNode], keys: List[Key]) extends Sink[In] { +private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNode], keys: List[Key[_]]) extends Sink[In] { private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops, keys) @@ -81,7 +81,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod /** * Pipe with open output and attached input. Can be used as a `Publisher`. */ -private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] { +private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key[_]], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] { override type Repr[+O] = SourcePipe[O] override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes @@ -104,13 +104,13 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As case d: Sink[Out] ⇒ this.withSink(d) } - override def withKey(key: Key): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key) + override def withKey(key: Key[_]): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key) } /** * Pipe with attached input and output, can be executed. */ -private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode], keys: List[Key]) extends RunnableFlow { +private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode], keys: List[Key[_]]) extends RunnableFlow { def run()(implicit materializer: FlowMaterializer): MaterializedMap = materializer.materialize(input, output, ops, keys) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 9aacf2c05d..34f5fadaba 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -12,8 +12,7 @@ import akka.stream.FlowMaterializer * A `Sink` is a set of stream processing steps that has one open input and an attached output. * Can be used as a `Subscriber` */ -trait Sink[-In] { - type MaterializedType +trait Sink[-In] extends Materializable { /** * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value @@ -114,4 +113,4 @@ object Sink { * to retrieve in order to access aspects of this sink (could be a completion Future * or a cancellation handle, etc.) */ -trait KeyedSink[-In] extends Sink[In] +trait KeyedSink[-In, M] extends Sink[In] with KeyedMaterializable[M] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 263a3f5d67..2c4cd52bd2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -17,8 +17,7 @@ import akka.stream.FlowMaterializer * A `Source` is a set of stream processing steps that has one open output and an attached input. * Can be used as a `Publisher` */ -trait Source[+Out] extends FlowOps[Out] { - type MaterializedType +trait Source[+Out] extends FlowOps[Out] with Materializable { override type Repr[+O] <: Source[O] /** @@ -78,7 +77,7 @@ trait Source[+Out] extends FlowOps[Out] { * The key can only use other keys if they have been added to the source * before this key. This also includes the keyed source if applicable. */ - def withKey(key: Key): Source[Out] + def withKey(key: Key[_]): Source[Out] /** * Applies given [[OperationAttributes]] to a given section. @@ -210,4 +209,4 @@ object Source { * to retrieve in order to access aspects of this source (could be a Subscriber, a * Future/Promise, etc.). */ -trait KeyedSource[+Out] extends Source[Out] +trait KeyedSource[+Out, M] extends Source[Out] with KeyedMaterializable[M]