Effective ATS:
Parallel Sorting via Fractional Continuation

In this article, I present an implementation of parallel merge-sort in the continuation-passing style (CPS), which achieves high concurrency by completely eliminating the need for explicit synchronization. It is expected that this style of implementation can be readily applied to various other algorithms based on the so-called divide-and-conquer strategy (for problem-solving).

Note that there is an article in the Effective-ATS series available on-line that gives a detailed explanation of implementing (top-down) merge-sort. One may want to read that article first before studying the current one.

Sequential Merge-Sort

Let us first declare a function mergesort as follows:

fun
{a:t@ype}
mergesort{n:int}(xs: list(a, n)): list(a, n)
which takes a list of length N and returns another list of the same length. Strictly speaking, mergesort is a function template, which can be instantiated with any given type T to form a function for sorting a list in which each element is of the type T.

A sequential implementation of mergesort is given as follows:

implement
{a}(*tmp*)
mergesort(xs) =
msort(xs, length(xs)) where
{
//
fun
msort{n:int}
(
xs: list(a, n), n: int(n)
) : list(a, n) =
(
if
(n >= 2)
then let
  val n2 = n / 2
  val (xs1, xs2) = list_split<a>(xs, n2)
  val ys1 = msort(xs1, n2) and ys2 = msort(xs2, n - n2)
in
  list_merge<a>(ys1, ys2)
end // end of [then]
else xs // end of [else]
) (* end of [msort] *)
//
} (* end of [mergesort] *)
Note that the functions list_split and list_merge are given the following types:
//
fun
{a:t@ype}
list_merge
{n1,n2:int}
  (xs: list(a, n1), ys: list(a, n2)): list(a, n1+n2)
//
fun
{a:t@ype}
list_split
{n:int}{k:nat | k <= n}
  (xs: list(a, n), k: int(k) ): (list(a, k), list(a, n-k))
//
The function list_merge merges two given lists (that are ordered) into one (that is ordered). The function list_split splits a given list into two parts where the first part is a prefix of the given list (whose length is determined by the integer passed as the second argument to list_split) and the second part consists of the rest.
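
For illustration, the following small snippet of my own (not from the article) shows the intended behavior of these two functions; it assumes that the comparison used by list_merge has been instantiated for integers:

//
// An illustration (mine); it assumes the integer instantiation
// of the comparison template used by list_merge:
//
val xs = $list{int}(1, 3, 5, 2, 4)
val (xs1, xs2) = list_split<int>(xs, 3) // xs1 = (1, 3, 5) and xs2 = (2, 4)
val ys = list_merge<int>(xs1, xs2) // ys = (1, 2, 3, 4, 5) as xs1 and xs2 are both ordered
//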

The given implementation of mergesort exemplifies the so-called divide-and-conquer strategy for problem-solving. The (inner) function msort inside mergesort divides a given problem into two subproblems; after recursively solving them, it combines the returned solutions into a solution to the original problem.

Issues in Parallelizing Merge-Sort

It is straightforward to parallelize the above implementation of msort: simply initiate two threads to handle the two recursive calls in the body of msort and then join these two threads to obtain the values they return. In a programming language like Erlang, where threads are extremely lightweight, parallelizing msort in such a manner may make sense. If threads are not lightweight (e.g., pthreads), doing so often leads to poor run-time performance. Also, joining threads involves explicit synchronization and can greatly diminish the level of concurrency.

One common approach to lowering the overhead of creating and destroying threads is to start a pool of threads. Instead of creating a thread to perform a particular piece of work, one inserts the piece into some kind of store shared by the pool of threads, so that one of them can pick it up and execute it. Also, a thread can return to the pool (instead of simply exiting) after it completes a piece of work. With this approach, the overhead of creating and destroying threads is spread over the pieces of work done by the threads in the pool.
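
To make this a bit more concrete, here is a minimal sketch of my own (not part of the accompanying code) of the interface such a pool may expose; submit is the function used later in this article, while take and worker are hypothetical names illustrating what each thread in the pool keeps doing:

//
// [submit] sends a piece of work (a delayed computation) to the pool:
extern fun submit : lazy(void) -> void
//
// [take] (hypothetical) removes the next piece of work from the shared store,
// blocking only when the store is empty; each thread in the pool runs [worker]:
extern fun take : () -> lazy(void)
//
fun worker(): void = (lazy_force<void>(take()); worker())
//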

However, there is a caveat that one must pay close attention to when utilizing a thread pool. In general, each piece of work passed to the pool should not cause blocking, for otherwise a scenario can readily occur in which all of the threads in the pool are blocked (and thus no progress can be made in terms of work completion). This is especially true when a recursive problem-solving strategy (e.g., divide-and-conquer) is involved.

It should soon become clear that msort can be implemented in CPS in a way that allows a problem to be divided into non-blocking subproblems, which one can pass directly to a thread pool without running the risk of deadlock.

Merge-Sort in Continuation-Passing Style

A (sequential) implementation of mergesort in CPS is given as follows:

implement
{a}(*tmp*)
mergesort{n}(xs) =
msort(xs, length(xs), lam ys => ys) where
{
//
typedef res = list(a, n)
//
fun
msort{n:int}
(
xs: list(a, n), n: int(n), f: list(a, n) -<cloref1> res
) : res =
(
if
(n >= 2)
then let
//
val n2 = n / 2
val
(xs1, xs2) =
list_split<a>(xs, n2)
//
in
//
msort
( xs1, n2
, lam(ys1) =>
  msort(xs2, n-n2, lam(ys2) => f(list_merge<a>(ys1, ys2)))
)
//
end // end of [then]
else f(xs) // end of [else]
) (* end of [msort] *)
//
} (* end of [mergesort] *)
Note that the third argument of msort (named f) is a closure-function that acts like a continuation.
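
As a small illustration of my own (not from the article), evaluating msort on a 3-element list unfolds roughly as follows, where id stands for the initial continuation (lam ys => ys) and merge abbreviates list_merge:

//
// msort((3, 1, 2), 3, id)
// = msort((3), 1, lam ys1 => msort((1, 2), 2, lam ys2 => id(merge(ys1, ys2))))
// = msort((1, 2), 2, lam ys2 => merge((3), ys2))
// = msort((1), 1, lam ys1' => msort((2), 1, lam ys2' => merge((3), merge(ys1', ys2'))))
// = msort((2), 1, lam ys2' => merge((3), merge((1), ys2')))
// = merge((3), merge((1), (2))) = merge((3), (1, 2)) = (1, 2, 3)
//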

Parallelization via Fractional Continuation

The following code illustrates in clear and concrete terms the CPS-based approach to parallelizing merge-sort:

//
fun {
a:t@ype
} msort{n:int}
(
xs: list(a, n), n: int(n),
k0: list(a, n) -<cloref1> void
) : void = let
//
// For sending to a thread pool
// a given closure (representing work)
extern fun submit : lazy(void) -> void
//
in
//
if
(n >= 2)
then let
//
val n2 = n / 2
val
(xs1, xs2) =
list_split<a>(xs, n2)
//
// [cnt] should be
// guarded by a spinlock:
val cnt = ref<int>(0)
//
val ys1 = ref<list(a, n/2)>(_)
val ys2 = ref<list(a, n-n/2)>(_)
//
val () =
submit
(
delay( // delaying the evaluation of its argument
msort(
  xs1, n2
, lam(ys1_) =>
  (!ys1 := ys1_;
   !cnt := !cnt + 1;
   if !cnt < 2 then () else k0(list_merge<a>(!ys1, !ys2))
  )
) (* msort *)
) (* delay *)
) (* submit *)
//
val () =
submit
(
delay( // delaying the evaluation of its argument
msort(
  xs2, n-n2
, lam(ys2_) =>
  (!ys2 := ys2_;
   !cnt := !cnt + 1;
   if !cnt < 2 then () else k0(list_merge<a>(!ys1, !ys2))
  )
) (* msort *)
) (* delay *)
) (* submit *)
//
in
  // nothing
end // end of [then]
else k0(xs) // end of [else]
//
end (* end of [msort] *)
//
Note that the two recursive calls in the body of msort are turned into two closure-functions and then submitted (to some thread pool). Each of these closure-functions carries a continuation that may be thought of as a fractional one in the following sense. When the continuation of one recursive call to msort is invoked, the list it receives is stored in a reference and the count held in a shared counter (set to 0 initially) is increased by 1. If this count is less than 2, then the other recursive call to msort has not yet completed; otherwise, both recursive calls have completed, and the two lists they returned can be merged and then passed to the original continuation. To some extent, one may think of the original continuation as being split into two fractions, each of which is used to build a new continuation for one of the recursive calls (to msort).
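
As the comment in the code indicates, the counter cnt needs to be protected: the two submitted pieces of work may run on different threads and attempt to update cnt at the same time. The following sketch of my own shows one way in which each fractional continuation could perform its increment-and-test; spin_lock and spin_unlock are hypothetical placeholders for whatever locking facility accompanies the thread pool:

//
extern
fun spin_lock(): void // hypothetical: acquire the lock guarding [cnt]
and spin_unlock(): void // hypothetical: release that lock
//
// inside the continuation passed to one of the recursive calls,
// after the received list has been stored into [ys1] (or [ys2]):
val () = spin_lock()
val cnt1 = !cnt + 1
val () = !cnt := cnt1
val () = spin_unlock()
val () =
if
(cnt1 < 2)
then () // the other half is still being sorted
else k0(list_merge<a>(!ys1, !ys2)) // both halves are done
//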

The function mergesort can now be implemented as follows:

implement
{a}(*tmp*)
mergesort
 {n}(xs) = let
//
extern
fun block(): void // for blocking
and unblock(): void // for unblocking
//
// For sending to a thread pool
// a given closure (representing work)
extern fun submit : lazy(void) -> void
//
val ys = ref<list(a, n)>(_)
//
val () =
submit
(
delay(
msort<a>
( xs
, length(xs)
, lam(ys_) => (!ys := ys_; unblock())
)
) (* delay *)
) (* submit *)
//
in
  block(); !ys  
end // end of [mergesort]
Note that msort is in CPS but mergesort is in direct style (rather than CPS). Therefore, some form of handshake is needed in order for the caller mergesort to learn that the callee msort has finished. The functions block and unblock are introduced precisely for this purpose.
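
For instance, block and unblock can be realized on top of a counting semaphore created with an initial count of 0: block waits on the semaphore and unblock posts to it. The following sketch is my own (it is not part of the accompanying code); sem_create, sem_wait, and sem_post are hypothetical placeholders for whatever semaphore (or condition-variable) facility is available:

//
extern fun sem_create(count: int): ptr // hypothetical
extern fun sem_wait(sem: ptr): void // hypothetical: decrement, blocking while the count is 0
extern fun sem_post(sem: ptr): void // hypothetical: increment, waking up one waiter
//
val sem = sem_create(0) // the first wait blocks until a post arrives
//
fun block(): void = sem_wait(sem) // the caller sleeps until [unblock] is called
and unblock(): void = sem_post(sem) // wakes up the caller blocked inside [block]
//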


Please find in the following files the entirety of the code presented in this article:

mergesort.dats
mergesort_cps.dats
mergesort_par_cps.dats
In addition, there is an accompanying Makefile for typechecking the code.


This article is written by Hongwei Xi.