diff --git a/akka-actor/src/main/java/akka/japi/JAPI.java b/akka-actor/src/main/java/akka/japi/JAPI.java index 4c040220f3..886a0ea3d7 100644 --- a/akka-actor/src/main/java/akka/japi/JAPI.java +++ b/akka-actor/src/main/java/akka/japi/JAPI.java @@ -4,6 +4,7 @@ import scala.collection.Seq; public class JAPI { + @SafeVarargs public static Seq seq(T... ts) { return Util.immutableSeq(ts); } diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/FormData.java b/akka-http-core/src/main/java/akka/http/javadsl/model/FormData.java index 46c0b22843..1564d8097f 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/FormData.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/FormData.java @@ -40,6 +40,7 @@ public final class FormData { /** * Creates the FormData from the given parameters. */ + @SafeVarargs public static FormData create(Pair... params) { return new FormData(Query.create(params)); } diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/Query.java b/akka-http-core/src/main/java/akka/http/javadsl/model/Query.java index 6a1472de98..74e3d1141f 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/Query.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/Query.java @@ -94,6 +94,7 @@ public abstract class Query { /** * Returns a Query from the given parameters. */ + @SafeVarargs public static Query create(Pair... params) { return new JavaQuery(UriJavaAccessor.queryApply(params)); } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/EnhancedInetSocketAddress.scala b/akka-http-core/src/main/scala/akka/http/impl/util/EnhancedInetSocketAddress.scala deleted file mode 100644 index ee00e6c833..0000000000 --- a/akka-http-core/src/main/scala/akka/http/impl/util/EnhancedInetSocketAddress.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.http.impl.util - -import java.lang.reflect.{ InvocationTargetException, Method } -import java.net.InetSocketAddress - -import scala.util.control.NonFatal - -/** - * Provides getHostString support for Java 6. - * - * TODO: can be removed once support for Java 6 is dropped. - * - * Internal API - */ -private[http] class EnhancedInetSocketAddress(val address: InetSocketAddress) extends AnyVal { - /** - * Retrieve the original host string that was given (IP or DNS name) if the current JDK has - * a `getHostString` method with the right signature that can be made accessible. - * - * This avoids a reverse DNS query from calling getHostName() if the original host string is an IP address. - * If the reflective call doesn't work it falls back to getHostName. - */ - def getHostStringJava6Compatible: String = EnhancedInetSocketAddress.getHostStringFunction(address) -} - -/** - * Internal API - */ -private[http] object EnhancedInetSocketAddress { - private[http] val getHostStringFunction: InetSocketAddress ⇒ String = { - def fallbackToGetHostName = (_: InetSocketAddress).getHostName - def callReflectively(m: Method) = - (address: InetSocketAddress) ⇒ - try m.invoke(address).asInstanceOf[String] - catch { - case ite: InvocationTargetException ⇒ throw ite.getTargetException - } - - try { - val m = classOf[InetSocketAddress].getDeclaredMethod("getHostString") - - val candidate = - if (m.getReturnType == classOf[String] && m.getParameterTypes.isEmpty) { - if (!m.isAccessible) m.setAccessible(true) - callReflectively(m) - } else fallbackToGetHostName - - // probe so that we can be sure a reflective problem only turns up once - // here during construction - candidate(new InetSocketAddress("127.0.0.1", 80)) - candidate - } catch { - case NonFatal(_) ⇒ fallbackToGetHostName - } - } -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/Java6Compat.scala b/akka-http-core/src/main/scala/akka/http/impl/util/Java6Compat.scala deleted file mode 100644 index 2d3fb3eb19..0000000000 --- a/akka-http-core/src/main/scala/akka/http/impl/util/Java6Compat.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.http.impl.util - -import java.lang.reflect.{ InvocationTargetException, Method } -import javax.net.ssl.SSLParameters - -import scala.util.control.NonFatal - -/** - * INTERNAL API - * - * Enables accessing SslParameters even if compiled against Java 6. - */ -private[http] object Java6Compat { - - def isJava6: Boolean = - System.getProperty("java.version").take(4) match { - case "1.6." ⇒ true - case _ ⇒ false - } - - /** - * Returns true if setting the algorithm was successful. - */ - def trySetEndpointIdentificationAlgorithm(parameters: SSLParameters, algorithm: String): Boolean = - setEndpointIdentificationAlgorithmFunction(parameters, algorithm) - - private[this] val setEndpointIdentificationAlgorithmFunction: (SSLParameters, String) ⇒ Boolean = { - def unsupported: (SSLParameters, String) ⇒ Boolean = (_, _) ⇒ false - - def callReflectively(m: Method) = - (params: SSLParameters, algorithm: String) ⇒ - try { - m.invoke(params, algorithm) - true - } catch { - case ite: InvocationTargetException ⇒ throw ite.getTargetException - } - - try { - val m = classOf[SSLParameters].getMethod("setEndpointIdentificationAlgorithm", classOf[java.lang.String]) - - val candidate = - if (m.getReturnType == Void.TYPE && m.getParameterTypes.toSeq == Seq(classOf[java.lang.String])) { - if (!m.isAccessible) m.setAccessible(true) - callReflectively(m) - } else unsupported - - // probe so that we can be sure a reflective problem only turns up once - // here during construction - candidate(new SSLParameters(), "https") - candidate - } catch { - case NonFatal(_) ⇒ unsupported - } - } -} diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/SettingsCompanion.scala b/akka-http-core/src/main/scala/akka/http/impl/util/SettingsCompanion.scala index 0b17652f46..7091fd4de8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/SettingsCompanion.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/SettingsCompanion.scala @@ -49,7 +49,7 @@ private[http] abstract class SettingsCompanion[T](protected val prefix: String) private[http] object SettingsCompanion { lazy val configAdditions: Config = { val localHostName = - try new InetSocketAddress(InetAddress.getLocalHost, 80).getHostStringJava6Compatible + try new InetSocketAddress(InetAddress.getLocalHost, 80).getHostString catch { case NonFatal(_) ⇒ "" } ConfigFactory.parseMap(Map("akka.http.hostname" -> localHostName).asJava) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 1e69fa95b1..71afba836b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -4,12 +4,9 @@ package akka.http.impl -import java.net.InetSocketAddress - import language.implicitConversions import language.higherKinds import java.nio.charset.Charset -import java.util.concurrent.atomic.AtomicInteger import com.typesafe.config.Config import akka.stream.scaladsl.{ Flow, Source } import akka.stream.stage._ @@ -18,7 +15,6 @@ import scala.concurrent.{ Await, Future } import scala.reflect.ClassTag import scala.util.{ Failure, Success } import scala.util.matching.Regex -import akka.event.LoggingAdapter import akka.util.ByteString import akka.actor._ @@ -40,8 +36,6 @@ package object util { private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config) private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s) private[http] implicit def enhanceRegex(regex: Regex): EnhancedRegex = new EnhancedRegex(regex) - private[http] implicit def enhanceInetSocketAddress(address: InetSocketAddress): EnhancedInetSocketAddress = - new EnhancedInetSocketAddress(address) private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce = new EnhancedByteStringTraversableOnce(byteStrings) private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] = @@ -75,7 +69,7 @@ package object util { private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = { synchronized { if (eventStreamLogger == null) - eventStreamLogger = system.actorOf(Props[util.EventStreamLogger].withDeploy(Deploy.local), name = "event-stream-logger") + eventStreamLogger = system.actorOf(Props[util.EventStreamLogger]().withDeploy(Deploy.local), name = "event-stream-logger") } system.eventStream.subscribe(eventStreamLogger, channel) } @@ -178,6 +172,4 @@ package util { } } } - - private[http] class ReadTheDocumentationException(message: String) extends RuntimeException(message) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 4a1edf67fb..67fbc939cf 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -16,7 +16,7 @@ import akka.http.impl.engine.HttpConnectionTimeoutException import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ import akka.http.impl.engine.ws.WebsocketClientBlueprint -import akka.http.impl.util.{ Java6Compat, ReadTheDocumentationException, StreamUtils } +import akka.http.impl.util.StreamUtils import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.ws.{ WebsocketUpgradeResponse, WebsocketRequest, Message } @@ -770,12 +770,7 @@ trait DefaultSSLContextCreation { defaultParams.setCipherSuites(cipherSuites) // hostname! - if (!Java6Compat.trySetEndpointIdentificationAlgorithm(defaultParams, "https")) { - log.info("Unable to use JDK built-in hostname verification, please consider upgrading your Java runtime to " + - "a more up to date version (JDK7+). Using Typesafe ssl-config hostname verification.") - // enabling the JDK7+ solution did not work, however this is fine since we do handle hostname - // verification directly in SslTlsCipherActor manually by applying an ssl-config provider verifier - } + defaultParams.setEndpointIdentificationAlgorithm("https") HttpsContext(sslContext, sslParameters = Some(defaultParams)) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 7df3a213c6..99e2d92e2b 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -10,7 +10,6 @@ import java.security.MessageDigest import java.util import javax.net.ssl.SSLSession -import akka.event.Logging import akka.stream.io.ScalaSessionAPI import scala.reflect.ClassTag @@ -151,7 +150,7 @@ sealed abstract case class Expect private () extends ModeledHeader { // http://tools.ietf.org/html/rfc7230#section-5.4 object Host extends ModeledCompanion[Host] { def apply(authority: Uri.Authority): Host = apply(authority.host, authority.port) - def apply(address: InetSocketAddress): Host = apply(address.getHostStringJava6Compatible, address.getPort) + def apply(address: InetSocketAddress): Host = apply(address.getHostString, address.getPort) def apply(host: String): Host = apply(host, 0) def apply(host: String, port: Int): Host = apply(Uri.Host(host), port) val empty = Host("") diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala index 0948a75c05..a6d066dc4a 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala @@ -69,19 +69,14 @@ class TlsEndpointVerificationSpec extends AkkaSpec(""" val ex = intercept[Exception] { Http().singleRequest(req).futureValue } - if (Java6Compat.isJava6) { - // our manual verification - ex.getMessage should include("Hostname verification failed") - } else { - // JDK built-in verification - val expectedMsg = "No subject alternative DNS name matching www.howsmyssl.com found" + // JDK built-in verification + val expectedMsg = "No subject alternative DNS name matching www.howsmyssl.com found" - var e: Throwable = ex - while (e.getCause != null) e = e.getCause + var e: Throwable = ex + while (e.getCause != null) e = e.getCause - info("TLS failure cause: " + e.getMessage) - e.getMessage should include(expectedMsg) - } + info("TLS failure cause: " + e.getMessage) + e.getMessage should include(expectedMsg) } "pass hostname verification on https://www.playframework.com/" in { diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/EnhancedInetSocketAddressSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/util/EnhancedInetSocketAddressSpec.scala deleted file mode 100644 index 16d0fbc6b3..0000000000 --- a/akka-http-core/src/test/scala/akka/http/impl/util/EnhancedInetSocketAddressSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.http.impl.util - -import java.net.{ InetAddress, InetSocketAddress } - -import org.scalatest.{ Matchers, WordSpec } - -class EnhancedInetSocketAddressSpec extends WordSpec with Matchers { - "getHostStringJava6Compatible" should { - "return IPv4 address if InetSocketAddress was created with the address" in { - val addr = likelyReverseResolvableAddress - val socketAddress = new InetSocketAddress(addr, 80) - socketAddress.getHostStringJava6Compatible shouldEqual addr.getHostAddress - } - "return host name if InetSocketAddress was created with host name" in { - val address = new InetSocketAddress("github.com", 80) - address.getHostStringJava6Compatible shouldEqual "github.com" - } - } - - /** - * Returns an InetAddress that can likely be reverse looked up, so that - * getHostName returns a DNS address and not the IP. Unfortunately, we - * cannot be sure that a host name was already cached somewhere in which - * case getHostString may still return a host name even without doing - * a reverse lookup at this time. If this start to fail non-deterministically, - * it may be decided that this test needs to be disabled. - */ - def likelyReverseResolvableAddress: InetAddress = - InetAddress.getByAddress(InetAddress.getByName("google.com").getAddress) -} diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala b/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala index 82750ce9f1..7211b79605 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala @@ -43,7 +43,7 @@ object ExampleHttpContexts { context.init(null, certManagerFactory.getTrustManagers, new SecureRandom) val params = new SSLParameters() - Java6Compat.trySetEndpointIdentificationAlgorithm(params, "https") + params.setEndpointIdentificationAlgorithm("https") HttpsContext(context, sslParameters = Some(params)) } diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/values/FormFieldsTest.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/values/FormFieldsTest.java index 886a9889ff..b54bf1fbc7 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/values/FormFieldsTest.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/values/FormFieldsTest.java @@ -11,11 +11,9 @@ import akka.http.javadsl.server.RequestVal; import akka.http.javadsl.testkit.JUnitRouteTest; import akka.http.javadsl.testkit.TestRoute; import akka.japi.Option; +import akka.japi.Pair; import org.junit.Test; -import java.util.AbstractMap; -import java.util.Map; - public class FormFieldsTest extends JUnitRouteTest { static FormField stringParam = FormFields.stringValue("stringParam"); static FormField byteParam = FormFields.byteValue("byteParam"); @@ -33,20 +31,21 @@ public class FormFieldsTest extends JUnitRouteTest { static RequestVal nameWithDefault = FormFields.stringValue("nameWithDefault").withDefault("John Doe"); static RequestVal> optionalIntParam = FormFields.intValue("optionalIntParam").optional(); - private Map.Entry entry(String name, String value) { - return new AbstractMap.SimpleImmutableEntry(name, value); + private Pair param(String name, String value) { + return Pair.create(name, value); } - private HttpRequest urlEncodedRequest(Map.Entry... entries) { + @SafeVarargs + final private HttpRequest urlEncodedRequest(Pair... params) { StringBuilder sb = new StringBuilder(); boolean next = false; - for (Map.Entry entry: entries) { + for (Pair param: params) { if (next) { sb.append('&'); - next = true; } - sb.append(entry.getKey()); + next = true; + sb.append(param.first()); sb.append('='); - sb.append(entry.getValue()); + sb.append(param.second()); } return @@ -54,7 +53,7 @@ public class FormFieldsTest extends JUnitRouteTest { .withEntity(MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toContentType(HttpCharsets.UTF_8), sb.toString()); } private HttpRequest singleParameterUrlEncodedRequest(String name, String value) { - return urlEncodedRequest(entry(name, value)); + return urlEncodedRequest(param(name, value)); } @Test diff --git a/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala b/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala index 887f8e27b2..e576499b95 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/coding/Deflate.scala @@ -4,7 +4,6 @@ package akka.http.scaladsl.coding -import java.lang.reflect.{ Method, InvocationTargetException } import java.util.zip.{ Inflater, Deflater } import akka.stream.stage._ import akka.util.{ ByteStringBuilder, ByteString } @@ -14,8 +13,6 @@ import akka.http.impl.util._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpEncodings -import scala.util.control.NonFatal - class Deflate(val messageFilter: HttpMessage ⇒ Boolean) extends Coder with StreamDecoder { val encoding = HttpEncodings.deflate def newCompressor = new DeflateCompressor @@ -47,7 +44,10 @@ class DeflateCompressor extends Compressor { deflater.setInput(input.toArray) drainDeflater(deflater, buffer) } - protected def flushWithBuffer(buffer: Array[Byte]): ByteString = DeflateCompressor.flush(deflater, buffer) + protected def flushWithBuffer(buffer: Array[Byte]): ByteString = { + val written = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH) + ByteString.fromArray(buffer, 0, written) + } protected def finishWithBuffer(buffer: Array[Byte]): ByteString = { deflater.finish() val res = drainDeflater(deflater, buffer) @@ -72,40 +72,6 @@ class DeflateCompressor extends Compressor { private[http] object DeflateCompressor { val MinBufferSize = 1024 - // TODO: remove reflective call once Java 6 support is dropped - /** - * Compatibility mode: reflectively call deflate(..., flushMode) if available or use a hack otherwise - */ - private[this] val flushImplementation: (Deflater, Array[Byte]) ⇒ ByteString = { - def flushHack(deflater: Deflater, buffer: Array[Byte]): ByteString = { - // hack: change compression mode to provoke flushing - deflater.deflate(EmptyByteArray, 0, 0) - deflater.setLevel(Deflater.NO_COMPRESSION) - val res1 = drainDeflater(deflater, buffer) - deflater.setLevel(Deflater.BEST_COMPRESSION) - val res2 = drainDeflater(deflater, buffer) - res1 ++ res2 - } - def reflectiveDeflateWithSyncMode(method: Method, syncFlushConstant: Int)(deflater: Deflater, buffer: Array[Byte]): ByteString = - try { - val written = method.invoke(deflater, buffer, 0: java.lang.Integer, buffer.length: java.lang.Integer, syncFlushConstant: java.lang.Integer).asInstanceOf[Int] - ByteString.fromArray(buffer, 0, written) - } catch { - case t: InvocationTargetException ⇒ throw t.getTargetException - } - - try { - val deflateWithFlush = classOf[Deflater].getMethod("deflate", classOf[Array[Byte]], classOf[Int], classOf[Int], classOf[Int]) - require(deflateWithFlush.getReturnType == classOf[Int]) - val flushModeSync = classOf[Deflater].getField("SYNC_FLUSH").get(null).asInstanceOf[Int] - reflectiveDeflateWithSyncMode(deflateWithFlush, flushModeSync) - } catch { - case NonFatal(e) ⇒ flushHack - } - } - - def flush(deflater: Deflater, buffer: Array[Byte]): ByteString = flushImplementation(deflater, buffer) - @tailrec def drainDeflater(deflater: Deflater, buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = { val len = deflater.deflate(buffer) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 2257c64c7e..1096aa715e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -240,7 +240,7 @@ private[transport] object NettyTransport { def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String, hostName: Option[String], port: Option[Int]): Option[Address] = addr match { case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName, - hostName.getOrElse(sa.getAddress.getHostAddress), port.getOrElse(sa.getPort))) // perhaps use getHostString in jdk 1.7 + hostName.getOrElse(sa.getHostString), port.getOrElse(sa.getPort))) case _ ⇒ None } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index d1f5111496..8fece86294 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -183,7 +183,7 @@ object TestSubscriber { trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent final case class OnNext[I](element: I) extends SubscriberEvent - final case object OnComplete extends SubscriberEvent + case object OnComplete extends SubscriberEvent final case class OnError(cause: Throwable) extends SubscriberEvent { override def toString: String = { val str = new StringWriter @@ -397,7 +397,7 @@ object TestSubscriber { * See also [[#expectSubscriptionAndComplete(Throwable, Boolean)]] if no demand should be signalled. */ def expectSubscriptionAndError(cause: Throwable): Self = - expectSubscriptionAndError(cause, true) + expectSubscriptionAndError(cause, signalDemand = true) /** * Fluent DSL @@ -555,8 +555,7 @@ object TestSubscriber { @tailrec def drain(): immutable.Seq[I] = self.expectEvent(deadline.timeLeft) match { case OnError(ex) ⇒ - // TODO once on JDK7+ this could be made an AssertionError, since it can carry ex in its cause param - throw new AssertionError(s"toStrict received OnError(${ex.getMessage}) while draining stream! Accumulated elements: ${b.result()}") + throw new AssertionError(s"toStrict received OnError while draining stream! Accumulated elements: ${b.result()}", ex) case OnComplete ⇒ b.result() case OnNext(i: I @unchecked) ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 8d7e2bfcb0..e7f697600b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -9,10 +9,9 @@ import akka.actor.ActorSystem import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children -import akka.stream.scaladsl.{ FileIO, Sink, Source } +import akka.stream.scaladsl.{ FileIO, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.testkit.StreamTestKit import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.ActorAttributes diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 993d73c9a5..9487456e91 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.io -import java.io.{ File, RandomAccessFile } +import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel @@ -26,8 +26,9 @@ private[akka] object FilePublisher { .withDeploy(Deploy.local) } - private final case object Continue extends DeadLetterSuppression + private case object Continue extends DeadLetterSuppression + val Read = java.util.Collections.singleton(java.nio.file.StandardOpenOption.READ) } /** INTERNAL API */ @@ -41,13 +42,11 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long] var readBytesTotal = 0L var availableChunks: Vector[ByteString] = Vector.empty // TODO possibly resign read-ahead-ing and make fusable as Stage - private var raf: RandomAccessFile = _ private var chan: FileChannel = _ override def preStart() = { try { - raf = new RandomAccessFile(f, "r") // best way to express this in JDK6, OpenOption are available since JDK7 - chan = raf.getChannel + chan = FileChannel.open(f.toPath, FilePublisher.Read) } catch { case ex: Exception ⇒ onErrorThenStop(ex) @@ -80,7 +79,7 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long] } /** BLOCKING I/O READ */ - @tailrec final def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] = + @tailrec def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] = if (chunks.size <= maxChunks && isActive) { (try chan.read(buf) catch { case NonFatal(ex) ⇒ onErrorThenStop(ex); Int.MinValue }) match { case -1 ⇒ // EOF @@ -98,13 +97,12 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long] } } else chunks - private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue + private def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue override def postStop(): Unit = { super.postStop() bytesReadPromise.trySuccess(readBytesTotal) - try if (chan ne null) chan.close() - finally if (raf ne null) raf.close() + if (chan ne null) chan.close() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index b282e524e3..807ffcde3a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -3,8 +3,9 @@ */ package akka.stream.impl.io -import java.io.{ File, RandomAccessFile } +import java.io.File import java.nio.channels.FileChannel +import java.util.Collections import akka.actor.{ Deploy, ActorLogging, Props } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } @@ -19,6 +20,9 @@ private[akka] object FileSubscriber { Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local) } + import java.nio.file.StandardOpenOption._ + val Write = Collections.singleton(WRITE) + val Append = Collections.singleton(APPEND) } /** INTERNAL API */ @@ -28,17 +32,13 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize) - private var raf: RandomAccessFile = _ private var chan: FileChannel = _ private var bytesWritten: Long = 0 override def preStart(): Unit = try { - raf = new RandomAccessFile(f, "rw") // best way to express this in JDK6, OpenOption are available since JDK7 - chan = raf.getChannel - - // manually supporting appending to files - in Java 7 we could use OpenModes: FileChannel.open(f, openOptions.asJava) - if (append) chan.position(chan.size()) + val openOptions = if (append) FileSubscriber.Append else FileSubscriber.Write + chan = FileChannel.open(f.toPath, openOptions) super.preStart() } catch { @@ -75,7 +75,6 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bytesWrittenPromise.trySuccess(bytesWritten) if (chan ne null) chan.close() - if (raf ne null) raf.close() super.postStop() } }