From c63b9c801a2dc979fe086c8f9cf665a700d34b50 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 20 Apr 2015 15:03:03 +0200 Subject: [PATCH] =str #16699 fix some FIXMEs --- .../akka/stream/OperationAttributes.scala | 19 ++++++++++++++--- .../impl/ActorFlowMaterializerImpl.scala | 4 ++-- .../akka/stream/impl/ActorProcessor.scala | 1 - .../akka/stream/impl/FanoutProcessor.scala | 2 -- .../akka/stream/impl/FuturePublisher.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 2 +- .../main/scala/akka/stream/javadsl/Flow.scala | 6 +++--- .../scala/akka/stream/javadsl/Source.scala | 21 +++++++++++++++---- .../scala/akka/stream/javadsl/japi/Util.scala | 17 --------------- .../stream/javadsl/japi/WithVariance.scala | 10 ++++----- .../akka/stream/scaladsl/FlexiRoute.scala | 1 - 11 files changed, 45 insertions(+), 40 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index 4007489864..d24d4b24cf 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -62,9 +62,22 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio * INTERNAL API */ private[akka] def nameLifted: Option[String] = - attributes.collect { - case Name(name) ⇒ name - }.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead + if (attributes.isEmpty) + None + else { + val sb = new java.lang.StringBuilder + val iter = attributes.iterator + while (iter.hasNext) { + iter.next() match { + case Name(name) ⇒ + if (sb.length == 0) sb.append(name) + else sb.append("-").append(name) + case _ ⇒ + } + } + if (sb.length == 0) None + else Some(sb.toString) + } /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 95de18ec38..869c22883b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -118,7 +118,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) case ConcatModule(shape, _) ⇒ - require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // FIXME + require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO (Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out) case zip: ZipWithModule ⇒ @@ -269,7 +269,7 @@ private[akka] object ActorProcessorFactory { case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ()) - case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) //FIXME closes over the materializer, is this good? + case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ()) case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) case MaterializingStageFactory(mkStageAndMat, _) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 16e8b838f8..b831845b6e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -248,7 +248,6 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali with ActorLogging with Pump { - // FIXME: make pump a member protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index ed37d664bd..fb3ac2c8a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -79,11 +79,9 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ - // FIXME can we avoid this cast? moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements) pump.pump() case Cancel(subscription) ⇒ - // FIXME can we avoid this cast? unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]]) pump.pump() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 5fd0d05ab3..7b6141e25f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -142,7 +142,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate } } - override def postStop(): Unit = // FIXME if something blows up, are the subscribers onErrored? + override def postStop(): Unit = if (exposedPublisher ne null) exposedPublisher.shutdown(shutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index b74022efb0..1bdbcd25de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -173,7 +173,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective = if (ctx.isFinishing) { val elem = buf.result() - buf.clear() //FIXME null out the reference to the `buf`? + buf.clear() left = n ctx.pushAndFinish(elem) } else ctx.pull() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1f173a93c..2733bdb052 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -220,7 +220,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * `n` must be positive, otherwise IllegalArgumentException is thrown. */ def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = - new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step /** * Similar to `fold` but is not a terminal operation, @@ -246,7 +246,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * IllegalArgumentException is thrown. */ def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = - new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** * Discard the given number of elements at the beginning of the stream. @@ -368,7 +368,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * the element is dropped and the stream and substreams continue. */ def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = - new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** * This operation applies the given predicate to all incoming elements and diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c53d389648..deafabb4a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.javadsl +import scala.collection.immutable import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util @@ -95,9 +96,21 @@ object Source { * Iterator, but every Subscriber directly attached to the Publisher of this * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as a `Source`. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. */ - def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] = - new Source(scaladsl.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable))) + def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] = { + // this adapter is not immutable if the the underlying java.lang.Iterable is modified + // but there is not anything we can do to prevent that from happening. + // ConcurrentModificationException will be thrown in some cases. + val scalaIterable = new immutable.Iterable[O] { + import collection.JavaConverters._ + override def iterator: Iterator[O] = iterable.iterator().asScala + } + new Source(scaladsl.Source(scalaIterable)) + } /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -363,7 +376,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @param n must be positive, and `d` must be greater than 0 seconds, otherwise [[IllegalArgumentException]] is thrown. */ def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = - new Source(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** * Discard the given number of elements at the beginning of the stream. @@ -473,7 +486,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * to consume only one of them. */ def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = - new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** * This operation applies the given predicate to all incoming elements and diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala deleted file mode 100644 index 2024476d39..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl.japi - -import scala.collection.immutable - -object Util { - - import collection.JavaConverters._ - // FIXME this does not make something an immutable iterable!! - def immutableIterable[T](iterable: java.lang.Iterable[T]): immutable.Iterable[T] = - new immutable.Iterable[T] { - override def iterator: Iterator[T] = iterable.iterator().asScala - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala index 522ddd2bb4..5b31a67ac1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala @@ -9,7 +9,7 @@ package akka.stream.javadsl.japi /** * A Function interface. Used to create first-class-functions is Java. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Function[-T, +R] { @throws(classOf[Exception]) def apply(param: T): R @@ -18,7 +18,7 @@ trait Function[-T, +R] { /** * A Function interface. Used to create 2-arg first-class-functions is Java. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Function2[-T1, -T2, +R] { @throws(classOf[Exception]) def apply(arg1: T1, arg2: T2): R @@ -27,7 +27,7 @@ trait Function2[-T1, -T2, +R] { /** * A constructor/factory, takes no parameters but creates a new value of type T every call. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Creator[+T] extends Serializable { /** * This method must return a different instance upon every call. @@ -39,7 +39,7 @@ trait Creator[+T] extends Serializable { /** * A Procedure is like a Function, but it doesn't produce a return value. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Procedure[-T] { @throws(classOf[Exception]) def apply(param: T): Unit @@ -48,7 +48,7 @@ trait Procedure[-T] { /** * Java API: Defines a criteria and determines whether the parameter meets this criteria. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Predicate[-T] { def test(param: T): Boolean } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index e16ac52926..b982fba64e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -240,7 +240,6 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, attributes: OperationAtt case None ⇒ super.toString } - // FIXME what to do about this? override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = throw new UnsupportedOperationException( "withAttributes not supported by default by FlexiRoute, subclass may override and implement it")