Chapter 16. Lazy Stream-Processing

It can simply be put that the real fun of functional programming starts with streams.

A stream is like a list but it is lazy in the sense that the elements in a stream are not required to be made available at the moment when the stream is formed. Instead, each element only needs to be produced at the point where the element is actually needed for evaluation. Given a type T, the type stream(T) is for a value representing a stream of elements of the type T. Let us use the name stream-value to refer to such a value, which is completely opaque, for it does not even reveal whether the stream it represents is empty or not. Internally, a stream-value is represented as a thunk, that is, a nullary closure-function. Evaluating a stream-value (that is, calling the thunk representing it) yields a stream-con value of type stream_con(T) for some T, where stream_con is the following declared datatype:

//
datatype
stream_con(a:t@ype) =
| stream_nil of ()
| stream_cons of (a, stream(a))
//

With pattern matching, we can inspect whether a stream-con value represents an empty stream or not. If the stream is non-empty, then its first element can be extracted. Formally speaking, we have the following type definition in ATS:

typedef stream(a:t@ype) = lazy(stream_con(a))

where lazy is a special type constructor. Given a type T, the type lazy(T) is essentially for a thunk that returns a value of the type T. Often such a thunk is referred to as a suspended computation of the type T, which can be resumed by simply calling the thunk.
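
The notion of lazy(T) as a memoized thunk can be sketched in Python (the helper names delay, force, and expensive are made up for this sketch; they are not part of any library):

```python
# A sketch of lazy(T): a thunk whose result is cached after the first force.
calls = {"n": 0}          # counts how often the suspended computation runs

def delay(compute):
    """Return a thunk; forcing it runs `compute` at most once (memoization)."""
    cell = []
    def force():
        if not cell:
            cell.append(compute())   # run the suspended computation once
        return cell[0]               # cached thereafter
    return force

def expensive():
    calls["n"] += 1
    return 2 + 3

suspended = delay(expensive)   # nothing is computed here
print(suspended())             # first force runs expensive(): prints 5
print(suspended())             # second force returns the cache: prints 5
print(calls["n"])              # prints 1: the computation ran only once
```

Calling delay corresponds to $delay in ATS, and calling the returned thunk corresponds to lazy_force.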

As an example, the following function int_stream_from takes an integer n and returns a stream that enumerates, in ascending order, all of the integers greater than or equal to n:

//
fun
int_stream_from
(n: int): stream(int) =
$delay(stream_cons(n, int_stream_from(n+1)))
//

The keyword $delay indicates the construction of a thunk based on the expression appearing as its argument.
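
The same construction can be sketched in Python, where a stream-value is a plain thunk (no memoization in this sketch) and forcing it yields the analogue of a stream-con value, namely a (head, tail-stream) pair:

```python
def int_stream_from(n):
    # The lambda plays the role of $delay: building it does no work.
    # Forcing it yields (head, tail-stream), the stream_cons analogue.
    return lambda: (n, int_stream_from(n + 1))

xs = int_stream_from(0)   # returns immediately: only a thunk is built
x0, xs1 = xs()            # forcing produces the first element and the tail
x1, _ = xs1()
print(x0, x1)             # prints: 0 1
```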

Accessing elements in a given stream is shown in the following example:

//
fun
{a:t@ype}
stream_get_at
(xs: stream(a), n: int): a =
(
case+ !xs of
| stream_nil() =>
  ($raise StreamSubscriptExn())
| stream_cons(x, xs) =>
  (if n <= 0 then x else stream_get_at<a>(xs, n-1))
)
//

The function stream_get_at is the stream-version of list_get_at: Given a stream and a position (denoted by an integer), stream_get_at returns the element in the stream at the given position. Note that the symbol ! refers to the function lazy_force, which evaluates a given stream-value into a stream-con value in this case. As is the case with list_get_at, it is almost always poor programming style to make excessive use of stream_get_at (for instance, inside the body of a loop).
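
A Python analogue of stream_get_at is sketched below (int_stream_from and the use of None for stream_nil are conventions of this sketch only). It is written with a loop rather than recursion, to stay clear of Python's recursion limit; there is also no memoization here, unlike in ATS:

```python
def int_stream_from(n):
    return lambda: (n, int_stream_from(n + 1))

def stream_get_at(xs, n):
    # None plays the role of stream_nil; a pair plays stream_cons.
    while True:
        con = xs()                       # like !xs: force the thunk
        if con is None:
            raise IndexError("stream is too short")
        x, xs = con
        if n <= 0:
            return x
        n -= 1

print(stream_get_at(int_stream_from(0), 1000000))   # prints 1000000
```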

Let us see what is involved in the evaluation of the following two lines of code:

//
val xs = int_stream_from(0)
val x0 = stream_get_at<int>(xs, 1000000)
//

The call to int_stream_from returns immediately as all that is essentially done is creating a thunk (that is, a nullary closure-function). The call to stream_get_at returns 1000000 after creating 1000001 nodes to store the first 1000001 elements in the stream bound to the name xs. While it takes only a tiny amount of memory to store the initial stream bound to the name xs, it requires a large amount of memory to store the expanded stream created during the evaluation of the call to stream_get_at. In general, a call to lazy_force evaluates its argument (of type lazy(T)) to a value (of type T) and then caches the value internally so that the value can be returned immediately when lazy_force is later called on the same argument again. In other words, some form of memoization happens when the evaluation of a suspended computation is performed. Memoization can be expensive memory-wise and unpredictable time-wise. There is another kind of stream (to be presented later) that is referred to as a linear stream, which involves no memoization when evaluated. As a matter of fact, the internal representation of a linear stream only needs memory for storing one node (that contains the head element of the stream), resulting in great memory-efficiency.

Basically, for each list-processing function, there is a corresponding stream-processing function. Let us see some concrete examples. For instance, the following function stream_append (corresponding to list0_append) concatenates two given streams:

//
extern
fun
{a:t@ype}
stream_append
(xs: stream(a), ys: stream(a)): stream(a)
//
implement
{a}(*tmp*)
stream_append
(xs, ys) = $delay
(
case+ !xs of
| stream_nil() => !ys
| stream_cons(x, xs) =>
  stream_cons(x, stream_append<a>(xs, ys))
)
//

Please note the symbol ! in front of ys: The expression following $delay should evaluate to a stream-con value (instead of a stream-value).
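
The shape of stream_append carries over to the thunk-based Python sketch directly; the helpers from_list and to_list are made up here for constructing and draining streams:

```python
def from_list(xs):
    """Turn a Python list into a fully lazy stream (hypothetical helper)."""
    return lambda: None if not xs else (xs[0], from_list(xs[1:]))

def stream_append(xs, ys):
    def thunk():
        con = xs()
        if con is None:
            return ys()    # like !ys: the thunk body must yield a stream-con value
        x, rest = con
        return (x, stream_append(rest, ys))
    return thunk

def to_list(s):
    """Drain a finite stream into a list (hypothetical helper)."""
    out, con = [], s()
    while con is not None:
        x, s = con
        out.append(x)
        con = s()
    return out

print(to_list(stream_append(from_list([1, 2]), from_list([3, 4]))))   # [1, 2, 3, 4]
```

Note that returning ys() rather than ys in the nil-branch mirrors the role of ! in the ATS code: the thunk must produce a stream-con value, not another stream.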

Improper use of streams is abundant in practice. In particular, it is very common to see streams being treated as lists, which defeats the purpose of having streams in the first place. I would like to make a point on proper stream construction by comparing three implementations of the following function stream_make_list0 for turning a given list into a stream:

fun {a:t@ype} stream_make_list0(xs: list0(a)): stream(a)

The first implementation is given as follows:

implement
{a}(*tmp*)
stream_make_list0
(xs) = $delay
(
case xs of
| list0_nil() => stream_nil()
| list0_cons(x, xs) =>
  stream_cons(x, stream_make_list0<a>(xs))
) (* end of [stream_make_list0] *)

This one is what I consider to be a proper implementation of stream construction. It is fully lazy in the sense that the implementation does nothing when called upon a given list0-value except for constructing a thunk (that represents the returned stream). The next implementation of stream_make_list0 is a slight variant of the previous one:

implement
{a}(*tmp*)
stream_make_list0
(xs) =
(
case xs of
| list0_nil() => $delay(stream_nil())
| list0_cons(x, xs) =>
  $delay(stream_cons(x, stream_make_list0<a>(xs)))
) (* end of [stream_make_list0] *)

Note that this one is not fully lazy, as it inspects whether the given list is empty before it constructs a thunk (that represents the returned stream). The implementation of stream_make_list0 that should definitely be avoided is the following one:

implement
{a}(*tmp*)
stream_make_list0
(xs) =
(
case xs of
| list0_nil() => $delay(stream_nil())
| list0_cons(x, xs) => let
    val xs = stream_make_list0<a>(xs)
  in
    $delay(stream_cons(x, xs))
  end // end of [list0_cons]
) (* end of [stream_make_list0] *)

where there is essentially no laziness involved in the construction of the stream being generated. This is a typical example that shows clearly what is meant by treating a stream as a list.
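
The contrast between the first and third implementations can be made visible in the Python sketch with a counter that records node construction (the names make_lazy, make_eager, and work are made up here):

```python
work = {"n": 0}    # counts stream-node constructions

def make_lazy(xs):
    """Fully lazy (like the first implementation): calling it only builds a thunk;
    a node is constructed only when the thunk is forced."""
    def thunk():
        work["n"] += 1
        return None if not xs else (xs[0], make_lazy(xs[1:]))
    return thunk

def make_eager(xs):
    """No laziness (like the third implementation): every node is built up front."""
    stream = lambda: None
    for x in reversed(xs):
        work["n"] += 1
        stream = (lambda hd, tl: (lambda: (hd, tl)))(x, stream)
    return stream

s1 = make_lazy([1, 2, 3])
print(work["n"])            # prints 0: no node has been built yet
s2 = make_eager([1, 2, 3])
print(work["n"])            # prints 3: all three nodes already exist
```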

With streams, we can facilitate the use of (general) recursion in problem-solving by eliminating the risk of stack overflow caused by deeply nested recursive calls. In the case of stream_append, the evaluation of a call to stream_append returns immediately, as essentially all it does is form a thunk (for representing the resulting stream). On the other hand, the implementation of list0_append presented previously can potentially cause stack overflow if its argument is a long list (e.g., one consisting of 1000000 elements). By the way, the actual implementation of list0_append in ATSLIB/prelude (the prelude library of ATS) cannot cause stack overflow due to its being tail-recursive. However, there is no free lunch here, as this library implementation is written in an advanced style that is somewhat difficult to adopt in practice.

The following function stream_map is the stream-version of list0_map:

//
extern
fun
{a:t@ype}
{b:t@ype}
stream_map
(xs: stream(a), fopr: cfun(a, b)): stream(b)
//
implement
{a}{b}
stream_map
(xs, fopr) = $delay
(
case+ !xs of
| stream_nil() => stream_nil()
| stream_cons(x, xs) =>
  stream_cons(fopr(x), stream_map<a><b>(xs, fopr))
)
//

The following function stream_filter is the stream-version of list0_filter:

//
extern
fun
{a:t@ype}
stream_filter
(xs: stream(a), test: cfun(a, bool)): stream(a)
//
implement
{a}(*tmp*)
stream_filter
(xs, test) = $delay
(
case+ !xs of
| stream_nil() => stream_nil()
| stream_cons(x, xs) =>
  if test(x)
  then
  stream_cons
  (x, stream_filter<a>(xs, test)) // end of [then]
  else !(stream_filter<a>(xs, test)) // end of [if]
)
//

A function like stream_map and stream_filter is often referred to as being fully lazy as its evaluation does nothing except for creating a thunk (to represent a suspended computation).
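
Both functions can be sketched in the thunk-based Python representation (from_list and to_list are hypothetical helpers for this sketch). In stream_filter, a loop is used where the ATS code re-forces with !(...):

```python
def from_list(xs):
    return lambda: None if not xs else (xs[0], from_list(xs[1:]))

def stream_map(xs, fopr):
    def thunk():
        con = xs()
        if con is None:
            return None
        x, rest = con
        return (fopr(x), stream_map(rest, fopr))
    return thunk

def stream_filter(xs, test):
    def thunk():
        con = xs()
        while con is not None:     # loop here, instead of re-forcing as in !(...)
            x, rest = con
            if test(x):
                return (x, stream_filter(rest, test))
            con = rest()
        return None
    return thunk

def to_list(s):
    out, con = [], s()
    while con is not None:
        x, s = con
        out.append(x)
        con = s()
    return out

evens = stream_filter(from_list(list(range(10))), lambda x: x % 2 == 0)
print(to_list(stream_map(evens, lambda x: 2 * x)))   # [0, 4, 8, 12, 16]
```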

The following function sieve implements the sieve of Eratosthenes for enumerating prime numbers:

//
fun
sieve(): stream(int) = let
//
fun
auxmain
(
xs: stream(int)
) : stream(int) = $delay
(
case- !xs of
| stream_cons(x0, xs) =>
  stream_cons
  (x0, auxmain(stream_filter(xs, lam(x) => x % x0 > 0)))
)
//
in
  auxmain(int_stream_from(2))
end // end of [sieve]
//

A call to sieve returns a stream consisting of all the primes enumerated in ascending order: 2, 3, 5, 7, 11, etc. Note that case- is used in place of case for the purpose of suppressing a warning message that would otherwise be issued due to pattern matching being non-exhaustive (as the case stream_nil() is not covered).
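
The same sieve can be assembled from the thunk-based Python sketches of int_stream_from and stream_filter (all names here are conventions of this sketch, not library functions):

```python
def int_stream_from(n):
    return lambda: (n, int_stream_from(n + 1))

def stream_filter(xs, test):
    def thunk():
        con = xs()
        while con is not None:
            x, rest = con
            if test(x):
                return (x, stream_filter(rest, test))
            con = rest()
        return None
    return thunk

def sieve():
    def auxmain(xs):
        def thunk():
            x0, rest = xs()
            # drop all multiples of x0 from the rest of the stream
            return (x0, auxmain(stream_filter(rest, lambda x: x % x0 > 0)))
        return thunk
    return auxmain(int_stream_from(2))

primes, out = sieve(), []
for _ in range(6):
    p, primes = primes()
    out.append(p)
print(out)   # [2, 3, 5, 7, 11, 13]
```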

As a simple experiment, please try to evaluate the following code:

//
val thePrimes = sieve()
val () = println! ("stream_get_at(thePrimes, 5000):")
val () = println! (stream_get_at<int>(thePrimes, 5000))
val () = println! ("stream_get_at(thePrimes, 5000):")
val () = println! (stream_get_at<int>(thePrimes, 5000))
//

Note that the number 48619 is to be printed out twice. There is a clear pause before it is done for the first time, but there is virtually no delay between the first time and the second time, showing clearly the effect of memoization performed during the first call to stream_get_at on thePrimes.

Lastly, let us see a simple but telling example that demonstrates a stream-based approach to addressing the potential risk of stack overflow due to deeply nested non-tail-recursive calls. The following implementation of list0_map is standard:

//
implement
{a}{b}
list0_map
(xs, fopr) = auxmain(xs) where
{
//
fun
auxmain
(
xs: list0(a)
) : list0(b) =
(
case+ xs of
| list0_nil() => list0_nil()
| list0_cons(x, xs) =>
  list0_cons(fopr(x), auxmain(xs))
)
//
} (* end of [list0_map] *)
//

Clearly, this implementation is not tail-recursive. When applied to a long list (e.g., one consisting of 1000000 elements), list0_map runs a high risk of causing stack overflow. This can become a very serious issue in practice if we ever want to apply functional programming to a domain like machine learning where processing large data is a norm rather than an exception. One possibility is to insist on using only tail-recursion in the implementation of a function like list0_map that may need to be applied to large data, but such a requirement or restriction can clearly exert a negative impact on the use of recursion in problem-solving. After all, there are numerous algorithms that are naturally expressed in terms of (general) recursion. And non-trivial effort is often needed in order to implement such an algorithm based on tail-recursion only, likely diminishing programming productivity.

Another implementation of list0_map is given as follows:

//
implement
{a}{b}
list0_map(xs, fopr) = let
//
fun
auxmain
(
xs: list0(a)
) : stream(b) = $delay
(
case+ xs of
| list0_nil() => stream_nil()
| list0_cons(x, xs) =>
  stream_cons(fopr(x), auxmain(xs))
)
//
in
  g0ofg1(stream2list(auxmain(xs)))
end // end of [list0_map]
//

What is special about this implementation of list0_map lies in the implementation of the inner function auxmain, which turns a list into a stream. Calling auxmain runs no risk of stack overflow as it simply creates a thunk (without issuing any recursive calls). The library function stream2list turns a stream into a (linear) list, which can be cast into a list0-value in O(1)-time by a call to g0ofg1. As stream2list is implemented tail-recursively, it can be safely called to generate a long list. Consequently, this stream-based implementation of list0_map can be applied to a long list with no concern of causing stack overflow. After linear lazy streams are introduced, we are to see a significantly improved version of this stream-based approach to resolving a common type of risk of stack overflow caused by calling recursively defined functions for generating long lists.
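
The same contrast can be sketched in Python, where generators play the role of lazy streams and are drained iteratively by list(); the names list0_map_rec and list0_map_stream are made up for this sketch:

```python
def list0_map_rec(xs, fopr, i=0):
    """Naive recursive map: the recursion depth grows with the list length.
    (Index-based rather than cons-based, to avoid copying sublists.)"""
    if i == len(xs):
        return []
    return [fopr(xs[i])] + list0_map_rec(xs, fopr, i + 1)

def list0_map_stream(xs, fopr):
    """Stream-based map: the generator is a lazy stream; list() drains it
    iteratively, so no deep recursion is involved."""
    return list(fopr(x) for x in xs)

big = list(range(1000000))
try:
    list0_map_rec(big, lambda x: x + 1)
    print("recursive version: ok")
except RecursionError:
    # Python's default recursion limit is about 1000 frames
    print("recursive version: RecursionError")

out = list0_map_stream(big, lambda x: x + 1)
print(out[0], out[-1])   # 1 1000000
```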

Please find on-line the entirety of the code used in this chapter.