diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java index c795c37d63..6f540eb25b 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java @@ -7,7 +7,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import akka.NotUsed; -import akka.stream.io.Framing; +import akka.stream.javadsl.Framing; import docs.AbstractJavaTest; import docs.stream.SilenceSystemOut; import java.net.InetSocketAddress; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java index f97bec000c..48fd0787b9 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java @@ -7,7 +7,7 @@ import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.Materializer; -import akka.stream.io.Framing; +import akka.stream.javadsl.Framing; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.testkit.JavaTestKit; diff --git a/akka-docs/rst/java/stream/stream-io.rst b/akka-docs/rst/java/stream/stream-io.rst index d3fb4fd808..984ad7dd92 100644 --- a/akka-docs/rst/java/stream/stream-io.rst +++ b/akka-docs/rst/java/stream/stream-io.rst @@ -24,7 +24,7 @@ which will emit an :class:`IncomingConnection` element for each new connection t Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``delimiter`` -helper Flow from ``akka.stream.io.Framing`` to chunk the inputs up into actual lines of text. The last boolean +helper Flow from ``akka.stream.javadsl.Framing`` to chunk the inputs up into actual lines of text. The last boolean argument indicates that we require an explicit line ending even for the last message before the connection is closed. In this example we simply add exclamation marks to each incoming text message and push it through the flow: diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala index 1a7d938196..6dfe654948 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala @@ -4,7 +4,7 @@ package docs.http.scaladsl.server.directives import akka.http.scaladsl.model._ -import akka.stream.io.Framing +import akka.stream.scaladsl.Framing import akka.util.ByteString import docs.http.scaladsl.server.RoutingSpec import scala.concurrent.Future diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala index 209553fc92..b20ae9c408 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala @@ -21,7 +21,7 @@ class RecipeParseLines extends RecipeSpec { ByteString("\r\n\r\n"))) //#parse-lines - import akka.stream.io.Framing + import akka.stream.scaladsl.Framing val linesStream = rawData.via(Framing.delimiter( ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true)) .map(_.utf8String) diff --git a/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index d861fe26ba..cb8c8bd62b 100644 --- a/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -39,7 +39,7 @@ class StreamTcpDocSpec extends AkkaSpec { { val (host, port) = TestUtils.temporaryServerHostnameAndPort() //#echo-server-simple-handle - import akka.stream.io.Framing + import akka.stream.scaladsl.Framing val connections: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(host, port) @@ -66,7 +66,7 @@ class StreamTcpDocSpec extends AkkaSpec { val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 val serverProbe = TestProbe() - import akka.stream.io.Framing + import akka.stream.scaladsl.Framing //#welcome-banner-chat-server connections.runForeach { connection => @@ -97,7 +97,7 @@ class StreamTcpDocSpec extends AkkaSpec { } //#welcome-banner-chat-server - import akka.stream.io.Framing + import akka.stream.scaladsl.Framing val input = new AtomicReference("Hello world" :: "What a lovely day" :: Nil) def readLine(prompt: String): String = { diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 996d179ea2..7d0a493f05 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -189,3 +189,10 @@ Replace with:: http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer()); http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer()); + +Framing moved to akka.stream.[javadsl/scaladsl] +----------------------------------------------- + +The ``Framing`` object which can be used to chunk up ``ByteString`` streams into +framing dependent chunks (such as lines) has moved to ``akka.stream.scaladsl.Framing``, +and has gotten a Java DSL equivalent type in ``akka.stream.javadsl.Framing``. diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala index 18e7f90380..3bda28bc7c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala @@ -1,6 +1,6 @@ /** - * Copyright (C) 2009-2016 Typesafe Inc. - */ + * Copyright (C) 2009-2016 Typesafe Inc. + */ package akka.persistence diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FramingTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FramingTest.java new file mode 100644 index 0000000000..4322a2a746 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FramingTest.java @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2015-2016 Typesafe Inc. + */ +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.stream.StreamTest; +import akka.stream.testkit.AkkaSpec; +import akka.util.ByteString; +import org.junit.ClassRule; +import org.junit.Test; + +public class FramingTest extends StreamTest { + public FramingTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("FramingTest", AkkaSpec.testConf()); + + @Test + public void mustBeAbleToUseFraming() throws Exception { + final Source in = Source.single(ByteString.fromString("1,3,4,5")); + in.via(Framing.delimiter(ByteString.fromString(","), Integer.MAX_VALUE, FramingTruncation.ALLOW)) + .runWith(Sink.ignore(), materializer); + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala similarity index 97% rename from akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala index 57e66385c8..aa88c260a8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala @@ -1,15 +1,14 @@ /** * Copyright (C) 2014-2016 Typesafe Inc. */ -package akka.stream.io +package akka.stream.scaladsl import java.nio.ByteOrder -import akka.stream.io.Framing.FramingException -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.scaladsl._ -import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushPullStage } +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective } import akka.stream.testkit.AkkaSpec +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.util.{ ByteString, ByteStringBuilder } import scala.collection.immutable diff --git a/akka-stream/src/main/java/akka/stream/javadsl/FramingTruncation.java b/akka-stream/src/main/java/akka/stream/javadsl/FramingTruncation.java new file mode 100644 index 0000000000..04702d1993 --- /dev/null +++ b/akka-stream/src/main/java/akka/stream/javadsl/FramingTruncation.java @@ -0,0 +1,6 @@ +package akka.stream.javadsl; + +/** Determines mode in which [[Framing]] operates. */ +public enum FramingTruncation { + ALLOW, DISALLOW +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala new file mode 100644 index 0000000000..4720927e5b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2015-2016 Typesafe Inc. + */ +package akka.stream.javadsl + +import java.nio.ByteOrder + +import akka.NotUsed +import akka.stream.scaladsl +import akka.stream.stage._ +import akka.util.ByteString + +object Framing { + + /** + * Creates a Flow that handles decoding a stream of unstructured byte chunks into a stream of frames where the + * incoming chunk stream uses a specific byte-sequence to mark frame boundaries. + * + * The decoded frames will not include the separator sequence. + * + * If there are buffered bytes (an incomplete frame) when the input stream finishes and ''allowTruncation'' is set to + * false then this Flow will fail the stream reporting a truncated frame. + * + * Default truncation behaviour is: when the last frame being decoded contains no valid delimiter this Flow + * fails the stream instead of returning a truncated frame. + * + * @param delimiter The byte sequence to be treated as the end of the frame. + * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is + * exceeded this Flow will fail the stream. + */ + def delimiter(delimiter: ByteString, maximumFrameLength: Int): Flow[ByteString, ByteString, NotUsed] = { + scaladsl.Framing.delimiter(delimiter, maximumFrameLength).asJava + } + + /** + * Creates a Flow that handles decoding a stream of unstructured byte chunks into a stream of frames where the + * incoming chunk stream uses a specific byte-sequence to mark frame boundaries. + * + * The decoded frames will not include the separator sequence. + * + * If there are buffered bytes (an incomplete frame) when the input stream finishes and ''allowTruncation'' is set to + * false then this Flow will fail the stream reporting a truncated frame. + * + * @param delimiter The byte sequence to be treated as the end of the frame. + * @param allowTruncation If set to `DISALLOW`, then when the last frame being decoded contains no valid delimiter this Flow + * fails the stream instead of returning a truncated frame. + * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is + * exceeded this Flow will fail the stream. + */ + def delimiter(delimiter: ByteString, maximumFrameLength: Int, allowTruncation: FramingTruncation): Flow[ByteString, ByteString, NotUsed] = { + val truncationAllowed = allowTruncation == FramingTruncation.ALLOW + scaladsl.Framing.delimiter(delimiter, maximumFrameLength, truncationAllowed).asJava + } + + /** + * Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that + * incoming frames have a field that encodes their length. + * + * If the input stream finishes before the last frame has been fully decoded this Flow will fail the stream reporting + * a truncated frame. + * + * The byte order used for when decoding the field defaults to little-endian. + * + * @param fieldLength The length of the "size" field in bytes + * @param fieldOffset The offset of the field from the beginning of the frame in bytes + * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. This length *includes* the header (i.e the offset and + * the length of the size field) + */ + def lengthField(fieldLength: Int, + fieldOffset: Int, + maximumFrameLength: Int): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Framing.lengthField(fieldLength, fieldOffset, maximumFrameLength).asJava + + /** + * Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that + * incoming frames have a field that encodes their length. + * + * If the input stream finishes before the last frame has been fully decoded this Flow will fail the stream reporting + * a truncated frame. + * + * @param fieldLength The length of the "size" field in bytes + * @param fieldOffset The offset of the field from the beginning of the frame in bytes + * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. This length *includes* the header (i.e the offset and + * the length of the size field) + * @param byteOrder The ''ByteOrder'' to be used when decoding the field + */ + def lengthField(fieldLength: Int, + fieldOffset: Int, + maximumFrameLength: Int, + byteOrder: ByteOrder): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Framing.lengthField(fieldLength, fieldOffset, maximumFrameLength, byteOrder).asJava + + /** + * Returns a BidiFlow that implements a simple framing protocol. This is a convenience wrapper over [[Framing#lengthField]] + * and simply attaches a length field header of four bytes (using big endian encoding) to outgoing messages, and decodes + * such messages in the inbound direction. The decoded messages do not contain the header. + * + * This BidiFlow is useful if a simple message framing protocol is needed (for example when TCP is used to send + * individual messages) but no compatibility with existing protocols is necessary. + * + * The encoded frames have the layout + * {{{ + * [4 bytes length field, Big Endian][User Payload] + * }}} + * The length field encodes the length of the user payload excluding the header itself. + * + * @param maximumMessageLength Maximum length of allowed messages. If sent or received messages exceed the configured + * limit this BidiFlow will fail the stream. The header attached by this BidiFlow are not + * included in this limit. + */ + def simpleFramingProtocol(maximumMessageLength: Int): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = + scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 1ab202d56a..fd705e4c72 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -13,6 +13,8 @@ import scala.concurrent.duration.FiniteDuration final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { override def shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] + def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(this) + /** * Add the given BidiFlow as the next step in a bidirectional transformation * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom diff --git a/akka-stream/src/main/scala/akka/stream/io/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala similarity index 96% rename from akka-stream/src/main/scala/akka/stream/io/Framing.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index bcdccdac41..e506d2ac9d 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -1,14 +1,13 @@ /** - * Copyright (C) 2014-2016 Typesafe Inc. + * Copyright (C) 2015-2016 Typesafe Inc. */ -package akka.stream.io +package akka.stream.scaladsl import java.nio.ByteOrder import akka.NotUsed -import akka.stream.scaladsl.{ Keep, BidiFlow, Flow } import akka.stream.stage._ -import akka.util.{ ByteIterator, ByteStringBuilder, ByteString } +import akka.util.{ ByteIterator, ByteString } import scala.annotation.tailrec @@ -24,11 +23,10 @@ object Framing { * false then this Flow will fail the stream reporting a truncated frame. * * @param delimiter The byte sequence to be treated as the end of the frame. - * @param allowTruncation If turned on, then when the last frame being decoded contains no valid delimiter this Flow + * @param allowTruncation If `false`, then when the last frame being decoded contains no valid delimiter this Flow * fails the stream instead of returning a truncated frame. * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is * exceeded this Flow will fail the stream. - * @return */ def delimiter(delimiter: ByteString, maximumFrameLength: Int, allowTruncation: Boolean = false): Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].transform(() ⇒ new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation)) @@ -47,7 +45,6 @@ object Framing { * this Flow will fail the stream. This length *includes* the header (i.e the offset and * the length of the size field) * @param byteOrder The ''ByteOrder'' to be used when decoding the field - * @return */ def lengthField(fieldLength: Int, fieldOffset: Int = 0, @@ -75,7 +72,6 @@ object Framing { * @param maximumMessageLength Maximum length of allowed messages. If sent or received messages exceed the configured * limit this BidiFlow will fail the stream. The header attached by this BidiFlow are not * included in this limit. - * @return */ def simpleFramingProtocol(maximumMessageLength: Int): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { val decoder = lengthField(4, 0, maximumMessageLength + 4, ByteOrder.BIG_ENDIAN).map(_.drop(4))