In this section, I will present an implementation of linear channels to support asynchronous communication between threads. This is also a very fitting occasion for me to advocate what I often refer to as programmer-centric program verification.
A communication channel between threads is essentially a queue wrapped in some kind of protection mechanism needed for guarding against race conditions. Assume that a queue is of a fixed capacity, that is, the capacity of the queue is fixed after its creation. If the queue is full, then inserting an element into it results in a failure. If the queue is empty, then removing an element from it results in a failure. In order to prevent inserting into a full queue or removing from an empty queue, I could first introduce a linear abstract type for queues as follows:
absvtype queue_vtype(a:vt@ype+, int(*m*), int(*n*)) vtypedef queue(a:vt@ype,m:int,n:int) = queue_vtype(a,m,n)
// fun{a:vt0p} queue_insert {m,n:nat | m > n} (!queue(a, m, n) >> queue(a, m, n+1), a): void // fun{a:vt0p} queue_remove {m,n:nat | n > 0}(!queue(a, m, n) >> queue(a, m, n-1)): (a) //
Following is another version of abstract type queue:
// absvtype queue_vtype(a:vt@ype+, int(*id*)) = ptr // vtypedef queue(a:vt0p, id:int) = queue_vtype(a, id) vtypedef queue(a:vt0p) = [id:int] queue(a, id) //
// absprop ISNIL(id:int, b:bool) // fun {a:vt0p} queue_isnil{id:int}(!queue(a, id)): [b:bool] (ISNIL(id, b) | bool(b)) //
// absprop ISFUL(id:int, b:bool) // fun {a:vt0p} queue_isful{id:int}(!queue(a, id)): [b:bool] (ISFUL(id, b) | bool(b)) //
The functions queue_insert and queue_remove for inserting into and removing from a given queue can now be given the following interface:
// extern fun {a:vt0p} queue_insert {id:int} ( ISFUL(id, false) | xs: !queue(a, id) >> queue(a, id2), x: a ) : #[id2:int] void // extern fun {a:vt0p} queue_remove {id:int} ( ISNIL(id, false) | xs: !queue(a, id) >> queue(a, id2) ) : #[id2:int] a // end-of-fun //
In order to call queue_insert on a given queue, one needs to have a proof attesting to the queue being not full. Such a proof is obtained if calling queue_isful on the queue returns false. Similarly, in order to call queue_remove on a given queue, one can first call queue_isnil on the queue to obtain a proof attesting to the queue being not empty.
What is really of concern here is not to actually verify that queue_isnil and queue_isful have the interface assigned to them. Instead, the focus is on ensuring that queue_insert is never applied to a full queue and queue_remove is never applied to an empty queue under the assumption that queue_isnil and queue_isful have the assigned interface. I refer to this form of program verification as being programmer-centric because its correctness is not formally established in an objective manner. I myself find that programmer-centric program verification is very flexible and effective in practice. If we believe that constructing informal mathematical proofs can help one check whether the proven statements are valid, then it is only natural to believe that programmer-centric program verification can also help one check whether verified programs are correct.
Let us now start to implement linear channels for asynchronous communication between threads. First, let us declare a linear abstract type channel as follows:
The function for inserting an element into a channel is given the following interface: The caller of channel_insert is blocked if the channel is full. Similarly, the function for removing an element from a channel is given the following interface: The caller of channel_remove is blocked if the channel is empty.Let a channel be represented as follows:
// datavtype channel_ = { l0,l1,l2,l3:agz } CHANNEL of @{ cap=intGt(0) , spin=spin_vt(l0) , rfcnt=intGt(0) , mutex=mutex_vt(l1) , CVisnil=condvar_vt(l2) , CVisful=condvar_vt(l3) , queue=ptr // deqarray } (* end of [channel] *) // assume channel_vtype(a:vt0p) = channel_ //
The function channel_insert is given the following implementation:
implement {a}(*tmp*) channel_insert (chan, x0) = let // val+CHANNEL {l0,l1,l2,l3}(ch) = chan val mutex = unsafe_mutex_vt2t(ch.mutex) val (pfmut | ()) = mutex_lock (mutex) val xs = $UN.castvwtp0{queue(a)}((pfmut | ch.queue)) val ((*void*)) = channel_insert2<a> (chan, xs, x0) prval pfmut = $UN.castview0{locked_v(l1)}(xs) val ((*void*)) = mutex_unlock (pfmut | mutex) // in // nothing end // end of [channel_insert]
implement {a}(*tmp*) channel_insert2 (chan, xs, x0) = let // val+CHANNEL {l0,l1,l2,l3}(ch) = chan // val (pf | isful) = queue_isful (xs) // in // if isful then let prval (pfmut, fpf) = __assert () where { extern praxi __assert (): vtakeout0(locked_v(l1)) } val mutex = unsafe_mutex_vt2t(ch.mutex) val CVisful = unsafe_condvar_vt2t(ch.CVisful) val ((*void*)) = condvar_wait (pfmut | CVisful, mutex) prval ((*void*)) = fpf (pfmut) in channel_insert2 (chan, xs, x0) end // end of [then] else let val isnil = queue_isnil (xs) val ((*void*)) = queue_insert (pf | xs, x0) val ((*void*)) = if isnil.1 then condvar_broadcast(unsafe_condvar_vt2t(ch.CVisnil)) // end of [if] in // nothing end // end of [else] // end // end of [channel_insert2]
By following the above implementation for channel_insert (and channel_insert2), it should be pretty straightforward for one to figure out an implementation for channel_remove. I leave it as an exercise.
Please find on-line the file channel_vt.dats containing the entirety of the code presented in this section plus some code for testing.