Punctuated Data Streams - Pass Invariants

Pass invariants define when a blocking operator can pass part of its result before all data has been received. Formally, pass invariants take the form:
Ts = Pass(op, Ts1, Ps1, …, Tsn, Psn), where:
 op is the operator along with any arguments
 Tsi is the set of tuples that have arrived from the i-th input
 Psi is the set of punctuation that has arrived from the i-th input
 Ts is the set of tuples that can be output from the operator.
For non-blocking operators, such as select, the pass invariant is trivial: it is simply op(Ts1, …, Tsn). Note that a pass invariant defines all tuples that can be output so far. In practice, pass invariants are implemented incrementally, calculating additional output tuples as each new tuple or punctuation is received. We explore two operators: Set Difference and Sort.

Set Difference

Set difference is a blocking operator. To compute all tuples from a relation A that are not found in relation B (A - B), it must ordinarily read all of B before outputting any results, so that it can be sure which tuples from A belong in the result. Set difference can use punctuations, however, to know when it has seen enough tuples from B to begin outputting tuples from A early.
Formally, the pass invariant for set difference is:
 Pass(SetDiff, Ts1, Ps1, Ts2, Ps2) = { t | t ∈ Ts1 ∧ t ∉ Ts2 ∧ SetMatch(t, Ps2) }
When the set difference operator receives a punctuation from its second input, it knows that no more tuples matching that pattern will be seen. The operator can then release any tuples that it has received from its first input that match the punctuation and are not in the set of tuples from the second input. Using punctuations, the set difference operator no longer has to wait for all tuples from its second input before outputting results.
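
The set difference invariant above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: a punctuation is modeled here as a predicate over tuples, and the names set_match and pass_set_diff are hypothetical, not taken from any actual implementation. Like the invariant itself, the function computes every tuple that can be output so far rather than working incrementally.

```python
from typing import Callable, Hashable, Iterable, Set

# Assumption: a punctuation is a predicate that is True for every tuple
# the stream promises never to deliver again.
Punctuation = Callable[[Hashable], bool]


def set_match(t: Hashable, punctuations: Iterable[Punctuation]) -> bool:
    """True if tuple t matches at least one punctuation in the set."""
    return any(p(t) for p in punctuations)


def pass_set_diff(
    ts1: Set[Hashable],
    ps1: Iterable[Punctuation],
    ts2: Set[Hashable],
    ps2: Iterable[Punctuation],
) -> Set[Hashable]:
    """Tuples from input 1 that can already be output.

    A tuple qualifies if it has not arrived on input 2 and a punctuation
    from input 2 guarantees it never will. ps1 is accepted only to mirror
    the shape of Pass; this invariant does not use it.
    """
    return {t for t in ts1 if t not in ts2 and set_match(t, ps2)}


# Tuples seen so far from A and B, keyed on their first field.
a_seen = {(1, "x"), (2, "y"), (7, "z")}
b_seen = {(2, "y")}
# B has punctuated: no more tuples whose first field is below 5.
b_punctuations = [lambda t: t[0] < 5]

result = pass_set_diff(a_seen, [], b_seen, b_punctuations)
print(result)  # {(1, 'x')}
```

Here (2, "y") is withheld because it appears in B, and (7, "z") is withheld because no punctuation from B covers it yet.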


Sort

Since punctuations tell an operator which values have been seen in full, the sort operator can use them to determine whether it has enough input to return a correct prefix of its result. The punctuations must correlate with the sort attributes, which we denote as A below.
Formally, the pass invariant for sort is:
 Pass(Sort_A, Ts, Ps) = Sort{ t ∈ Ts | SetMatch(t, Initial(Sort_A, Ps)) }
  Initial(Sort_A, Ps) = the maximal contiguous subset Ps' of Ps such that
    ¬∃ t1, t2 (SetMatch(t1, Ps') ∧ ¬SetMatch(t2, Ps') ∧ t2 <_A t1)
The first argument to Pass is the sort operator with its list of sort attributes A. Pass finds all tuples that match the punctuation set returned by Initial. These tuples form a prefix of the final sorted output and can be returned from the sort operator. Initial examines the set of punctuations and returns a subset of them, defined so that no tuple received later can sort before any tuple that matches the subset. Put another way, the tuples that match the subset of punctuations are the prefix of the complete sorted result.
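
As a rough sketch of how Initial and Pass could work for sort, assume a single numeric sort attribute in ascending order and punctuations expressed as half-open intervals [lo, hi) over that attribute, meaning no more tuples with lo <= key < hi will arrive. Both the interval representation and the names initial and pass_sort are assumptions made for this example. Under them, Initial reduces to finding how far an unbroken run of intervals extends upward from negative infinity.

```python
import math
from typing import Callable, List, Tuple

# Assumption: a punctuation (lo, hi) promises that no further tuple with
# lo <= key < hi will arrive on the input.
Interval = Tuple[float, float]


def initial(ps: List[Interval]) -> float:
    """Upper bound of the contiguous run of punctuations starting at -inf.

    Every tuple whose key is below the returned bound is covered, so no
    later tuple can sort before it.
    """
    bound = -math.inf
    for lo, hi in sorted(ps):
        if lo > bound:
            break  # gap: keys in [bound, lo) may still arrive
        bound = max(bound, hi)
    return bound


def pass_sort(ts: List[tuple], ps: List[Interval],
              key: Callable[[tuple], float]) -> List[tuple]:
    """Sorted prefix of the final result that can already be output."""
    bound = initial(ps)
    return sorted((t for t in ts if key(t) < bound), key=key)


tuples_seen = [(15, "c"), (3, "a"), (25, "e"), (8, "b")]
punctuations = [(10, 20), (-math.inf, 10), (30, 40)]

prefix = pass_sort(tuples_seen, punctuations, key=lambda t: t[0])
print(prefix)  # [(3, 'a'), (8, 'b'), (15, 'c')]
```

The punctuation (30, 40) does not help yet: keys in [20, 30) are still unpunctuated, so (25, "e") must be held back until that gap is closed.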

Last modified by Pete Tucker on 26 August 2005.