Effective ATS:
Inter-Process Communication based on Redis

In this article, I present a straightforward example of inter-process communication based on the NoSQL database redis. The primary purpose of this example is to show a concrete case where redis functions are called directly inside ATS code.

Message Channels

Let us first introduce a type msgchan for message channels:
abstype msgchan_type = ptr
typedef msgchan = msgchan_type
The following function is for creating a message channel:
fun msgchan_create (name: string): msgchan
In the actual implementation, a message channel is just a key referring to a queue in redis, and the function msgchan_create computes such a key based on a given name.

In order to insert a message into a given message channel, the following function msgchan_insert can be called:

fun msgchan_insert
  (chan: msgchan, msg: string, nerr: &int >> _) : void
The third argument of msgchan_insert is call-by-reference, and its value is increased whenever an attempt to insert a message fails.
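As a minimal sketch of how the error counter might be used, the following helper inserts one message and reports whether the counter stayed at zero. The helper name send_one and the staload path are assumptions made for illustration; only msgchan and msgchan_insert come from the interface above.

```ats
#include "share/atspre_staload.hats"
staload "./msgchan.sats" // assumed location of the interface file

// send_one is a hypothetical helper, not part of msgchan.sats
fun send_one
  (chan: msgchan, msg: string): bool = let
  var nerr: int = 0 // error counter passed by reference
  val () = msgchan_insert (chan, msg, nerr)
in
  nerr = 0 // true if no failure was recorded
end
```

Starting the counter at zero for each call makes the test after the call unambiguous; a caller that shares one counter across many calls would instead compare its value before and after.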

In order to take out a message from a given message channel, the following function msgchan_takeout can be called:

fun msgchan_takeout (chan: msgchan, nerr: &int >> _): stropt
The second argument of msgchan_takeout is call-by-reference, and its value is increased whenever an attempt to take out a message fails. What msgchan_takeout returns is an optional string, which is either a regular string or a null pointer; a null pointer is also an indication of failure of some sort. Note that a call to msgchan_takeout on a given channel blocks if the channel is currently empty.
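The returned stropt-value has to be tested before it can be used as a string. The sketch below shows one way to do so with stropt_is_some and stropt_unsome from the ATS prelude. The helper name recv_one and the staload path are assumptions made for illustration.

```ats
#include "share/atspre_staload.hats"
staload "./msgchan.sats" // assumed location of the interface file

// recv_one is a hypothetical helper, not part of msgchan.sats
fun recv_one
  (chan: msgchan): void = let
  var nerr: int = 0 // error counter passed by reference
  val opt = msgchan_takeout (chan, nerr) // blocks while the channel is empty
in
  if stropt_is_some (opt)
    then println! ("received: ", stropt_unsome (opt))
    else prerrln! ("msgchan_takeout failed; nerr = ", nerr)
end
```

Because stropt_unsome may only be applied to a value known to be non-null, the call is placed in the branch guarded by stropt_is_some, which lets the typechecker verify the access.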

If calling msgchan_insert or msgchan_takeout on a channel results in a failure, then the redis connection associated with the channel should be re-established before a second attempt is made.
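The retry policy described above can be sketched as follows. The article does not name the function that re-establishes the connection, so the sketch takes it as a parameter: the names insert_retry, ntry, and reconnect are all hypothetical, and the staload path is an assumption.

```ats
#include "share/atspre_staload.hats"
staload "./msgchan.sats" // assumed location of the interface file

// insert_retry is a hypothetical helper, not part of msgchan.sats.
// [reconnect] stands in for whatever code re-establishes the redis connection.
fun insert_retry
(
  chan: msgchan, msg: string, ntry: int, reconnect: () -> void
) : bool =
  if ntry > 0 then let
    var nerr: int = 0 // fresh error counter for this attempt
    val () = msgchan_insert (chan, msg, nerr)
  in
    if nerr = 0
      then true // the message was inserted
      else (reconnect (); insert_retry (chan, msg, ntry - 1, reconnect))
  end else false // all attempts have been used up
```

Bounding the number of attempts keeps a caller from looping forever when the redis server stays unreachable.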

Redis Connection Setup

To set up a redis connection means to create a value of the type redisContext(l), where l refers to the memory location at which the value is stored. Let us use the name redisContext-value to refer to such a value. In case a redis connection cannot be set up successfully, the type redisContext(null) is assigned to the created redisContext-value, which is just a null pointer.

Once a redis connection is established, the redisContext-value associated with this connection is stored in a reference (that is, an array of size 1). The following function can be called to obtain the stored redisContext-value (for temporary use):

fun the_redisContext_vtget (): [l:addr] vttakeout0 (redisContext(l))
where vttakeout0 is defined as follows:
vtypedef
vttakeout0 (a:vt@ype) = (a -<lin,prf> void | a) // borrowed linear value
The following code depicts a typical calling sequence:
//
val (fpf | ctx) = the_redisContext_vtget ()
//
// Here is some code that makes use of [ctx]
//
prval () = fpf (ctx) // returning the borrowed context
//
where the_redisContext_vtget is called to obtain a redisContext-value together with a proof function for returning this value, and the proof function is then called on the redisContext-value to return it (in the sense of theorem-proving) after it has been used.

Testing the Implementation

The ATS code implementing message channels can be found in the following files:
msgchan.sats
msgchan.dats
redisContextSetup.dats
In the file [test_up.dats], some testing code is available for uploading the content of a file into a message channel.

In the file [test_dn.dats], some testing code is available for downloading the content of a message channel into a file.

There is also a Makefile available for compiling the ATS source code into the executables [test_up] and [test_dn] for testing. Assume that a redis server is running on the default port 6379 at the IP address 127.0.0.1. By executing the following command line:

cat msgchan.sats | ./test_up & ./test_dn
one should be able to see the content of the file [msgchan.sats] being output to the console in a line-by-line fashion, where a short pause (of 3 seconds at most) is introduced between the appearance of two consecutive lines.
This article is written by Hongwei Xi.