Effective ATS:
Streamization and Stream-Processing in Parallel

ATS is a feature-rich language; dependent types, linear types, and embeddable templates can be seen as three of its most prominent features. In this article, I intend to present a template-based approach to stream-processing in parallel.

Streams and Linear Streams

In ATS, a stream refers to a lazy-list (in contrast to a standard list consisting of nodes for storing elements), and a linear stream refers to a linear lazy-list. Internally, both streams and linear streams are represented as closure-functions. The fundamental difference between a stream, which is non-linear, and a linear stream is that the former caches each element computed during its evaluation (so that the element is available for subsequent use) while the latter does not. As an example, the following code creates a stream of integers from 0 to 999, inclusive, and then computes the length of the created stream:

//
val xs =
intrange_stream
  (0, 1000) where
{
//
fun
intrange_stream
  (m: int, n: int): stream(int) =
  $delay
  (
  if
  (m >= n)
  then stream_nil()
  else stream_cons(m, intrange_stream(m+1, n))
  )
//
} (* end of [where] *) // end of [val]
//
val nxs = stream_length(xs) // nxs = 1000
//

After the call to stream_length returns, the stream that xs refers to consists of 1000 nodes for storing all of the integers between 0 and 999. As a comparison, the following code creates a linear stream of integers between 0 and 999, inclusive, and then computes the length of the created stream:

//
val ys =
intrange_stream_vt
  (0, 1000) where
{
//
fun
intrange_stream_vt
  (m: int, n: int): stream_vt(int) =
  $ldelay
  (
  if
  (m >= n)
  then stream_vt_nil()
  else stream_vt_cons(m, intrange_stream_vt(m+1, n)) 
  )
//
} (* end of [val] *)
//
val nys = stream_vt_length(ys) // nys = 1000
//

After the call to stream_vt_length returns, the stream that ys refers to is consumed (and thus no longer available for any subsequent use).
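
By way of contrast, the non-linear stream xs created earlier can be traversed again; the call below simply revisits the cached nodes (no $delay'd code is re-evaluated):

//
val nxs2 = stream_length(xs) // nxs2 = 1000
//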

When compared to non-linear streams, linear streams are significantly easier to implement and also more efficient, both time-wise and memory-wise. On the other hand, a linear type system of some kind is needed in order to assign types to linear streams, which is certainly non-trivial. As far as I can tell, ATS is currently the only functional programming language that supports programming with linear streams. In the following presentation, I shall primarily focus on stream-processing based on linear streams.
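
As a small illustration of the linearity discipline, a linear stream that is not (fully) evaluated must still be consumed explicitly. The following sketch (assuming that intrange_stream_vt from above is available at the top level) discards such a stream by calling the prelude function stream_vt_free; simply dropping a linear stream would be rejected by the typechecker:

//
val zs = intrange_stream_vt(0, 1000)
val () = stream_vt_free(zs) // [zs] must be consumed here; it cannot just be dropped
//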

Streamization of Data Containers

The phrase data container loosely refers to a collection of data of some kind. For instance, a string can be considered as a container of the characters in it; a file name (represented as a string) can be considered as a container of the characters in the named file, or of the words in it, or of the lines in it; a directory name can be considered as a collection of the names of the files in the named directory; etc. As an example, the following function mystring_length computes the length of a given string:

//
fun
mystring_length
  (cs: string): int =
  stream_vt_length(streamize_string_char(cs))
//

Note that the function streamize_string_char turns a given string into a linear stream of chars (of the type stream_vt(char)). Of course, the length of a string can be computed more efficiently. What is remarkable here is that the linear stream returned by streamize_string_char is represented as a closure-function, which takes only a few bytes to store, and all of the (heap) memory needed by stream_vt_length during its evaluation is just for one node (that stores the first element of the linear stream passed as the argument). In other words, the memory footprint of mystring_length can truly be considered minimal. As a simple comparison, please note that the memory needed for turning a string into a list is proportional to the length of the string, as each char in the string needs to be stored in one node of the list to be created.
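
As a small variation, the following sketch counts the newline characters in a string by filtering the linear stream produced by streamize_string_char (via the same dot-notation .filter() that appears later in this article):

//
fun
mystring_nline
  (cs: string): int =
  stream_vt_length
  ((streamize_string_char(cs)).filter()(lam c => c = '\n'))
//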

Streamization makes it very convenient to employ various stream-based combinators to process data. For instance, the following function computes the number of chars in the file of a given name:

//
fun
myfilename_nchar
  (fname: string): int = let
//
val opt =
streamize_filename_char(fname)
//
in
//
case+ opt of
| ~Some_vt(cs) =>
  (
    stream_vt_length(cs)
  )
| ~None_vt((*void*)) =>
  (
    prerrln!
    ("ERROR: Cannot open the file: [", fname, "]");
    exit(1) // abnormal exit
  )
//
end // end of [myfilename_nchar]
//

If we need to count the number of lines in the file of a given name, the following function can be called:

//
fun
myfilename_nline
  (fname: string): int = let
//
val opt =
streamize_filename_char(fname)
//
in
//
case+ opt of
| ~Some_vt(cs) =>
  (
    stream_vt_length
    ((cs).filter()(lam c => c = '\n'))
  )
| ~None_vt((*void*)) =>
  (
    prerrln!
    ("ERROR: Cannot open the file: [", fname, "]");
    exit(1) // abnormal exit
  )
//
end // end of [myfilename_nline]
//

In practice, streamization can also provide a highly effective approach to avoiding stack-overflow due to deeply nested recursive calls. Let us see a concrete example. The following implementation of list0_append is given in functional style:

//
fun
{a:t@ype}
list0_append
(
xs: list0(a)
,
ys: list0(a)
) : list0(a) =
auxmain(xs, ys) where
{
  fun
  auxmain
  ( xs: list0(a)
  , ys: list0(a)): list0(a) =
  (
  case+ xs of
  | list0_nil() => ys
  | list0_cons(x0, xs) => list0_cons(x0, auxmain(xs, ys))
  )
} (* end of [list0_append] *)
//

If the first argument of list0_append is a long list (e.g., one containing 1M elements), then evaluating list0_append may result in stack-overflow, as the inner function auxmain, which recurses on its first argument, is not tail-recursive. On the other hand, the following function mylist0_append does not run the risk of stack-overflow:

//
fun
{a:t@ype}
mylist0_append
(
xs: list0(a)
,
ys: list0(a)
) : list0(a) =
un_streamize_list0_elt
  (stream_vt_append(streamize_list0_elt(xs), streamize_list0_elt(ys)))
//

Given a list of elements, streamize_list0_elt turns it into a linear stream of the same elements (enumerated in the same order). Due to the very nature of lazy-evaluation, the function stream_vt_append for concatenating two given linear streams returns in O(1)-time. The function un_streamize_list0_elt for turning a linear stream into the corresponding list is given a special tail-recursive implementation. Therefore, there is no danger of stack-overflow when mylist0_append is evaluated, regardless of the length of its first argument.
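
For illustration, here is a small usage sketch of mylist0_append; the lists are built directly with the list0_cons and list0_nil constructors, but any other way of constructing list0-values works just as well:

//
val xs = list0_cons(0, list0_cons(1, list0_cons(2, list0_nil())))
val ys = list0_cons(3, list0_cons(4, list0_nil()))
val zs = mylist0_append<int>(xs, ys) // zs = (0, 1, 2, 3, 4)
//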

Stream-Processing in Parallel

Search is a common theme in programming. We may see search as a process of two phases: streamization is performed in the first phase to construct a (linear) stream that lazily enumerates the data to be searched, while the actual search is done in the second phase over the (linear) stream constructed in the first phase. In pseudo code, we can write something as follows:

//
val nxs = streamize(SomeData) // Phase 1
val nxs = stream_vt_filter_cloptr(nxs, lam(nx) => mycheck(nx)) // Phase 2
//

where the function mycheck checks whether an element in the stream (constructed in the first phase) is a valid solution. Please find in QueenPuzzle.dats a concrete example of this kind, where a package for streamizing graphs is used to construct a linear stream of chessboard configurations such that each configuration depicts a unique way in which 8 queen pieces are positioned so that none can attack any other.

The aforementioned second phase can clearly be done in parallel. For instance, we can have two threads running on two cores concurrently to check the elements in the constructed stream, and a list is returned at the end that consists of all of the elements passing the check done by mycheck.

In the package StreamPar, the following higher-order function is available for processing a given (linear) stream in parallel:

//
fun
{a:vt@ype}
{b:vt@ype}
{r:vt@ype}
streampar_mapfold_cloref
( fws: fworkshop
, xs0: stream_vt(INV(a))
, res: r, map: cfun(a, b), fold: cfun(b, r, r)): (r)
//
The name of the function suggests that it is functionally equivalent to applying a map and then a fold to a given linear stream. It essentially requests that the workers (threads) attached to its first argument (a workshop) apply the fourth argument (a closure-function) to each element of the second argument (a linear stream) to obtain a value, and then apply the fifth argument (another closure-function) to the third argument (used as an accumulator) and the obtained value so as to generate a new version of the accumulator.
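
Ignoring the workshop and parallel evaluation, the value returned by streampar_mapfold_cloref can be understood in terms of the following sequential sketch, which processes the elements in stream order; the helper seq_mapfold is hypothetical (it is not part of the StreamPar package) and reuses the cfun closure-function types from the interface above:

//
fun
{a:vt@ype}
{b:vt@ype}
{r:vt@ype}
seq_mapfold
( xs0: stream_vt(INV(a))
, res: r, map: cfun(a, b), fold: cfun(b, r, r)): (r) =
(
case+ !xs0 of // forcing the linear stream
| ~stream_vt_nil() => res
| ~stream_vt_cons(x0, xs1) =>
  seq_mapfold(xs1, fold(map(x0), res), map, fold)
)
//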

Let us now have a stream-binge! Suppose that we want to print out all of the lines in the files contained in a given directory that match a given regular expression. The essential code for doing this is written as follows:

//
vtypedef a = string
vtypedef b = stream_vt(string)
vtypedef r = int // for [void]
//
val res =  
$StreamPar.streampar_mapfold_cloref<a><b><r>
(
  fws
, dir2fnames(dname), 0
, lam(x) => fname2lines(regex, x)
, lam(y, r) => let val () = stream_print_free(y) in r end
)

where the interface for the functions dir2fnames and fname2lines is given below:

//
fun
dir2fnames(dname: string): stream_vt(string)
fun
fname2lines
(regex: string, fname: string): stream_vt(string)
//

Given the name of a directory, dir2fnames returns a linear stream of the names of all the files contained in the directory or any of its subdirectories. Given a regular expression and the name of a file, fname2lines returns a stream of the lines in the file that match the regular expression. In addition, the function stream_print_free prints out each line in a given stream while freeing the stream simultaneously. Please find in StreamPar_binge.dats the code that implements these functions. Note that a function of the name stream_by_command is called to implement dir2fnames. Essentially, stream_by_command turns the output of executing a given command-line into a linear stream of chars. For instance, the names of all the files contained in a given directory can be obtained by executing the command-line find [dname] -type f, where [dname] refers to the name of the directory.
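
For concreteness, here is a minimal sketch of what a function like stream_print_free can look like (the actual implementation is in StreamPar_binge.dats); it prints each line and frees each stream node as the evaluation proceeds:

//
fun
stream_print_free
  (ys: stream_vt(string)): void =
(
// a sketch only; see StreamPar_binge.dats for the actual code
case+ !ys of
| ~stream_vt_nil() => ()
| ~stream_vt_cons(y0, ys1) =>
  (println!(y0); stream_print_free(ys1))
)
//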

With streampar_mapfold_cloref, the above code can readily use 2 or more threads to process the stream of file names returned by dir2fnames. However, the code as written cannot really take advantage of threads running on multiple cores, because calling fname2lines immediately returns a stream (represented as a closure-function) with no work actually being done yet to find the lines matching the given regular expression. By calling the function stream2list_vt, we can force the evaluation of the stream returned by fname2lines into a list:

//
vtypedef a = string
vtypedef b = list_vt(string)
vtypedef r = int // for [void]
//
val res =  
$StreamPar.streampar_mapfold_cloref<a><b><r>
(
  fws
, dir2fnames(dname), 0
, lam(x) => stream2list_vt(fname2lines(regex, x))
, lam(y, r) => let val () = list_vt_print_free(y) in r end
)

Note that list_vt_print_free is like stream_print_free except that it takes a linear list of lines as its argument. In my own crude experiment, I observed about a 25% reduction in real time (that is, wall-clock time) when running two threads on two cores versus running only one thread.

Code for Compiling and Testing

Please find in the following files the entirety of the code presented in this article:

QueenPuzzle.dats // graph streamization
StreamPar_misc.dats // code for illustration
StreamPar_binge.dats // code for stream-binge
There is also an accompanying Makefile that can be used for compiling and testing the code (via the make utility).


This article is written by Hongwei Xi.