Effective ATS:
Producer-Consumer

The Producer-Consumer problem is a classical one in concurrent programming. In this article, I present an implementation of this problem that makes highly effective use of dependent types and linear types. Because concurrent programs are so difficult to debug, relying on static checking to detect and fix bugs matters far more in concurrent programming than in sequential programming. While the style of programming I employ here may take time to master, the great advantages it brings can be readily appreciated. In general, ATS shines much more brightly in contexts where dynamic testing becomes more difficult to perform or control.

Description of the Problem

Given a buffer of finite capacity, there are multiple producers that insert items into the buffer and multiple consumers that take items out of it. If a producer wants to insert an item but the buffer is full, the producer is blocked until the buffer is no longer full. If a consumer wants to take out an item but the buffer is empty, the consumer is blocked until the buffer is no longer empty.

Interface for Linear Buffer

Let us first declare as follows a linear abstract type for buffers:
//
absvtype
buffer_vtype(a:vt@ype+, m:int, n: int) = ptr
//
vtypedef
buffer(a:vt0p) = [m,n:int] buffer_vtype(a, m, n)
vtypedef
buffer(a:vt0p, m:int, n:int) = buffer_vtype(a, m, n)
//
Given a type T and two integers M and N, the type buffer(T, M, N) is for a buffer of capacity M in which N elements of the type T are stored. The following lemma can be used to establish the property that M >= N and N >= 0 (for the purpose of constraint-solving):
praxi
lemma_buffer_param{a:vt0p}
  {m,n:int}(!buffer(INV(a), m, n)): [m >= n; n >= 0] void
The function buffer_make_nil can be called to create a linear buffer of a given capacity that contains no elements:
fun{a:vt0p}
buffer_make_nil{m:pos} (cap: int m): buffer(a, m, 0)
The functions buffer_isnil and buffer_isful can be called to test whether a given buffer is empty and full, respectively:
fun buffer_isnil{a:vt0p}
  {m,n:int} (!buffer(INV(a), m, n)): bool(n==0)
fun buffer_isful{a:vt0p}
  {m,n:int} (!buffer(INV(a), m, n)): bool(m==n)
The function buffer_insert is for inserting an element into a buffer that is not full:
fun{a:vt0p}
buffer_insert{m,n:int | n < m}
(
  !buffer(INV(a), m, n) >> buffer(a, m, n+1), x: a
) : void // end of [buffer_insert]
The function buffer_takeout is for taking out an element from a buffer that is not empty:
fun{a:vt0p}
buffer_takeout{m,n:int | n > 0}
  (buf: !buffer(INV(a), m, n) >> buffer(a, m, n-1)): (a)
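
For readers more familiar with C, the interface above can be approximated by the following minimal sketch. It is only an illustration, not the ATS implementation: it fixes the element type to int, assumes FIFO ordering (which the interface does not prescribe), and uses run-time assert calls where the ATS types check n < m and n > 0 at compile time. The names buffer_t, head and buffer_free are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical C analogue of buffer(a, m, n) with a = int:
   cap plays the role of m and len plays the role of n. */
typedef struct {
  int *data;  /* storage for cap items */
  int cap;    /* m: capacity, fixed at creation */
  int len;    /* n: number of items currently stored */
  int head;   /* index of the oldest item (FIFO order is an assumption) */
} buffer_t;

/* Mirrors buffer_make_nil: requires cap > 0 and yields an empty buffer. */
buffer_t *buffer_make_nil(int cap) {
  assert(cap > 0);
  buffer_t *buf = malloc(sizeof *buf);
  assert(buf != NULL);
  buf->data = malloc((size_t)cap * sizeof *buf->data);
  assert(buf->data != NULL);
  buf->cap = cap;
  buf->len = 0;
  buf->head = 0;
  return buf;
}

bool buffer_isnil(const buffer_t *buf) { return buf->len == 0; }
bool buffer_isful(const buffer_t *buf) { return buf->len == buf->cap; }

/* Mirrors buffer_insert: the static guard n < m becomes a run-time check. */
void buffer_insert(buffer_t *buf, int x) {
  assert(buf->len < buf->cap);
  buf->data[(buf->head + buf->len) % buf->cap] = x;
  buf->len += 1;
}

/* Mirrors buffer_takeout: the static guard n > 0 becomes a run-time check. */
int buffer_takeout(buffer_t *buf) {
  assert(buf->len > 0);
  int x = buf->data[buf->head];
  buf->head = (buf->head + 1) % buf->cap;
  buf->len -= 1;
  return x;
}

/* Not part of the ATS interface shown above; added so the sketch can clean up. */
void buffer_free(buffer_t *buf) {
  free(buf->data);
  free(buf);
}
```

In this C version, calling buffer_insert on a full buffer is only caught when the program runs (and only if assertions are enabled). With the ATS types, the same mistake is rejected by the typechecker.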

Interface for Shared Buffer

In terms of implementation, a shared buffer wraps a protection mechanism, consisting of a mutex and two conditional variables, around a linear buffer. The mutex is for protecting the linear buffer and the conditional variables are introduced to avoid busy-waiting. I will present more details on the protection mechanism later.

Let us now introduce an abstract type for shared buffers:

//
abstype
sbuffer_type(a:vt@ype) = ptr
//
typedef sbuffer(a:vt0p) = sbuffer_type(a)
//
Please note that sbuffer is a non-linear type. This means that a shared buffer cannot be freed explicitly after its creation. It is also possible to make sbuffer a reference-counted linear type so that a shared buffer can be freed. The interested reader is encouraged to give this alternative design a try, which should lead to only a slightly more involved implementation.

Given a linear buffer, sbuffer_make_buffer turns it into a shared buffer (by wrapping a protection mechanism around it):

fun{a:vt0p}
sbuffer_make_buffer (buffer(a)): sbuffer(a)
The function sbuffer_insert inserts an element into a shared buffer:
fun{a:vt0p}
sbuffer_insert (sbuffer(a), x: a): void // called by producer
Note that a call to sbuffer_insert may be blocked due to the linear buffer inside the shared buffer being full.

The function sbuffer_takeout takes out an element from a shared buffer:

fun{a:vt0p}
sbuffer_takeout (sbuf: sbuffer(a)): (a) // called by consumer
Note that a call to sbuffer_takeout may be blocked due to the linear buffer inside the shared buffer being empty.

Implementation of Shared Buffer

The linear buffer inside a shared buffer is protected by a mutex.

The function sbuffer_acquire is for acquiring the linear buffer inside a given shared buffer:

fun sbuffer_acquire{a:vt0p} (sbuf: sbuffer(a)): buffer(a)
The function sbuffer_release is for releasing a linear buffer (to the same shared buffer from which the linear buffer was previously acquired):
fun
sbuffer_release{a:vt0p} (sbuf: sbuffer(a), buf: buffer(a)): void
Let us now declare two functions as follows:
fun{a:vt0p}
sbuffer_insert2 (sbuffer(a), !buffer(INV(a)) >> _, x: a): void
fun{a:vt0p}
sbuffer_takeout2 (sbf: sbuffer(a), buf: !buffer(INV(a)) >> _): (a)
A call to the function sbuffer_insert2 tries to insert an element into a given linear buffer. If the linear buffer is full, the caller blocks on a conditional variable until it is notified that the linear buffer is no longer full.

A call to the function sbuffer_takeout2 tries to take out an element from a given linear buffer. If the linear buffer is empty, the caller blocks on a conditional variable until it is notified that the linear buffer is no longer empty.

It is straightforward to implement sbuffer_insert and sbuffer_takeout based on sbuffer_insert2 and sbuffer_takeout2, respectively:

implement{a}
sbuffer_insert (sbuf, x) =
{
  val buf = sbuffer_acquire (sbuf)
  val ((*void*)) = sbuffer_insert2 (sbuf, buf, x)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}

implement{a}
sbuffer_takeout (sbuf) = x where
{
  val buf = sbuffer_acquire (sbuf)
  val x(*a*) = sbuffer_takeout2 (sbuf, buf)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}
There are two conditional variables (CVs) inside a shared buffer. One of them is for handling the condition of the linear buffer (inside the shared buffer) being empty, and the other CV is for handling the condition of the linear buffer being full. The following functions are implemented directly based on these two conditional variables:
//
fun
sbuffer_wait_isnil
  {a:vt0p}{m:int}
  (sbuffer(a), !buffer(a, m, 0) >> buffer(a)): void
fun
sbuffer_signal_isnil{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
fun
sbuffer_wait_isful
  {a:vt0p}{m:int}
  (sbuffer(a), !buffer(a, m, m) >> buffer(a)): void
fun
sbuffer_signal_isful{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
The type assigned to the function sbuffer_wait_isnil means that its caller must have possession of an empty linear buffer when calling it, and that the caller still has possession of the linear buffer when the call returns. The type of the returned buffer makes no claim about its size, so the buffer may still be empty. This can happen because other callers (consumers) may have gained access to the linear buffer earlier and taken out whatever was inserted. The caller must therefore test the buffer again after the call returns.

The type assigned to the function sbuffer_wait_isful can be explained similarly.

Implementing sbuffer_insert2

An implementation of sbuffer_insert2 is given as follows:
implement{a}
sbuffer_insert2
  (sbuf, buf, x) = let
//
val isful = buffer_isful (buf)
//
prval () = lemma_buffer_param (buf)
//
in
//
if isful
  then let
    val () =
      sbuffer_wait_isful (sbuf, buf)
    // end of [val]
  in
    sbuffer_insert2 (sbuf, buf, x)
  end // end of [then]
  else let
    val isnil = buffer_isnil (buf)
    val ((*void*)) = buffer_insert (buf, x)
    val ((*void*)) = if isnil then sbuffer_signal_isnil (sbuf)
  in
    // nothing
  end // end of [else]
//  
end // end of [sbuffer_insert2]
The code should be self-explanatory. Please note that inserting an element into an empty buffer requires a signal to be sent to the CV handling the condition of the buffer being empty. If this is not done, then a deadlock may occur as the consumers waiting on the CV can never be awakened.

Implementing sbuffer_takeout2

implement{a}
sbuffer_takeout2
  (sbuf, buf) = let
//
val isnil = buffer_isnil (buf)
//
prval () = lemma_buffer_param (buf)
//
in
//
if isnil
  then let
    val () =
      sbuffer_wait_isnil (sbuf, buf)
    // end of [val]
  in
    sbuffer_takeout2 (sbuf, buf)
  end // end of [then]
  else x where
  {
    val isful = buffer_isful (buf)
    val x(*a*) = buffer_takeout (buf)
    val ((*void*)) = if isful then sbuffer_signal_isful (sbuf)
  } (* end of [else] *)
//  
end // end of [sbuffer_takeout2]
The code should be self-explanatory. Please note that taking out an element from a full buffer requires a signal to be sent to the CV handling the condition of the buffer being full. If this is not done, then a deadlock may occur as the producers waiting on the CV can never be awakened.
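
The same wait-and-signal logic can be written directly against pthreads, as in the minimal C sketch below. It is an illustration under several assumptions and not a translation of the actual ATS code: the element type is int, the capacity and thread counts (CAP, NPROD, NCONS, PER_PRODUCER) are made up, the recursive calls of sbuffer_insert2 and sbuffer_takeout2 become while loops, and the linear buffer is inlined into the struct. The sketch uses pthread_cond_broadcast rather than pthread_cond_signal, which is a conservative choice of mine: when a signal is sent only on the empty-to-nonempty (or full-to-nonfull) transition and several threads are waiting, waking a single waiter could leave the others asleep while work is available.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

enum { CAP = 4, NPROD = 2, NCONS = 2, PER_PRODUCER = 1000 }; /* hypothetical sizes */

typedef struct {
  int data[CAP];           /* ring buffer of capacity CAP (the m in the text) */
  int len, head;           /* len is the n in the text */
  pthread_mutex_t mutex;   /* protects data, len and head */
  pthread_cond_t CVisnil;  /* consumers wait here while len == 0 */
  pthread_cond_t CVisful;  /* producers wait here while len == CAP */
} sbuffer_t;

static sbuffer_t shared = {
  .mutex = PTHREAD_MUTEX_INITIALIZER,
  .CVisnil = PTHREAD_COND_INITIALIZER,
  .CVisful = PTHREAD_COND_INITIALIZER,
};

void sbuffer_insert(sbuffer_t *sb, int x) {
  pthread_mutex_lock(&sb->mutex);                /* cf. sbuffer_acquire */
  while (sb->len == CAP)                         /* re-check after every wakeup */
    pthread_cond_wait(&sb->CVisful, &sb->mutex); /* cf. sbuffer_wait_isful */
  bool wasnil = (sb->len == 0);
  sb->data[(sb->head + sb->len) % CAP] = x;
  sb->len += 1;
  if (wasnil)
    pthread_cond_broadcast(&sb->CVisnil);        /* cf. sbuffer_signal_isnil */
  pthread_mutex_unlock(&sb->mutex);              /* cf. sbuffer_release */
}

int sbuffer_takeout(sbuffer_t *sb) {
  pthread_mutex_lock(&sb->mutex);
  while (sb->len == 0)
    pthread_cond_wait(&sb->CVisnil, &sb->mutex); /* cf. sbuffer_wait_isnil */
  bool wasful = (sb->len == CAP);
  int x = sb->data[sb->head];
  sb->head = (sb->head + 1) % CAP;
  sb->len -= 1;
  if (wasful)
    pthread_cond_broadcast(&sb->CVisful);        /* cf. sbuffer_signal_isful */
  pthread_mutex_unlock(&sb->mutex);
  return x;
}

/* Each producer inserts the integers 1..PER_PRODUCER. */
void *producer(void *arg) {
  (void)arg;
  for (int i = 1; i <= PER_PRODUCER; i++)
    sbuffer_insert(&shared, i);
  return NULL;
}

/* Each consumer takes an equal share of all items and sums them. */
void *consumer(void *arg) {
  long *sum = arg;
  for (int i = 0; i < PER_PRODUCER * NPROD / NCONS; i++)
    *sum += sbuffer_takeout(&shared);
  return NULL;
}

/* Runs all threads to completion and returns the sum of everything consumed. */
long run_demo(void) {
  pthread_t prod[NPROD], cons[NCONS];
  long sums[NCONS] = {0};
  for (int i = 0; i < NCONS; i++)
    pthread_create(&cons[i], NULL, consumer, &sums[i]);
  for (int i = 0; i < NPROD; i++)
    pthread_create(&prod[i], NULL, producer, NULL);
  for (int i = 0; i < NPROD; i++)
    pthread_join(prod[i], NULL);
  for (int i = 0; i < NCONS; i++)
    pthread_join(cons[i], NULL);
  long total = 0;
  for (int i = 0; i < NCONS; i++)
    total += sums[i];
  return total;
}
```

With the sizes above, every produced item is consumed exactly once, so run_demo returns 2 * (1 + 2 + ... + 1000) = 1001000. Nothing in the C types stops a caller from touching data without holding the mutex, which is precisely the class of error the linear buffer type rules out in ATS.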

Summary of the Remaining Implementation

The remaining implementation of the producer-consumer problem can be readily done in C (based on pthread support for mutexes and conditional variables). For instance, sbuffer can be represented as a pointer to a value of the following struct type:
struct
{
  void *buffer ;
  mutex_t mutex ;
  cond_t CVisnil ;
  cond_t CVisful ;
}
Instead of being coded in C directly, the rest of my implementation is also written in ATS but it makes pervasive use of some unsafe programming features. Please find the entirety of the code in the files sbuffer.sats and sbuffer.dats. There is no testing code in my implementation as I feel highly confident about its correctness. A primary reason for programming in ATS is that one can make effective use of types in ATS so as to convince oneself that his or her code should perform as expected. I feel that I have done it in this case.
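
To make the struct above concrete, here is a minimal C sketch of how a shared buffer could be created and how acquiring and releasing could map onto the mutex. It assumes that mutex_t and cond_t stand for pthread_mutex_t and pthread_cond_t. The type name sbuffer_struct is hypothetical, and the functions only borrow the names used in this article; they are not the code found in sbuffer.dats.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* The struct from the text, with pthread types substituted for
   mutex_t and cond_t (an assumption about what those names stand for). */
typedef struct {
  void *buffer;             /* the linear buffer being protected */
  pthread_mutex_t mutex;
  pthread_cond_t CVisnil;
  pthread_cond_t CVisful;
} sbuffer_struct;

/* Hypothetical analogue of sbuffer_make_buffer.
   Returns NULL if any step fails; buf is then left with the caller. */
sbuffer_struct *sbuffer_make_buffer(void *buf) {
  sbuffer_struct *sb = malloc(sizeof *sb);
  if (sb == NULL) return NULL;
  sb->buffer = buf;
  if (pthread_mutex_init(&sb->mutex, NULL) != 0) goto fail_mutex;
  if (pthread_cond_init(&sb->CVisnil, NULL) != 0) goto fail_isnil;
  if (pthread_cond_init(&sb->CVisful, NULL) != 0) goto fail_isful;
  return sb;
fail_isful:
  pthread_cond_destroy(&sb->CVisnil);
fail_isnil:
  pthread_mutex_destroy(&sb->mutex);
fail_mutex:
  free(sb);
  return NULL;
}

/* Analogue of sbuffer_acquire: lock the mutex, then hand out the buffer. */
void *sbuffer_acquire(sbuffer_struct *sb) {
  pthread_mutex_lock(&sb->mutex);
  return sb->buffer;
}

/* Analogue of sbuffer_release: the buffer handed back must be the one
   that was acquired; here this is only a run-time check. */
void sbuffer_release(sbuffer_struct *sb, void *buf) {
  assert(buf == sb->buffer);
  (void)buf;
  pthread_mutex_unlock(&sb->mutex);
}
```

In C, forgetting the call to sbuffer_release compiles without complaint and leaves the mutex locked forever. In ATS, the linear value returned by sbuffer_acquire must be consumed, so such an omission is a type error.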

Safe Programming with Shared Resources

In the rest of the article, I would like to outline a general approach to safe programming with shared resources that can be seen as a generalization of the implementation of the producer-consumer problem presented above.

Suppose we have a linear type Resource for values that we want to share:

absvtype Resource
Let us introduce a (non-linear) type SharedResource for values that are formed by wrapping some kind of protection mechanism around linear values of the type Resource:
abstype SharedResource
The function that does the wrapping can be given the following type:
fun SharedResource_create (R: Resource): SharedResource
The linear resource inside a shared resource is protected by a lock, and the following two functions are for taking out the linear resource and returning it:
fun SharedResource_acquire (SR: SharedResource): Resource
fun SharedResource_release (SR: SharedResource, R: Resource): void
Let us now assume that we have the following function for processing a linear resource:
fun Resource_process (R: !Resource >> _): bool
If a call to Resource_process returns true, then the resource has been processed properly. Otherwise, it is an indication that the state of the resource needs to be changed before it is suitable for processing.

What we want to implement is the following function:

fun SharedResource_process (SR: SharedResource): void
When called on a shared resource, SharedResource_process tries to process the linear resource inside it by calling Resource_process.

An implementation of SharedResource_process is sketched as follows:

//
extern
fun
SharedResource_wait
  (SR: SharedResource, R: !Resource >> _): void
extern
fun
SharedResource_process2
  (SR: SharedResource, R: !Resource >> _): void
//
implement
SharedResource_process
  (SR) = () where
{
  val R = SharedResource_acquire (SR)
  val () = SharedResource_process2 (SR, R)
  val () = SharedResource_release (SR, R)
}
//
implement
SharedResource_process2
  (SR, R) = let
  val ans = Resource_process (R)
in
//
if ans
  then
  (
    // processing is done properly
    // there may be a need to send signals
    // to some conditional variables
  )
  else let
    val () =
      SharedResource_wait (SR, R)
    // end of [val]
  in
    SharedResource_process2 (SR, R)
  end // end of [else]
//
end // end of [SharedResource_process2]
//
A call to SharedResource_wait puts the caller on the waiting-list of some conditional variable to avoid busy-waiting. Whenever the condition needed for Resource_process to do proper processing is met, a signal should be sent to the conditional variable (by another caller).

Please find in the file SharedResource.dats the entirety of the code presented in this section.


This article is written by Hongwei Xi.