In a previous blog post, a simple text mining application was developed using Akka Streams. In that application, the graph-building create() DSL method was used to build a stream topology that encapsulates the routing logic among the various stream components.
This time, we’re going to try something a little different. Rather than using the create() graph-builder, we’ll directly use some fan-out/fan-in functions to process stream data. In particular, we’ll process a messaging stream and dynamically demultiplex the stream into substreams based on the group the messages belong to.
For illustration purposes, each element in the stream source is modeled as a case class carrying a message's group and content. The substreams dynamically split by message group will go into individual file sinks, each of which is wrapped, along with its file name, in another case class.
val groups = List("a", "b", "c")

case class Message(group: String, content: String)

case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])
Next, we create a map with the message group as the key and the corresponding sink (which holds the actual file path) as the value:
val fileSinks = groups.map{ g =>
  val sink = FSink(
    s"file-${g}.txt",
    FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
  )
  (g, sink)
}.toMap
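For example, fileSinks("b") would be the FSink named file-b.txt whose sink writes to /path/to/file-b.txt.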
Demultiplexing via Akka Streams groupBy()
We then use Akka Streams groupBy() to split the stream by message group into substreams:
val messageSource: Source[Message, NotUsed] = Source(List(
  // ...
))

messageSource.map(m => (m.group, m)).
  groupBy(groups.size, _._1).
  fold(("", List.empty[Message])) { case ((_, list), (g, m)) =>
    (g, m :: list)
  }.
  mapAsync(parallelism = groups.size) { case (g: String, list: List[Message]) =>
    Source(list.reverse).map(_.content).map(ByteString(_)).
      runWith(fileSinks(g).sink)
  }.
  mergeSubstreams.
  runWith(Sink.ignore)
Note that after groupBy() splits the stream (see the method signature), each substream is first folded into a single (group, messages) pair, and the substreams are then processed in parallel via mapAsync(), which transforms the stream by applying to each element a function that returns a Future (here, streaming the accumulated messages into the corresponding file sink). Since our goal is just to create the individual files, the final ‘mergeSubstreams’ simply recombines the substreams so the stream can be run with a throwaway Sink.ignore.
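To see this groupBy/fold/mergeSubstreams pattern in isolation, here's a minimal sketch (the names and values are hypothetical, not part of the messaging app) that groups plain integers by parity:

val parityGroups: Future[Seq[(Int, List[Int])]] =
  Source(1 to 10).
    groupBy(2, _ % 2).                  // split into an odd (key 1) and an even (key 0) substream
    fold((0, List.empty[Int])) { case ((_, list), n) =>
      (n % 2, n :: list)                // accumulate each substream into a keyed list
    }.
    mergeSubstreams.                    // recombine so the whole stream can be run
    runWith(Sink.seq)
// completes with something like Seq((1, List(9, 7, 5, 3, 1)), (0, List(10, 8, 6, 4, 2)))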
Putting all the above pieces together:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.{NotUsed, Done}
import akka.stream.IOResult
import scala.concurrent.Future
import java.nio.file.Paths

implicit val system = ActorSystem("sys")
implicit val materializer = ActorMaterializer()
import system.dispatcher

val groups = List("a", "b", "c")

case class Message(group: String, content: String)

case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])

val messageSource: Source[Message, NotUsed] = Source(List(
  Message("a", "Got an apple.\n"),
  Message("a", "A couple of avocados.\n"),
  Message("a", "Here's a dozen of apricots.\n"),
  Message("b", "Brought a banana.\n"),
  Message("b", "A lot of black-eyed peas.\n"),
  Message("b", "Here's some broccoli.\n"),
  Message("c", "Lots of cherries.\n"),
  Message("c", "A big pile of chilli peppers over here.\n")
))

val fileSinks = groups.map{ g =>
  val sink = FSink(
    s"file-${g}.txt",
    FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
  )
  (g, sink)
}.toMap

messageSource.map(m => (m.group, m)).
  groupBy(groups.size, _._1).
  fold(("", List.empty[Message])) { case ((_, list), (g, m)) =>
    (g, m :: list)
  }.
  mapAsync(parallelism = groups.size) { case (g: String, list: List[Message]) =>
    Source(list.reverse).map(_.content).map(ByteString(_)).
      runWith(fileSinks(g).sink)
  }.
  mergeSubstreams.
  runWith(Sink.ignore)
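Running the above writes one file per message group under /path/to/. Since the per-substream fold prepends each message and the accumulated list is reversed before writing, the original message order is preserved; file-a.txt, for instance, ends up containing:

Got an apple.
A couple of avocados.
Here's a dozen of apricots.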
Merging stream sources with flatMapConcat
Conversely, given the split files, we can merge them back into a single stream using flatMapConcat. The word ‘merge’ is used loosely here, as flatMapConcat (see the method signature) actually consumes the individual sources one after another, flattening the stream of sources by concatenation.
val files = groups.map(g => s"/path/to/file-${g}.txt")

val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))

val sink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(
  Paths.get("/path/to/file-merged.txt")
)

source.runWith(sink)
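To make the ordering behavior concrete, here's a minimal sketch (hypothetical values, reusing the implicits and imports from the full example above) of how flatMapConcat expands each element into a source and concatenates them in order:

val flattened: Future[Seq[Int]] =
  Source(List(1, 2, 3)).
    flatMapConcat(i => Source(List(i * 10, i * 10 + 1))).  // one inner source per element
    runWith(Sink.seq)
// completes with Seq(10, 11, 20, 21, 30, 31): strictly concatenation order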
In case the files are large and processing them in bounded chunks is preferred in order to keep memory usage in check:
val chunking = Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = 10240,
  allowTruncation = true
)

// Framing.delimiter emits each frame with the delimiter stripped, so we
// append the line feed back before writing to the merged file.
source.via(chunking).map(_ ++ ByteString("\n")).runWith(sink)
A line feed (i.e. “\n”) is used as the frame delimiter here, but it can be set to anything else.
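For instance, if the content were delimited by some custom token instead of newlines, the framing flow could be set up as in the sketch below (the “||” token is just an arbitrary example, not from the original setup):

val customChunking = Framing.delimiter(
  ByteString("||"),                // custom frame delimiter
  maximumFrameLength = 10240,
  allowTruncation = true
)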