Effective ATS:
An Amortized Queue Implementation

Queues are a common data structure in practice. In the following presentation, I would like to give a simple 2-list-based implementation of queues for which both insertion (enqueue) and removal (dequeue) are guaranteed to be amortized O(1)-time operations.

A Simple Interface for Queues

Let us first declare as follows a linear abstract type for queues:
absvtype queue_vtype (a:viewt@ype, n:int) = ptr
vtypedef queue (a:vt0p, n:int) = queue_vtype (a, n)
The full name of the declared abstract type is [queue_vtype] and it is given an alias [queue]. Given a type T, which may be linear, and an integer N, the type [queue(T, N)] is for queues containing N elements of type T. Obviously, N should be a natural number if there is ever a queue of the type [queue(T, N)]. This is captured by the type of the following function (or more precisely, proof-function):
praxi
lemma_queue_param
  {a:vt0p}{n:int} (que: !queue (a, n)): [n >= 0] void
The following function is to be called to create a queue of size 0:
fun{} queue_make_nil {a:vt0p} (): queue (a, 0)
By declaring [queue_make_nil] as a function template, we make it more likely that its definition is inlined during compilation. This makes sense because the body of [queue_make_nil] should be tiny.

The following function is to be called to destroy a queue of size 0 (and free the memory it occupies):

fun{} queue_free_nil {a:vt0p} (que: queue (a, 0)): void
Clearly, we also need to test whether a given queue is empty. So let us declare the following function for this purpose:
fun{} queue_is_empty
  {a:vt0p}{n:int} (que: !queue (a, n)): bool (n==0)
Similarly, let us declare the following function for testing whether a given queue is not empty:
fun{} queue_isnot_empty
  {a:vt0p}{n:int} (que: !queue (a, n)): bool (n > 0)
As for enqueueing, we declare the following function:
fun{a:vt0p}
queue_insert_atend{n:int}
  (que: !queue (a, n) >> queue (a, n+1), x: a): void
Note that the syntax indicates that [que] is a call-by-value argument and the type of the queue it refers to changes from [queue(T, N)] to [queue(T, N+1)] for some type T and integer N. Obviously, the increased size is due to one element being inserted into the queue.

As for dequeueing, we declare the following function:

fun{a:vt0p}
queue_takeout_atbeg{n:int | n > 0}
  (que: !queue (a, n) >> queue (a, n-1)): a
Note that the syntax indicates that [que] is a call-by-value argument and the type of the queue it refers to changes from [queue(T, N)] to [queue(T, N-1)] for some type T and positive integer N. Obviously, the decreased size is due to one element being removed from the queue.

We are now ready to implement the abstract type [queue_vtype] and the functions associated with it.

A 2-List-Based Implementation of Queues

In a functional programming setting, a queue is often represented as two lists: the front part and the rear part. For enqueueing, the rear part is replaced with a new list whose head is the enqueued element and whose tail is the original rear part. For dequeueing, if the front part is empty, then it is first replaced with the reverse of the rear part while the rear part is replaced with the empty list; if the front part is not empty, then it is replaced with its own tail while its head is returned as the dequeued element.
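Before turning to the linear version, the functional representation just described can be sketched in ATS as follows. This is only an illustration, not the implementation presented in this article: it fixes the element type to int, relies on garbage-collected functional lists (list0), and the names [fqueue], [deqres], [rev_append], [fq_enqueue] and [fq_dequeue] are hypothetical.

```ats
#include "share/atspre_staload.hats"

// A functional 2-list queue of integers: (front part, rear part).
// All names declared here are hypothetical and used for illustration only.
datatype fqueue = FQ of (list0 (int), list0 (int))

// Result of dequeueing: nothing, or an element plus the remaining queue.
datatype deqres = DeqNone of () | DeqSome of (int, fqueue)

// rev_append (xs, ys) is the reverse of xs followed by ys.
fun rev_append
  (xs: list0 (int), ys: list0 (int)): list0 (int) =
  case+ xs of
  | list0_nil () => ys
  | list0_cons (x, xs1) => rev_append (xs1, list0_cons (x, ys))

// O(1): the enqueued element becomes the head of the rear part.
fun fq_enqueue (q: fqueue, x: int): fqueue = let
  val+FQ (f, r) = q
in
  FQ (f, list0_cons (x, r))
end // end of [fq_enqueue]

fun fq_dequeue (q: fqueue): deqres = let
  val+FQ (f, r) = q
in
  case+ f of
  | list0_cons (x, f1) => DeqSome (x, FQ (f1, r))
  | list0_nil () => (
      // The front part is empty: reverse the rear part first.
      case+ rev_append (r, list0_nil ()) of
      | list0_cons (x, f1) => DeqSome (x, FQ (f1, list0_nil ()))
      | list0_nil () => DeqNone ()
    )
end // end of [fq_dequeue]

implement
main0 () = let
  val q = FQ (list0_nil (), list0_nil ())
  val q = fq_enqueue (fq_enqueue (q, 1), 2)
in
  case+ fq_dequeue (q) of
  | DeqSome (x, _) => println! ("dequeued: ", x)
  | DeqNone () => println! ("the queue is empty")
end // end of [main0]
```

In this sketch, 1 is enqueued before 2, so the dequeued element is 1. Note that nothing in the type of [fq_dequeue] rules out dequeueing from an empty queue, which is why a result type with a [DeqNone] case is needed here.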

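Clearly, this implementation guarantees that enqueueing is always O(1)-time but dequeueing may be O(n)-time, where n is the size of the rear part of the queue. However, each element is moved from the rear part to the front part at most once, so the cost of a reversal can be charged to the enqueue operations that built the rear part. Hence, enqueueing and dequeueing are both O(1)-time if amortized time-complexity is the concern.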
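The amortized bound can be checked with a standard potential-function argument. The following is a sketch under an assumed cost model in which each cons, each uncons, and each per-element step of list reversal costs one unit. The potential \Phi and the amortized cost \hat{c} are notation introduced here for illustration; r denotes the length of the rear part.

```latex
\begin{align*}
\Phi &= r \\
\text{enqueue:}\quad \hat{c} &= 1 + \Delta\Phi = 1 + 1 = 2 \\
\text{dequeue (front part nonempty):}\quad \hat{c} &= 1 + \Delta\Phi = 1 + 0 = 1 \\
\text{dequeue (front part empty):}\quad \hat{c} &= (r + 1) + \Delta\Phi = (r + 1) + (0 - r) = 1
\end{align*}
```

Since \Phi is 0 for an empty queue and never negative, the total actual cost of any sequence of m operations starting from an empty queue is at most 2m under this cost model.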

Let us now see the actual code for this 2-list-based implementation of queues. Instead of functional lists, which require the presence of GC to avoid memory leaks, the code makes use of linear singly-linked lists (of the type [sllist]), performing memory allocation/deallocation explicitly.

staload "libats/SATS/sllist.sats"

datavtype
queue (a:viewt@ype+, n:int) =
  {f,r:nat | f+r==n} QUEUE of (sllist (a, f), sllist (a, r))
// end of [queue]

assume queue_vtype (a, n) = queue (a, n)
Clearly, the above code indicates that the size of a queue equals the sum of lengths of its front and rear parts.
implement{}
queue_make_nil () = QUEUE (sllist_nil (), sllist_nil ())
The function [queue_make_nil] is called to create a queue of size 0, which consists of an empty front part and an empty rear part.
implement{}
queue_free_nil (que) = let
  val+~QUEUE (f, r) = que // free the QUEUE node itself
  prval () = lemma_sllist_param (f)
  prval () = lemma_sllist_param (r)
  prval () = sllist_free_nil (f)
  prval () = sllist_free_nil (r)
in
  // nothing
end // end of [queue_free_nil]
The function [queue_free_nil] is called to destroy a queue of size 0. As the elements in a queue may be of a linear type, that is, they may contain resources, a queue can be safely freed only if it contains no elements.
implement{}
queue_is_empty (que) = let
  val+QUEUE (f, r) = que
in
  if sllist_is_nil (f) then sllist_is_nil (r) else false
end // end of [queue_is_empty]
A queue is empty if and only if both its front part and its rear part are empty.
implement{}
queue_isnot_empty (que) = let
  val+QUEUE (f, r) = que
in
  if sllist_is_cons (f) then true else sllist_is_cons (r)
end // end of [queue_isnot_empty]
A queue is not empty if and only if either its front or its rear is not empty. It is also possible to implement [queue_isnot_empty] directly based on [queue_is_empty] as is shown below:
implement{}
queue_isnot_empty (que) = let
  prval () = lemma_queue_param (que) in not (queue_is_empty (que))
end // end of [queue_isnot_empty]
Note that the negation of a value of the type [bool(N == 0)] for some N is of the type [bool(N != 0)]. In order to show that [N != 0] implies [N > 0], we need to prove that [N >= 0], which is achieved by calling [lemma_queue_param].

The functions [queue_insert_atend] and [queue_takeout_atbeg] are implemented as follows:

implement{a}
queue_insert_atend (que, x) = let
  val+@QUEUE (f, r) = que
  val () = r := sllist_cons (x, r) // x becomes the head of the rear part
  prval () = fold@ (que)
in
  // nothing
end // end of [queue_insert_atend]

implement{a}
queue_takeout_atbeg (que) = let
  val+@QUEUE (f, r) = que
  prval () = lemma_sllist_param (f)
  prval () = lemma_sllist_param (r)
  val iscons = sllist_is_cons (f)
in
  if iscons then let
    // the front part is not empty: take out its head
    val x = sllist_uncons (f)
    prval () = fold@ (que)
  in
    x
  end else let
    // the front part is empty: replace it with the reverse of the rear part
    prval () = sllist_free_nil (f)
    val () = f := sllist_reverse (r)
    val () = r := sllist_nil {a} ()
    val x = sllist_uncons (f)
    prval () = fold@ (que)
  in
    x
  end // end of [if]
end // end of [queue_takeout_atbeg]
By following the description given at the beginning of this section, one should find it straightforward to understand the code for enqueueing and dequeueing.
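As a small illustration of how the interface is meant to be used, here is a sketch of a client. It is not necessarily the test code that accompanies this article, and it assumes that it is placed in the same file after the declarations and implementations above, with the prelude and the sllist templates loaded.

```ats
// A sketch of a client; it assumes that the queue declarations and
// implementations given above are in scope.
implement
main0 () = let
  val que = queue_make_nil {int} ()          // que: queue (int, 0)
  val () = queue_insert_atend<int> (que, 1)  // que: queue (int, 1)
  val () = queue_insert_atend<int> (que, 2)  // que: queue (int, 2)
  val x = queue_takeout_atbeg<int> (que)     // que: queue (int, 1)
  val y = queue_takeout_atbeg<int> (que)     // que: queue (int, 0)
  val () = println! ("x = ", x, " and y = ", y)
in
  queue_free_nil (que) // allowed only because the size is known to be 0
end // end of [main0]
```

The elements come out in the order in which they were inserted, so x is 1 and y is 2. Adding a third call to [queue_takeout_atbeg], or removing one of the two calls, should be rejected during typechecking, since the size recorded in the type of [que] would then not satisfy the constraint [n > 0] or would not be 0 when [queue_free_nil] is called.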

For a running implementation, please see queue-sllist2.dats, which contains the entirety of the code presented above plus some code for testing.

This article is written by Hongwei Xi.