diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index ceb8cba01a..7eb7cce5e0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -85,6 +85,12 @@ object DispatchersSpec { queue add handle } } + + // Workaround to narrow the type of unapplySeq of Regex since the unapplySeq(Any) will be removed in Scala 2.13 + case class R(s: String) { + private val r = s.r + def unapplySeq(arg: CharSequence) = r.unapplySeq(arg) + } } class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSender { @@ -118,7 +124,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend def assertMyDispatcherIsUsed(actor: ActorRef): Unit = { actor ! "what's the name?" - val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r + val Expected = R("(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)") expectMsgPF() { case Expected(x) ⇒ } @@ -172,7 +178,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for thread-pool-executor" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?" - val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r + val Expected = R("(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)") expectMsgPF() { case Expected(x) ⇒ } @@ -180,7 +186,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for default-dispatcher" in { system.actorOf(Props[ThreadNameEcho]) ! "what's the name?" - val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*)".r + val Expected = R("(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*)") expectMsgPF() { case Expected(x) ⇒ } @@ -188,7 +194,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for pinned dispatcher" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher")) ! "what's the name?" - val Expected = "(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)".r + val Expected = R("(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)") expectMsgPF() { case Expected(x) ⇒ } @@ -196,7 +202,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for balancing dispatcher" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.balancing-dispatcher")) ! "what's the name?" - val Expected = "(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)".r + val Expected = R("(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)") expectMsgPF() { case Expected(x) ⇒ } @@ -216,7 +222,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend pool ! Identify(None) val routee = expectMsgType[ActorIdentity].ref.get routee ! "what's the name?" 
- val Expected = """(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""".r + val Expected = R("""(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""") expectMsgPF() { case Expected(x) ⇒ } @@ -224,7 +230,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "use balancing-pool router with special routees mailbox of deployment config" in { system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "balanced") ! "what's the name?" - val Expected = """(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""".r + val Expected = R("""(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""") expectMsgPF() { case Expected(x) ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index fd520eee22..a604ace16b 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -20,6 +20,7 @@ import java.lang.{ IllegalStateException, ArithmeticException } import java.util.concurrent._ import scala.reflect.{ ClassTag, classTag } import scala.util.{ Failure, Success, Try } +import akka.compat object FutureSpec { @@ -404,7 +405,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "fold" in { - Await.result(Future.fold((1 to 10).toList map { i ⇒ Future(i) })(0)(_ + _), remainingOrDefault) should ===(55) + Await.result(compat.Future.fold((1 to 10).toList map { i ⇒ Future(i) })(0)(_ + _), remainingOrDefault) should ===(55) } "zip" in { @@ -436,7 +437,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case 6 ⇒ Future(throw new IllegalArgumentException("shouldFoldResultsWithException: expected")) case i ⇒ Future(i) } - intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), remainingOrDefault) }.getMessage should ===("shouldFoldResultsWithException: expected") + intercept[Throwable] { Await.result(compat.Future.fold(futures)(0)(_ + _), remainingOrDefault) }.getMessage should ===("shouldFoldResultsWithException: expected") } } @@ -444,7 +445,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { val fs = (0 to 1000) map (i ⇒ Future(i)) - val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { + val f = compat.Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l } @@ -457,12 +458,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "return zero value if folding empty list" in { - Await.result(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) should ===(0) + Await.result(compat.Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) should ===(0) } "reduce results" in { val futures = (1 to 10).toList map { i ⇒ Future(i) } - assert(Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) === 55) + assert(Await.result(compat.Future.reduce(futures)(_ + _), remainingOrDefault) === 55) } "reduce results with Exception" in { @@ -471,20 +472,20 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case 6 ⇒ Future(throw new IllegalArgumentException("shouldReduceResultsWithException: expected")) case i ⇒ Future(i) } - intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) }.getMessage should 
===("shouldReduceResultsWithException: expected") + intercept[Throwable] { Await.result(compat.Future.reduce(futures)(_ + _), remainingOrDefault) }.getMessage should ===("shouldReduceResultsWithException: expected") } } "throw IllegalArgumentException on empty input to reduce" in { filterException[IllegalArgumentException] { - intercept[java.util.NoSuchElementException] { Await.result(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } + intercept[java.util.NoSuchElementException] { Await.result(compat.Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } } } - "execute onSuccess when received ask reply" in { + "execute foreach when received ask reply" in { val latch = new TestLatch val actor = system.actorOf(Props[TestActor]) - actor ? "Hello" onSuccess { case "World" ⇒ latch.open() } + actor ? "Hello" foreach { case "World" ⇒ latch.open() } FutureSpec.ready(latch, 5 seconds) system.stop(actor) } @@ -518,12 +519,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val f2 = Future { FutureSpec.ready(latch, 5 seconds); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) - f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") } + f2 foreach { _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open() assert(Await.result(f2, timeout.duration) === "success") f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) - f2 onSuccess { case _ ⇒ throw new ThrowableTest("current thread receive") } + f2 foreach { _ ⇒ throw new ThrowableTest("current thread receive") } assert(Await.result(f3, timeout.duration) === "SUCCESS") } } @@ -675,7 +676,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "perform action on result" in { f { (future, result) ⇒ val p = Promise[Any]() - future.onSuccess { case x ⇒ p.success(x) } + future.foreach { x ⇒ p.success(x) } Await.result(p.future, timeout.duration) should ===(result) } } @@ -715,7 +716,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "perform action on exception" in { f { (future, message) ⇒ val p = Promise[Any]() - future.onFailure { case _ ⇒ p.success(message) } + future.failed.foreach { _ ⇒ p.success(message) } Await.result(p.future, timeout.duration) should ===(message) } } diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 70ef4b1179..9a5838ffa2 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -12,7 +12,6 @@ import java.io._ import scala.concurrent.Await import akka.util.Timeout import scala.concurrent.duration._ -import scala.beans.BeanInfo import com.typesafe.config._ import akka.pattern.ask import org.apache.commons.codec.binary.Hex.encodeHex @@ -44,9 +43,8 @@ object SerializationTests { } """ - @BeanInfo final case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") } - @BeanInfo + final case class Person(name: String, age: Int, address: Address) { def this() = this("", 0, null) } final case class Record(id: Int, person: Person) diff --git a/akka-actor/build.sbt b/akka-actor/build.sbt index e37b72e7b3..025b0daa28 100644 --- a/akka-actor/build.sbt +++ b/akka-actor/build.sbt @@ -5,5 +5,9 @@ Formatting.formatSettings OSGi.actor 
Dependencies.actor Version.versionSettings +unmanagedSourceDirectories in Compile += { + val ver = scalaVersion.value.take(4) + (scalaSource in Compile).value.getParentFile / s"scala-$ver" +} enablePlugins(spray.boilerplate.BoilerplatePlugin) diff --git a/akka-actor/src/main/scala-2.11/akka/compat/Future.scala b/akka-actor/src/main/scala-2.11/akka/compat/Future.scala new file mode 100644 index 0000000000..8a3e74a20b --- /dev/null +++ b/akka-actor/src/main/scala-2.11/akka/compat/Future.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.compat + +import akka.annotation.InternalApi +import scala.concurrent.{ ExecutionContext, Future ⇒ SFuture } + +/** + * INTERNAL API + * + * Compatibility wrapper for `scala.concurrent.Future` to be able to compile the same code + * against Scala 2.11, 2.12, 2.13 + * + * Remove these classes as soon as support for Scala 2.11 is dropped! + */ +@InternalApi private[akka] object Future { + def fold[T, R](futures: TraversableOnce[SFuture[T]])(zero: R)(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.fold[T, R](futures)(zero)(op)(executor) + + def reduce[T, R >: T](futures: TraversableOnce[SFuture[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.reduce[T, R](futures)(op)(executor) + + def find[T](futures: TraversableOnce[SFuture[T]])(p: T ⇒ Boolean)(implicit executor: ExecutionContext): SFuture[Option[T]] = + SFuture.find[T](futures)(p)(executor) +} diff --git a/akka-actor/src/main/scala-2.12/akka/compat/Future.scala b/akka-actor/src/main/scala-2.12/akka/compat/Future.scala new file mode 100644 index 0000000000..ec5c59eb01 --- /dev/null +++ b/akka-actor/src/main/scala-2.12/akka/compat/Future.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.compat + +import akka.annotation.InternalApi +import scala.concurrent.{ ExecutionContext, Future ⇒ SFuture } +import scala.collection.immutable + +/** + * INTERNAL API + * + * Compatibility wrapper for `scala.concurrent.Future` to be able to compile the same code + * against Scala 2.11, 2.12, 2.13 + * + * Remove these classes as soon as support for Scala 2.11 is dropped! 
+ */ +@InternalApi private[akka] object Future { + def fold[T, R](futures: TraversableOnce[SFuture[T]])(zero: R)(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.fold[T, R](futures)(zero)(op)(executor) + + def fold[T, R](futures: immutable.Iterable[SFuture[T]])(zero: R)(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.foldLeft[T, R](futures)(zero)(op)(executor) + + def reduce[T, R >: T](futures: TraversableOnce[SFuture[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.reduce[T, R](futures)(op)(executor) + + def reduce[T, R >: T](futures: immutable.Iterable[SFuture[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.reduceLeft[T, R](futures)(op)(executor) + + def find[T](futures: TraversableOnce[SFuture[T]])(p: T ⇒ Boolean)(implicit executor: ExecutionContext): SFuture[Option[T]] = + SFuture.find[T](futures)(p)(executor) + + def find[T](futures: immutable.Iterable[SFuture[T]])(p: T ⇒ Boolean)(implicit executor: ExecutionContext): SFuture[Option[T]] = + SFuture.find[T](futures)(p)(executor) +} + diff --git a/akka-actor/src/main/scala-2.13/akka/compat/Future.scala b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala new file mode 100644 index 0000000000..b3f4a66170 --- /dev/null +++ b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.compat + +import akka.annotation.InternalApi +import scala.concurrent.{ ExecutionContext, Future ⇒ SFuture } +import scala.collection.immutable + +/** + * INTERNAL API + * + * Compatibility wrapper for `scala.concurrent.Future` to be able to compile the same code + * against Scala 2.11, 2.12, 2.13 + * + * Remove these classes as soon as support for Scala 2.11 is dropped! 
+ */ +@InternalApi private[akka] object Future { + def fold[T, R](futures: TraversableOnce[SFuture[T]])(zero: R)(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = { + // This will have performance implications since the elements are copied to a Vector + SFuture.foldLeft[T, R](futures.to[immutable.Iterable])(zero)(op)(executor) + } + + def fold[T, R](futures: immutable.Iterable[SFuture[T]])(zero: R)(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.foldLeft[T, R](futures)(zero)(op)(executor) + + def reduce[T, R >: T](futures: TraversableOnce[SFuture[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = { + // This will have performance implications since the elements are copied to a Vector + SFuture.reduceLeft[T, R](futures.to[immutable.Iterable])(op)(executor) + } + + def reduce[T, R >: T](futures: immutable.Iterable[SFuture[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): SFuture[R] = + SFuture.reduceLeft[T, R](futures)(op)(executor) + + def find[T](futures: TraversableOnce[SFuture[T]])(p: T ⇒ Boolean)(implicit executor: ExecutionContext): SFuture[Option[T]] = { + // This will have performance implications since the elements are copied to a Vector + SFuture.find[T](futures.to[immutable.Iterable])(p)(executor) + } + + def find[T](futures: immutable.Iterable[SFuture[T]])(p: T ⇒ Boolean)(implicit executor: ExecutionContext): SFuture[Option[T]] = + SFuture.find[T](futures)(p)(executor) +} diff --git a/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala index ee3a656015..8357971965 100644 --- a/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala @@ -92,17 +92,16 @@ class LightArrayRevolverScheduler( delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { checkMaxDelay(roundUp(delay).toNanos) - val preparedEC = executor.prepare() try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒ compareAndSet(InitialRepeatMarker, schedule( - preparedEC, + executor, new AtomicLong(clock() + initialDelay.toNanos) with Runnable { override def run(): Unit = { try { runnable.run() val driftNanos = clock() - getAndAdd(delay.toNanos) if (self.get != null) - swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) + swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) } catch { case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor } @@ -132,7 +131,7 @@ class LightArrayRevolverScheduler( } override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = - try schedule(executor.prepare(), runnable, roundUp(delay)) + try schedule(executor, runnable, roundUp(delay)) catch { case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index dbff480f92..2435afb243 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -13,6 +13,8 @@ import java.util.concurrent.{ Executor, ExecutorService, Callable } import scala.util.{ Try, Success, Failure } import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture 
+import scala.collection.immutable +import akka.compat /** * ExecutionContexts is the Java API for ExecutionContexts @@ -127,7 +129,7 @@ object Futures { */ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = { implicit val ec = executor - Future.find[T](futures.asScala)(predicate.apply(_))(executor) map JOption.fromScalaOption + compat.Future.find[T](futures.asScala)(predicate.apply(_))(executor) map JOption.fromScalaOption } /** @@ -143,13 +145,13 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] = - Future.fold(futures.asScala)(zero)(fun.apply)(executor) + compat.Future.fold(futures.asScala)(zero)(fun.apply)(executor) /** * Reduces the results of the supplied futures and binary function. */ def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] = - Future.reduce[T, R](futures.asScala)(fun.apply)(executor) + compat.Future.reduce[T, R](futures.asScala)(fun.apply)(executor) /** * Simple version of [[#traverse]]. Transforms a JIterable[Future[A]] into a Future[JIterable[A]]. diff --git a/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala b/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala index ad5c7ad90d..d59539e624 100644 --- a/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala +++ b/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala @@ -15,7 +15,7 @@ class SimpleDnsCache extends Dns with PeriodicCacheCleanup { private val cache = new AtomicReference(new Cache( immutable.SortedSet()(ExpiryEntryOrdering), - immutable.Map(), clock)) + immutable.Map(), clock _)) private val nanoBase = System.nanoTime() diff --git a/akka-bench-jmh/src/main/scala/akka/BenchRunner.scala b/akka-bench-jmh/src/main/scala/akka/BenchRunner.scala index 44e5856ecb..534b6c05ac 100644 --- a/akka-bench-jmh/src/main/scala/akka/BenchRunner.scala +++ b/akka-bench-jmh/src/main/scala/akka/BenchRunner.scala @@ -6,7 +6,7 @@ import org.openjdk.jmh.runner.options.CommandLineOptions object BenchRunner { def main(args: Array[String]) = { - import scala.collection.JavaConversions._ + import scala.collection.JavaConverters._ val args2 = args.toList.flatMap { case "quick" => "-i 1 -wi 1 -f1 -t1".split(" ").toList @@ -18,9 +18,9 @@ object BenchRunner { val opts = new CommandLineOptions(args2: _*) val results = new Runner(opts).run() - val report = results.map { result: RunResult ⇒ + val report = results.asScala.map { result: RunResult ⇒ val bench = result.getParams.getBenchmark - val params = result.getParams.getParamsKeys.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_") + val params = result.getParams.getParamsKeys.asScala.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_") val score = result.getAggregatedResult.getPrimaryResult.getScore.round val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit s"\t${bench}_${params}\t$score\t$unit" diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala index 014d6abf4f..1687a2cb39 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -10,7 +10,7 @@ import akka.actor.ActorSystem import akka.stream.scaladsl._ import 
com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ -import scala.concurrent.Lock +import java.util.concurrent.Semaphore import scala.util.Success import akka.stream.impl.fusing.GraphStages import org.reactivestreams._ @@ -119,7 +119,7 @@ class FlowMapBenchmark { @Benchmark @OperationsPerInvocation(100000) def flow_map_100k_elements(): Unit = { - val lock = new Lock() // todo rethink what is the most lightweight way to await for a streams completion + val lock = new Semaphore(1) // todo rethink what is the most lightweight way to await for a streams completion lock.acquire() flow.runWith(Sink.onComplete(_ ⇒ lock.release()))(materializer) diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index 7219f644b2..99cc7f708e 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -78,8 +78,8 @@ private[camel] class CamelExchangeAdapter(val exchange: Exchange) { * in the exchange. */ def toAkkaCamelException(headers: Map[String, Any]): AkkaCamelException = { - import scala.collection.JavaConversions._ - new AkkaCamelException(exchange.getException, headers ++ response.getHeaders) + import scala.collection.JavaConverters._ + new AkkaCamelException(exchange.getException, headers ++ response.getHeaders.asScala) } /** @@ -94,8 +94,8 @@ private[camel] class CamelExchangeAdapter(val exchange: Exchange) { * in the Camel message. */ def toFailureResult(headers: Map[String, Any]): FailureResult = { - import scala.collection.JavaConversions._ - FailureResult(exchange.getException, headers ++ response.getHeaders) + import scala.collection.JavaConverters._ + FailureResult(exchange.getException, headers ++ response.getHeaders.asScala) } /** diff --git a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala index 1ee0d5f8ec..6fbf40b4de 100644 --- a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala @@ -133,8 +133,8 @@ class Registrar(val start: Int, val number: Int, activationsPromise: Promise[Lis actorRefs.foreach { aref ⇒ context.stop(aref) val result = camel.deactivationFutureFor(aref) - result.onFailure { - case e ⇒ log.error("deactivationFutureFor {} failed: {}", aref, e.getMessage) + result.failed.foreach { + e ⇒ log.error("deactivationFutureFor {} failed: {}", aref, e.getMessage) } deActivations += result if (deActivations.size == number * 2) { @@ -147,8 +147,8 @@ class Registrar(val start: Int, val number: Int, activationsPromise: Promise[Lis val ref = context.actorOf(Props(actor), name) actorRefs = actorRefs + ref val result = camel.activationFutureFor(ref) - result.onFailure { - case e ⇒ log.error("activationFutureFor {} failed: {}", ref, e.getMessage) + result.failed.foreach { + e ⇒ log.error("activationFutureFor {} failed: {}", ref, e.getMessage) } activations += result } diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala index ac056f31f6..78f03929ee 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala @@ -78,7 +78,7 @@ trait ReceivePipeline extends Actor { val zipped = 
pipeline.foldRight(innerReceiveHandler) { (outerInterceptor, innerHandler) ⇒ outerInterceptor.andThen { case Inner(msg) ⇒ innerHandler(msg) - case InnerAndAfter(msg, after) ⇒ try innerHandler(msg) finally after() + case InnerAndAfter(msg, after) ⇒ try innerHandler(msg) finally after(()) case HandledCompletely ⇒ Done } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 8d713e4633..b69005d344 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -549,7 +549,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) def ormapToProto(ormap: ORMap[_, _]): rd.ORMap = { val ormapBuilder = rd.ORMap.newBuilder() - val entries: jl.Iterable[rd.ORMap.Entry] = getEntries(ormap.values, rd.ORMap.Entry.newBuilder, otherMessageToProto) + val entries: jl.Iterable[rd.ORMap.Entry] = getEntries(ormap.values, rd.ORMap.Entry.newBuilder _, otherMessageToProto) ormapBuilder.setKeys(orsetToProto(ormap.keys)).addAllEntries(entries).build() } @@ -731,7 +731,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) def lwwmapToProto(lwwmap: LWWMap[_, _]): rd.LWWMap = { val lwwmapBuilder = rd.LWWMap.newBuilder() - val entries: jl.Iterable[rd.LWWMap.Entry] = getEntries(lwwmap.underlying.entries, rd.LWWMap.Entry.newBuilder, lwwRegisterToProto) + val entries: jl.Iterable[rd.LWWMap.Entry] = getEntries(lwwmap.underlying.entries, rd.LWWMap.Entry.newBuilder _, lwwRegisterToProto) lwwmapBuilder.setKeys(orsetToProto(lwwmap.underlying.keys)).addAllEntries(entries).build() } @@ -747,7 +747,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = { val pncountermapBuilder = rd.PNCounterMap.newBuilder() - val entries: jl.Iterable[rd.PNCounterMap.Entry] = getEntries(pncountermap.underlying.entries, rd.PNCounterMap.Entry.newBuilder, pncounterToProto) + val entries: jl.Iterable[rd.PNCounterMap.Entry] = getEntries(pncountermap.underlying.entries, rd.PNCounterMap.Entry.newBuilder _, pncounterToProto) pncountermapBuilder.setKeys(orsetToProto(pncountermap.underlying.keys)).addAllEntries(entries).build() } @@ -763,7 +763,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = { val ormultimapBuilder = rd.ORMultiMap.newBuilder() - val entries: jl.Iterable[rd.ORMultiMap.Entry] = getEntries(multimap.underlying.entries, rd.ORMultiMap.Entry.newBuilder, orsetToProto) + val entries: jl.Iterable[rd.ORMultiMap.Entry] = getEntries(multimap.underlying.entries, rd.ORMultiMap.Entry.newBuilder _, orsetToProto) ormultimapBuilder.setKeys(orsetToProto(multimap.underlying.keys)).addAllEntries(entries) if (multimap.withValueDeltas) ormultimapBuilder.setWithValueDeltas(true) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 974e5901f5..04e0d98847 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -242,7 +242,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) val cmdFuture = 
TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode)) - cmdFuture onSuccess { + cmdFuture foreach { case true ⇒ self ! ToServer(Done) case _ ⇒ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " + "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig") diff --git a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala index 5d65d33d8f..f344cc4fa2 100644 --- a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala +++ b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala @@ -5,7 +5,7 @@ package akka.osgi import de.kalpatec.pojosr.framework.launch.{ BundleDescriptor, PojoServiceRegistryFactory, ClasspathScanner } -import scala.collection.JavaConversions.seqAsJavaList +import scala.collection.JavaConverters._ import org.apache.commons.io.IOUtils.copy import org.osgi.framework._ @@ -40,7 +40,7 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { System.setProperty("org.osgi.framework.storage", "target/akka-osgi/" + UUID.randomUUID().toString) val bundles = new ClasspathScanner().scanForBundles() - bundles.addAll(testBundles) + bundles.addAll(testBundles.asJava) config.put(PojoServiceRegistryFactory.BUNDLE_DESCRIPTORS, bundles) val oldErr = System.err diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index ba47643c08..2f061e0111 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -150,8 +150,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { highSeqNr ⇒ RecoverySuccess(highSeqNr) }.recover { case e ⇒ ReplayMessagesFailure(e) - }.pipeTo(replyTo).onSuccess { - case _ ⇒ if (publish) context.system.eventStream.publish(r) + }.pipeTo(replyTo).foreach { + _ ⇒ if (publish) context.system.eventStream.publish(r) } case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 561c8b6777..b00838403c 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString.UTF_8 import akka.util.OptionVal +import scala.collection.immutable /** * INTERNAL API @@ -575,10 +576,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val accepting: Receive = { case ManagementCommand(cmd) ⇒ - val allStatuses = transportMapping.values map { transport ⇒ - transport.managementCommand(cmd) - } - Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender() + val allStatuses: immutable.Seq[Future[Boolean]] = + transportMapping.values.map(transport ⇒ transport.managementCommand(cmd))(scala.collection.breakOut) + akka.compat.Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender() case Quarantine(address, uidToQuarantineOption) ⇒ // Stop writers diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala 
index 45d2c13093..b06c2ecbc4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -761,9 +761,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // tear down the upstream hub part if downstream lane fails // lanes are not completed with success by themselves so we don't have to care about onSuccess - completed.onFailure { - case reason: Throwable ⇒ hubKillSwitch.abort(reason) - } + completed.failed.foreach { reason ⇒ hubKillSwitch.abort(reason) } (resourceLife, compressionAccess, completed) } @@ -788,7 +786,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { implicit val ec = materializer.executionContext - streamCompleted.onFailure { + streamCompleted.failed.foreach { case ShutdownSignal ⇒ // shutdown as expected case _: AeronTerminated ⇒ // shutdown already in progress case cause if isShutdown ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 6d30a841bc..c3d39c41d8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -616,10 +616,10 @@ private[remote] class Association( val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted) // tear down all parts if one part fails or completes - completed.onFailure { - case reason: Throwable ⇒ streamKillSwitch.abort(reason) + completed.failed.foreach { + reason ⇒ streamKillSwitch.abort(reason) } - (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ streamKillSwitch.shutdown() }) + (laneCompletedValues :+ aeronSinkCompleted).foreach(_.foreach { _ ⇒ streamKillSwitch.shutdown() }) queueValues.zip(wrappers).zipWithIndex.foreach { case ((q, w), i) ⇒ @@ -675,7 +675,7 @@ private[remote] class Association( } implicit val ec = materializer.executionContext - streamCompleted.onFailure { + streamCompleted.failed.foreach { case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected // countDown the latch in case threads are waiting on the latch in outboundControlIngress method diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index a54ff4b139..05aad2cacf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -302,12 +302,12 @@ private[remote] object Decoder { * External call from ChangeInboundCompression materialized value */ override def runNextActorRefAdvertisement(): Unit = - runNextActorRefAdvertisementCb.invoke() + runNextActorRefAdvertisementCb.invoke(()) /** * External call from ChangeInboundCompression materialized value */ override def runNextClassManifestAdvertisement(): Unit = - runNextClassManifestAdvertisementCb.invoke() + runNextClassManifestAdvertisementCb.invoke(()) } private[remote] class AccessInboundCompressionFailed diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 9f00b6f22d..394197ae4e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ 
b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -81,11 +81,11 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor listenAddress: Address, listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener] = { log.warning("FailureInjectorTransport is active on this system. Gremlins might munch your packets.") - listenerFuture.onSuccess { + listenerFuture.foreach { // Side effecting: As this class is not an actor, the only way to safely modify state is through volatile vars. // Listen is called only during the initialization of the stack, and upstreamListener is not read before this // finishes. - case listener: AssociationEventListener ⇒ upstreamListener = Some(listener) + listener ⇒ upstreamListener = Some(listener) } Future.successful(this) } @@ -151,8 +151,8 @@ private[remote] final case class FailureInjectorHandle( @volatile private var upstreamListener: HandleEventListener = null override val readHandlerPromise: Promise[HandleEventListener] = Promise() - readHandlerPromise.future.onSuccess { - case listener: HandleEventListener ⇒ + readHandlerPromise.future.foreach { + listener ⇒ upstreamListener = listener wrappedHandle.readHandlerPromise.success(this) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index d1addab50f..ecc19c6241 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -179,8 +179,8 @@ private[netty] trait CommonHandlers extends NettyHelpers { NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match { case Some(localAddress) ⇒ val handle = createHandle(channel, localAddress, remoteAddress) - handle.readHandlerPromise.future.onSuccess { - case listener: HandleEventListener ⇒ + handle.readHandlerPromise.future.foreach { + listener ⇒ registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) channel.setReadable(true) } @@ -203,8 +203,8 @@ private[netty] abstract class ServerHandler( final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { channel.setReadable(false) - associationListenerFuture.onSuccess { - case listener: AssociationEventListener ⇒ + associationListenerFuture.foreach { + listener ⇒ val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier, transport.system.name, hostName = None, port = None).getOrElse( throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]")) @@ -432,7 +432,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") } localAddress = address - associationListenerPromise.future.onSuccess { case listener ⇒ newServerChannel.setReadable(true) } + associationListenerPromise.future.foreach { _ ⇒ newServerChannel.setReadable(true) } (address, associationListenerPromise) case None ⇒ throw new NettyTransportException(s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") } @@ -470,8 +470,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: 
ExtendedA readyChannel.getRemoteAddress match { case addr: InetSocketAddress ⇒ val handle = new UdpAssociationHandle(localAddress, remoteAddress, readyChannel, NettyTransport.this) - handle.readHandlerPromise.future.onSuccess { - case listener ⇒ udpConnectionTable.put(addr, listener) + handle.readHandlerPromise.future.foreach { + listener ⇒ udpConnectionTable.put(addr, listener) } handle case unknown ⇒ throw new NettyTransportException(s"Unknown outbound remote address type [${unknown.getClass.getName}]") diff --git a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala index d6d8415b82..315fb9e9eb 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala @@ -8,7 +8,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.FlatSpec import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.Set import scala.concurrent.duration._ import scala.language.postfixOps @@ -38,7 +38,7 @@ class RemoteInitErrorSpec extends FlatSpec with Matchers { def currentThreadIds(): Set[Long] = { val threads = Thread.getAllStackTraces().keySet() - threads.collect({ case t: Thread if (!t.isDaemon()) ⇒ t.getId() }) + threads.asScala.collect({ case t: Thread if (!t.isDaemon()) ⇒ t.getId() }) } "Remoting" must "shut down properly on RemoteActorRefProvider initialization failure" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index 605880e838..6122d921c9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -18,8 +18,8 @@ class FlowForeachSpec extends StreamSpec { "A Foreach" must { "call the procedure for each element" in assertAllStagesStopped { - Source(1 to 3).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" + Source(1 to 3).runForeach(testActor ! _) foreach { + _ ⇒ testActor ! "done" } expectMsg(1) expectMsg(2) @@ -28,16 +28,16 @@ class FlowForeachSpec extends StreamSpec { } "complete the future for an empty stream" in assertAllStagesStopped { - Source.empty[String].runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" + Source.empty[String].runForeach(testActor ! _) foreach { + _ ⇒ testActor ! "done" } expectMsg("done") } "yield the first error" in assertAllStagesStopped { val p = TestPublisher.manualProbe[Int]() - Source.fromPublisher(p).runForeach(testActor ! _) onFailure { - case ex ⇒ testActor ! ex + Source.fromPublisher(p).runForeach(testActor ! _).failed foreach { + ex ⇒ testActor ! 
ex } val proc = p.expectSubscription() proc.expectRequest() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 9cc37f01cc..f12188713d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -109,7 +109,7 @@ class QueueSinkSpec extends StreamSpec { sub.sendComplete() Await.result(queue.pull(), noMsgTimeout) should be(None) - queue.pull().onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + queue.pull().failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } } "keep on sending even after the buffer has been full" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 8c75abf413..2687ef82cc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -215,7 +215,7 @@ class QueueSourceSpec extends StreamSpec { sub.cancel() expectMsg(Done) - queue.offer(1).onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + queue.offer(1).failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } } "not share future across materializations" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index cf261c70ef..6489390aef 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -243,7 +243,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "fail if getting the supplier fails" in { def failedSupplier(): Supplier[Array[Int]] = throw TE("") val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(failedSupplier, accumulator, combiner, finisher))) + () ⇒ new TestCollector(failedSupplier _, accumulator _, combiner _, finisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } @@ -254,7 +254,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { override def get(): Array[Int] = throw TE("") } val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(failedSupplier, accumulator, combiner, finisher))) + () ⇒ new TestCollector(failedSupplier _, accumulator _, combiner _, finisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } @@ -264,7 +264,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { def failedAccumulator(): BiConsumer[Array[Int], Int] = throw TE("") val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(supplier, failedAccumulator, combiner, finisher))) + () ⇒ new TestCollector(supplier _, failedAccumulator _, combiner _, finisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } @@ -276,7 +276,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(supplier, failedAccumulator, combiner, finisher))) + () ⇒ new TestCollector(supplier _, failedAccumulator _, combiner _, 
finisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } @@ -286,7 +286,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { def failedFinisher(): function.Function[Array[Int], Int] = throw TE("") val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(supplier, accumulator, combiner, failedFinisher))) + () ⇒ new TestCollector(supplier _, accumulator _, combiner _, failedFinisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } @@ -297,7 +297,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { override def apply(a: Array[Int]): Int = throw TE("") } val future = Source(1 to 100).runWith(StreamConverters.javaCollector( - () ⇒ new TestCollector(supplier, accumulator, combiner, failedFinisher))) + () ⇒ new TestCollector(supplier _, accumulator _, combiner _, failedFinisher _))) a[TE] shouldBe thrownBy { Await.result(future, 300.millis) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 2af88c8663..ef9410378c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -79,8 +79,8 @@ import scala.concurrent.{ ExecutionContext, Promise } try { requireNonNullSubscriber(subscriber) tryOnSubscribe(subscriber, new MaybeSubscription(subscriber)) - promise.future onFailure { - case error ⇒ tryOnError(subscriber, error) + promise.future.failed.foreach { + error ⇒ tryOnError(subscriber, error) } } catch { case sv: SpecViolation ⇒ ec.reportFailure(sv) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 89082d92c3..d833e0b8e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -293,8 +293,8 @@ import scala.util.control.NonFatal } } - private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.onSuccess { - case resource ⇒ f(resource) + private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach { + resource ⇒ f(resource) } val errorHandler: PartialFunction[Throwable, Unit] = { @@ -337,7 +337,7 @@ import scala.util.control.NonFatal resource = Promise[S]() createStream(true) }) - private def closeStage(): Unit = closeAndThen(completeStage) + private def closeStage(): Unit = closeAndThen(completeStage _) } override def toString = "UnfoldResourceSourceAsync" diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index d1ed5bd749..d2288d7848 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -23,7 +23,7 @@ import akka.stream.impl.PublisherSource import akka.stream.impl.CancellingSubscriber import akka.stream.impl.{ Buffer ⇒ BufferImpl } -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * INTERNAL API @@ -245,13 +245,13 @@ import scala.collection.JavaConversions._ private def tryCompleteAll(): Boolean = if (activeSubstreamsMap.isEmpty || (!hasNextElement && firstPushCounter == 0)) { - for (value ← activeSubstreamsMap.values()) value.complete() + for (value ← activeSubstreamsMap.values().asScala) value.complete() completeStage() 
true } else false private def fail(ex: Throwable): Unit = { - for (value ← activeSubstreamsMap.values()) value.fail(ex) + for (value ← activeSubstreamsMap.values().asScala) value.fail(ex) failStage(ex) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index d34a0998cf..73a801bce4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -395,7 +395,7 @@ object Source { read: function.Function[S, Optional[T]], close: function.Procedure[S]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.unfoldResource[T, S]( - create.create, + create.create _, (s: S) ⇒ read.apply(s).asScala, close.apply)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index fa02d3836e..71cba30fdf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -175,7 +175,7 @@ object StreamConverters { * and the rest of flow. */ def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]](stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] = - new Source(scaladsl.StreamConverters.fromJavaStream(stream.create)) + new Source(scaladsl.StreamConverters.fromJavaStream(stream.create _)) /** * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector`` diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 52268d8684..bdc5a2579f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -736,7 +736,7 @@ object Zip { * * '''Cancels when''' downstream cancels */ -final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Pair.apply) { +final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply) { override def toString = "Zip" } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index e7fcbdab0a..4659917dc8 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -654,7 +654,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * handler upon receiving the `onPush()` signal (before invoking the `andThen` function). */ final protected def read[T](in: Inlet[T], andThen: Procedure[T], onClose: Effect): Unit = { - read(in)(andThen.apply, onClose.apply) + read(in)(andThen.apply, onClose.apply _) } /** diff --git a/akka-typed/src/test/scala/akka/typed/AskSpec.scala b/akka-typed/src/test/scala/akka/typed/AskSpec.scala index 39fb3b3fd8..4a124e287d 100644 --- a/akka-typed/src/test/scala/akka/typed/AskSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/AskSpec.scala @@ -37,7 +37,7 @@ class AskSpec extends TypedSpec with ScalaFutures { foo.replyTo ! "foo" Same case Stop(r) ⇒ - r ! () + r ! (()) Stopped }
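
Reviewer notes on the recurring patterns in this change, with small self-contained sketches:

The `R` wrapper added to DispatchersSpec exists because `Regex.unapplySeq(Any)` is deprecated and scheduled for removal in Scala 2.13; a wrapper whose `unapplySeq` takes `CharSequence` keeps the extractor-pattern style compiling on all three versions. A sketch of the same idea (the regex literal here is made up for illustration):

```scala
// Wrapper narrowing Regex#unapplySeq to CharSequence, as added in DispatchersSpec.
case class R(s: String) {
  private val r = s.r
  def unapplySeq(arg: CharSequence): Option[List[String]] = r.unapplySeq(arg)
}

val Expected = R("(thread-[1-9][0-9]*)")
"thread-42" match {
  case Expected(name) ⇒ println(name) // prints "thread-42" (the single capture group)
  case other          ⇒ println(s"no match: $other")
}
```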
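The three version-specific `akka/compat/Future.scala` files are selected by the `unmanagedSourceDirectories` stanza added to `akka-actor/build.sbt` (`scalaVersion.value.take(4)` yields `2.11`, `2.12`, or `2.13`), so call sites compile unchanged everywhere: 2.11 forwards to `Future.fold`, 2.12 prefers the `immutable.Iterable` overload backed by `foldLeft`, and 2.13 copies `TraversableOnce` input into an immutable collection first. A minimal usage sketch, assuming only the signatures shown in this diff (package and object names are hypothetical):

```scala
package akka.demo // the shim is private[akka], so callers must live inside the akka namespace

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import akka.compat

object FoldDemo extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // List is an immutable.Iterable, so on 2.12/2.13 the foldLeft-backed
  // overload is selected; on 2.11 the single TraversableOnce overload applies.
  val futures = (1 to 10).toList.map(i ⇒ Future(i))
  val sum = Await.result(compat.Future.fold(futures)(0)(_ + _), 3.seconds)
  println(sum) // 55, matching the "fold" test in FutureSpec
}
```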
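`Future.onSuccess` and `Future.onFailure` are deprecated in 2.12 and removed in 2.13, hence the blanket rewrite to `foreach` and `failed.foreach`:

```scala
import scala.concurrent.{ ExecutionContext, Future }
implicit val ec: ExecutionContext = ExecutionContext.global

val f = Future(42)
// before (removed in 2.13):  f.onSuccess { case x ⇒ println(x) }
//                            f.onFailure { case e ⇒ println(e) }
f.foreach(x ⇒ println(x))        // success side
f.failed.foreach(e ⇒ println(e)) // failure side; runs only if f fails
```

One behavioral nuance: `onSuccess` took a partial function and silently skipped non-matching results, while `foreach` takes a total function. Where the rewrite keeps a pattern, e.g. `actor ? "Hello" foreach { case "World" ⇒ latch.open() }` in FutureSpec, an unexpected reply now raises a `MatchError` that is reported to the `ExecutionContext` instead of being ignored; for a test that should only pass on the expected reply this appears intentional.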
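Similarly, the implicit `scala.collection.JavaConversions` are deprecated in 2.12 and dropped in 2.13, so conversions become explicit `asScala`/`asJava` calls via `JavaConverters` (BenchRunner, CamelExchangeAdapter, PojoSRTestSupport, RemoteInitErrorSpec, StreamOfStreams). A sketch of the explicit style, using a throwaway map for illustration:

```scala
import scala.collection.JavaConverters._

val jmap = new java.util.HashMap[String, Int]()
jmap.put("answer", 42)
// asScala returns a mutable.Map wrapper; no implicit conversion in scope.
val entries = jmap.asScala.map { case (k, v) ⇒ s"$k=$v" }
val backToJava: java.util.Map[String, Int] = jmap.asScala.asJava // unwraps to the original map
println(entries.mkString(","))
```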