//
absvtype
buffer_vtype(a:vt@ype+, m:int, n: int) = ptr
//
vtypedef
buffer(a:vt0p) = [m,n:int] buffer_vtype(a, m, n)
vtypedef
buffer(a:vt0p, m:int, n:int) = buffer_vtype(a, m, n)
//
Given a type T and two integers M and N, the type buffer(T, M, N) is for a buffer of capacity M in which N elements of the type T are stored. The following lemma can be used to establish the property that M >= N and N >= 0 (for the purpose of constraint-solving):
praxi
lemma_buffer_param{a:vt0p}
{m,n:int}(!buffer(INV(a), m, n)): [m >= n; n >= 0] void
The function buffer_make_nil can be called to create a linear buffer of a
given capacity that contains no elements:
fun{a:vt0p}
buffer_make_nil{m:pos} (cap: int m): buffer(a, m, 0)
The functions buffer_isnil and buffer_isful
can be called to test whether a given buffer is empty and full,
respectively:
fun buffer_isnil{a:vt0p} {m,n:int} (!buffer(INV(a), m, n)): bool(n==0)
fun buffer_isful{a:vt0p} {m,n:int} (!buffer(INV(a), m, n)): bool(m==n)
The function buffer_insert is for inserting an element into a buffer that is not full:
fun{a:vt0p}
buffer_insert{m,n:int | n < m}
(
  !buffer(INV(a), m, n) >> buffer(a, m, n+1), x: a
) : void // end of [buffer_insert]
The function buffer_takeout is for taking out an element from a buffer that is not empty:
fun{a:vt0p}
buffer_takeout{m,n:int | n > 0}
(buf: !buffer(INV(a), m, n) >> buffer(a, m, n-1)): (a)
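For readers more at home in C, the linear buffer interface above can be mirrored roughly as follows. This is only a sketch under stated assumptions: the element type is fixed to int, the ring-buffer layout and FIFO order are my own choices (the interface above does not fix an order), and buffer_free is a hypothetical helper. The static guards n < m and n > 0, which ATS checks at compile time, become run-time assertions here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical C counterpart of buffer(T, M, N) with T fixed to int:
   cap plays the role of M and len plays the role of N. */
typedef struct {
    int *data; /* storage for cap elements */
    int cap;   /* capacity M */
    int len;   /* number of stored elements N */
    int head;  /* index of the oldest element (FIFO order is an assumption) */
} buffer;

/* Like buffer_make_nil: the capacity must be positive; the result is empty. */
buffer *buffer_make_nil(int cap) {
    assert(cap > 0);
    buffer *buf = malloc(sizeof *buf);
    assert(buf != NULL);
    buf->data = malloc((size_t)cap * sizeof *buf->data);
    assert(buf->data != NULL);
    buf->cap = cap;
    buf->len = 0;
    buf->head = 0;
    return buf;
}

bool buffer_isnil(const buffer *buf) { return buf->len == 0; }
bool buffer_isful(const buffer *buf) { return buf->len == buf->cap; }

/* Like buffer_insert: the guard n < m is checked at run time. */
void buffer_insert(buffer *buf, int x) {
    assert(buf->len < buf->cap);
    buf->data[(buf->head + buf->len) % buf->cap] = x;
    buf->len += 1;
}

/* Like buffer_takeout: the guard n > 0 is checked at run time. */
int buffer_takeout(buffer *buf) {
    assert(buf->len > 0);
    int x = buf->data[buf->head];
    buf->head = (buf->head + 1) % buf->cap;
    buf->len -= 1;
    return x;
}

/* Hypothetical helper: releases the storage of a buffer. */
void buffer_free(buffer *buf) {
    free(buf->data);
    free(buf);
}
```

The contrast is the point of the sketch: a C caller that inserts into a full buffer fails only when the assertion fires, whereas the ATS types reject such a call during typechecking.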
Let us now introduce an abstract type for shared buffers:
//
abstype
sbuffer_type(a:vt@ype) = ptr
//
typedef
sbuffer(a:vt0p) = sbuffer_type(a)
//
Please note that sbuffer is a non-linear type. This means that a shared buffer cannot be freed explicitly after its creation. It is also possible to make sbuffer a reference-counted linear type so that a shared buffer can be freed. The interested reader is encouraged to give this alternative design a try, which should lead to only a slightly more involved implementation.
Given a linear buffer, sbuffer_make_buffer turns it into a shared buffer (by wrapping a protection mechanism around it):
fun{a:vt0p}
sbuffer_make_buffer (buffer(a)): sbuffer(a)
The function sbuffer_insert inserts an element into
a shared buffer:
fun{a:vt0p}
sbuffer_insert (sbuffer(a), x: a): void // called by producer
Note that a call to sbuffer_insert may be blocked due to the linear buffer inside the shared buffer being full.
The function sbuffer_takeout takes out an element from a shared buffer:
fun{a:vt0p}
sbuffer_takeout (sbuf: sbuffer(a)): (a) // called by consumer
Note that a call to sbuffer_takeout may be blocked due to the linear buffer inside the shared buffer being empty.
The function sbuffer_acquire is for acquiring the linear buffer inside a given shared buffer:
fun sbuffer_acquire{a:vt0p} (sbuf: sbuffer(a)): buffer(a)
The function sbuffer_release is for releasing a linear
buffer (to the same shared buffer from which the linear buffer was
previously acquired):
fun
sbuffer_release{a:vt0p} (sbuf: sbuffer(a), buf: buffer(a)): void
Let us now declare two functions as follows:
fun{a:vt0p}
sbuffer_insert2 (sbuffer(a), !buffer(INV(a)) >> _, x: a): void
fun{a:vt0p}
sbuffer_takeout2 (sbf: sbuffer(a), buf: !buffer(INV(a)) >> _): (a)
A call to the function sbuffer_insert2 tries to insert an element into a given linear buffer. If the linear buffer is full, the caller is blocked, waiting on a condition variable until it is notified that the linear buffer is no longer full.
A call to the function sbuffer_takeout2 tries to take out an element from a given linear buffer. If the linear buffer is empty, the caller is blocked, waiting on a condition variable until it is notified that the linear buffer is no longer empty.
It is straightforward to implement sbuffer_insert and sbuffer_takeout based on sbuffer_insert2 and sbuffer_takeout2, respectively:
implement{a}
sbuffer_insert (sbuf, x) = {
  val buf = sbuffer_acquire (sbuf)
  val ((*void*)) = sbuffer_insert2 (sbuf, buf, x)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}
implement{a}
sbuffer_takeout (sbuf) = x where
{
  val buf = sbuffer_acquire (sbuf)
  val x(*a*) = sbuffer_takeout2 (sbuf, buf)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}
There are two condition variables (CVs) inside a shared buffer. One of them is for handling the condition of the linear buffer (inside the shared buffer) being empty, and the other CV is for handling the condition of the linear buffer being full. The following functions are implemented directly based on these two condition variables:
//
fun sbuffer_wait_isnil
  {a:vt0p}{m:int} (sbuffer(a), !buffer(a, m, 0) >> buffer(a)): void
fun sbuffer_signal_isnil{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
fun sbuffer_wait_isful
  {a:vt0p}{m:int} (sbuffer(a), !buffer(a, m, m) >> buffer(a)): void
fun sbuffer_signal_isful{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
The type assigned to the function sbuffer_wait_isnil means that its caller must possess an empty linear buffer when calling it, and that the caller still possesses the linear buffer, which may still be empty, when the call returns. The linear buffer may still be empty at the moment when a call to sbuffer_wait_isnil returns because other callers (consumers) may have gained access to the linear buffer earlier.
The type assigned to the function sbuffer_wait_isful can be explained similarly.
implement{a}
sbuffer_insert2 (sbuf, buf, x) = let
//
val isful = buffer_isful (buf)
//
prval () = lemma_buffer_param (buf)
//
in
//
if isful
  then let
    val () = sbuffer_wait_isful (sbuf, buf)
    // end of [val]
  in
    sbuffer_insert2 (sbuf, buf, x)
  end // end of [then]
  else let
    val isnil = buffer_isnil (buf)
    val ((*void*)) = buffer_insert (buf, x)
    val ((*void*)) = if isnil then sbuffer_signal_isnil (sbuf)
  in
    // nothing
  end // end of [else]
//
end // end of [sbuffer_insert2]
The code should be self-explanatory. Please note that inserting an element into an empty buffer requires a signal to be sent to the CV handling the condition of the buffer being empty. If this is not done, then a deadlock may occur as the consumers waiting on the CV can never be awakened.
implement{a}
sbuffer_takeout2 (sbuf, buf) = let
//
val isnil = buffer_isnil (buf)
//
prval () = lemma_buffer_param (buf)
//
in
//
if isnil
  then let
    val () = sbuffer_wait_isnil (sbuf, buf)
    // end of [val]
  in
    sbuffer_takeout2 (sbuf, buf)
  end // end of [then]
  else x where
  {
    val isful = buffer_isful (buf)
    val x(*a*) = buffer_takeout (buf)
    val ((*void*)) = if isful then sbuffer_signal_isful (sbuf)
  } (* end of [else] *)
//
end // end of [sbuffer_takeout2]
The code should be self-explanatory. Please note that taking out an element from a full buffer requires a signal to be sent to the CV handling the condition of the buffer being full. If this is not done, then a deadlock may occur as the producers waiting on the CV can never be awakened.
struct
{
  void *buffer ;
  mutex_t mutex ;
  cond_t CVisnil ;
  cond_t CVisful ;
}
Instead of being coded in C directly, the rest of my implementation is also written in ATS but it makes pervasive use of some unsafe programming features. Please find the entirety of the code in the files sbuffer.sats and sbuffer.dats. There is no testing code in my implementation as I feel highly confident about its correctness. A primary reason for programming in ATS is that one can make effective use of types in ATS so as to convince oneself that his or her code should perform as expected. I feel that I have done it in this case.
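To relate the struct above to the blocking logic of sbuffer_insert2 and sbuffer_takeout2, here is a rough C sketch using POSIX threads. It is not the code in sbuffer.dats. The assumptions are mine: elements are ints, the capacity CAP is fixed, mutex_t and cond_t are taken to be pthread_mutex_t and pthread_cond_t, and sbuffer_init and sbuffer_demo are hypothetical helpers. I also use pthread_cond_broadcast rather than a single-waiter signal, so that every waiter gets to re-test the condition when several of them are blocked at once.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define CAP 4 /* hypothetical fixed capacity, chosen only for illustration */

/* Hypothetical shared buffer of ints: a ring buffer guarded by a mutex,
   with one condition variable per blocking condition. */
typedef struct {
    int data[CAP];
    int head, len;
    pthread_mutex_t mutex;
    pthread_cond_t CVisnil; /* consumers wait here while the buffer is empty */
    pthread_cond_t CVisful; /* producers wait here while the buffer is full */
} sbuffer;

void sbuffer_init(sbuffer *sb) {
    sb->head = 0;
    sb->len = 0;
    pthread_mutex_init(&sb->mutex, NULL);
    pthread_cond_init(&sb->CVisnil, NULL);
    pthread_cond_init(&sb->CVisful, NULL);
}

/* Called by producers; blocks while the buffer is full. */
void sbuffer_insert(sbuffer *sb, int x) {
    pthread_mutex_lock(&sb->mutex);        /* acquire */
    while (sb->len == CAP)                 /* re-test after every wake-up */
        pthread_cond_wait(&sb->CVisful, &sb->mutex);
    bool was_nil = (sb->len == 0);
    sb->data[(sb->head + sb->len) % CAP] = x;
    sb->len += 1;
    if (was_nil)                           /* empty -> non-empty: wake consumers */
        pthread_cond_broadcast(&sb->CVisnil);
    pthread_mutex_unlock(&sb->mutex);      /* release */
}

/* Called by consumers; blocks while the buffer is empty. */
int sbuffer_takeout(sbuffer *sb) {
    pthread_mutex_lock(&sb->mutex);
    while (sb->len == 0)
        pthread_cond_wait(&sb->CVisnil, &sb->mutex);
    bool was_ful = (sb->len == CAP);
    int x = sb->data[sb->head];
    sb->head = (sb->head + 1) % CAP;
    sb->len -= 1;
    if (was_ful)                           /* full -> non-full: wake producers */
        pthread_cond_broadcast(&sb->CVisful);
    pthread_mutex_unlock(&sb->mutex);
    return x;
}

/* Producer thread body: inserts 0..9, blocking whenever the buffer is full. */
static void *producer(void *arg) {
    sbuffer *sb = arg;
    for (int i = 0; i < 10; i++) sbuffer_insert(sb, i);
    return NULL;
}

/* Runs one producer against the calling thread as the consumer and
   returns the sum of the ten values taken out. */
int sbuffer_demo(void) {
    sbuffer sb;
    sbuffer_init(&sb);
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, producer, &sb);
    assert(rc == 0);
    (void)rc;
    int sum = 0;
    for (int i = 0; i < 10; i++) sum += sbuffer_takeout(&sb);
    pthread_join(tid, NULL);
    return sum;
}
```

The while loops play the role of the recursive calls in sbuffer_insert2 and sbuffer_takeout2: a thread that wakes up must test the buffer again, since another thread may have filled or emptied it in the meantime. Nothing in the C version prevents a caller from touching the buffer without holding the mutex, which is exactly the kind of mistake the linear types in the ATS interface rule out.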
Suppose we have a linear type Resource for values that we want to share:
absvtype Resource
Let us introduce a (non-linear) type SharedResource for
values that are formed by wrapping some kind of protection mechanism
around linear values of the type Resource:
abstype SharedResource
The function that does the wrapping can be given the following type:
fun SharedResource_create (R: Resource): SharedResource
The linear resource inside a shared resource is protected by a lock,
and the following two functions are for taking out the linear resource
and putting it back:
fun SharedResource_acquire (SR: SharedResource): Resource
fun SharedResource_release (SR: SharedResource, R: Resource): void
Let us now assume that we have the following function for processing a linear resource:
fun Resource_process (R: !Resource >> _): bool
If a call to Resource_process returns true, then the resource has been
processed properly. Otherwise, it is an indication that the state of the
resource needs to be changed before it is suitable for processing.
What we want to implement is the following function:
fun SharedResource_process (SR: SharedResource): void
When called on a shared resource, SharedResource_process tries to process
the linear resource inside it by calling Resource_process.
An implementation of SharedResource_process is sketched as follows:
//
extern
fun SharedResource_wait (SR: SharedResource, R: !Resource >> _): void
extern
fun SharedResource_process2 (SR: SharedResource, R: !Resource >> _): void
//
implement
SharedResource_process (SR) = () where
{
  val R = SharedResource_acquire (SR)
  val () = SharedResource_process2 (SR, R)
  val () = SharedResource_release (SR, R)
}
//
implement
SharedResource_process2 (SR, R) = let
  val ans = Resource_process (R)
in
//
if ans
  then (
    // processing is done properly
    // there may be a need to send signals
    // to some condition variables
  ) else let
    val () = SharedResource_wait (SR, R)
    // end of [val]
  in
    SharedResource_process2 (SR, R)
  end // end of [else]
//
end // end of [SharedResource_process2]
//
A call to SharedResource_wait puts the caller on the waiting list of some condition variable to avoid busy-waiting. Whenever the condition needed for Resource_process to do proper processing is met, a signal should be sent to the condition variable (by another caller).
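The same acquire, process-or-wait, release pattern can be sketched in C with POSIX threads. Everything concrete here is an assumption made for illustration: the resource is a count of tokens, Resource_process succeeds only when a token is available, and SharedResource_init, SharedResource_refill and SharedResource_demo are hypothetical helpers that do not appear in the ATS code.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical resource: a count of available tokens. */
typedef struct { int tokens; } Resource;

/* The resource wrapped with a lock and one condition variable. */
typedef struct {
    Resource res;
    pthread_mutex_t lock;
    pthread_cond_t cv;
} SharedResource;

void SharedResource_init(SharedResource *sr, int tokens) {
    sr->res.tokens = tokens;
    pthread_mutex_init(&sr->lock, NULL);
    pthread_cond_init(&sr->cv, NULL);
}

/* Returns true if a token was consumed; false means the state of the
   resource must change before it can be processed. */
bool Resource_process(Resource *r) {
    if (r->tokens == 0) return false;
    r->tokens -= 1;
    return true;
}

/* Acquire the resource, retry processing until it succeeds, then release.
   Waiting on the condition variable avoids busy-waiting. */
void SharedResource_process(SharedResource *sr) {
    pthread_mutex_lock(&sr->lock);              /* acquire */
    while (!Resource_process(&sr->res))         /* process, or wait and retry */
        pthread_cond_wait(&sr->cv, &sr->lock);
    pthread_mutex_unlock(&sr->lock);            /* release */
}

/* Another caller changes the state and notifies the waiters. */
void SharedResource_refill(SharedResource *sr, int n) {
    pthread_mutex_lock(&sr->lock);
    sr->res.tokens += n;
    pthread_cond_broadcast(&sr->cv);
    pthread_mutex_unlock(&sr->lock);
}

/* Thread body: adds a single token. */
static void *refiller(void *arg) {
    SharedResource_refill(arg, 1);
    return NULL;
}

/* Starts with no tokens, lets another thread add one, processes it,
   and returns the number of tokens left. */
int SharedResource_demo(void) {
    SharedResource sr;
    SharedResource_init(&sr, 0);
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, refiller, &sr);
    assert(rc == 0);
    (void)rc;
    SharedResource_process(&sr); /* returns once the token has arrived */
    pthread_join(tid, NULL);
    return sr.res.tokens;
}
```

The while loop corresponds to the recursive call in SharedResource_process2, and pthread_cond_wait corresponds to SharedResource_wait: it gives up the lock while waiting and holds it again when it returns, which matches the type !Resource >> _ in that the caller possesses the resource both before and after the call.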
Please find in the file SharedResource.dats the entirety of the code presented in this section.
This article is written by Hongwei Xi.