Punctuated Data Streams - Purge Invariants

Stateful operators can use punctuation to identify state that is no longer needed. Because such operators can discard state earlier than they otherwise could, they can handle more data with the same memory, which makes them better suited to data stream processing. A purge invariant is defined for each input of an operator, since the operator may maintain state for each input. Formally, purge invariants take the form $Ts' = \mathrm{Purge}_N(op, Ts_1, Ps_1, \ldots, Ts_n, Ps_n)$, where:
- $N$ is the position of the operator's input whose state is being purged
- $op$ is the operator along with any arguments
- $Ts_i$ is the set of tuples that have arrived from the $i$-th input
- $Ps_i$ is the set of punctuations that have arrived from the $i$-th input
- $Ts'$ is the set of tuples in the state maintained for input $N$ that can be discarded
A purge invariant for an operator can assume that the operator's pass invariant has already been evaluated, so every tuple that can be output has already been passed on.
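Purge invariants are stated in terms of which tuples match the punctuation received so far (the SetMatch predicate in the symmetric hash join invariants). The following Python is a minimal sketch of that predicate, assuming a punctuation is represented as one pattern per attribute, where each pattern is a wildcard, a constant, or an inclusive range. The names `WILDCARD`, `Range`, `matches_punctuation`, and `set_match` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Sequence


class _Wildcard:
    """Pattern that matches any attribute value (hypothetical representation)."""

    def __repr__(self) -> str:
        return "*"


WILDCARD = _Wildcard()


@dataclass(frozen=True)
class Range:
    """Pattern matching any value v with lo <= v <= hi (inclusive bounds)."""
    lo: Any
    hi: Any


def attr_matches(value: Any, pattern: Any) -> bool:
    """True if a single attribute value matches a single pattern."""
    if pattern is WILDCARD:
        return True
    if isinstance(pattern, Range):
        return pattern.lo <= value <= pattern.hi
    return value == pattern  # any other pattern is treated as a constant


def matches_punctuation(t: Sequence[Any], p: Sequence[Any]) -> bool:
    """True if tuple t matches punctuation p on every attribute."""
    return len(t) == len(p) and all(attr_matches(v, q) for v, q in zip(t, p))


def set_match(t: Sequence[Any], ps: Iterable[Sequence[Any]]) -> bool:
    """SetMatch(t, Ps): t matches at least one punctuation in Ps."""
    return any(matches_punctuation(t, p) for p in ps)
```

For example, with `ps = [(Range(0, 9), WILDCARD), (42, "done")]`, the tuple `(5, "a")` matches, while `(42, "open")` and `(10, "a")` do not.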

Certain query operators already purge some state; the purge invariants we discuss are in addition to the purging that normally occurs. For operators that do not accumulate state, the purge invariant is trivial: it returns the empty set. We explore two operators: Sort and Symmetric Hash Join.

Sort

The pass invariant for the sort operator states that it can output results early when punctuation covers a range of values at the beginning of the part of the sort order that has not yet been output: no more tuples in that range can arrive, so the tuples in it can be emitted in order. Once those tuples have been passed on, there is no reason to keep them in state, so they can be discarded. Thus, the purge invariant for sort is the same as its pass invariant:
$$\mathrm{Purge}(\mathrm{Sort}_A, Ts, Ps) = \mathrm{Pass}(\mathrm{Sort}_A, Ts, Ps)$$
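To make Purge = Pass concrete, here is a minimal Python sketch of a sort operator on an integer attribute. It assumes, hypothetically, that each punctuation is an inclusive range of sort-key values and that the smallest possible key `min_value` is known, so the fully punctuated prefix of the sort order can be computed. The names `covered_prefix`, `sort_pass`, `sort_purge`, and `PunctuatedSort` are illustrative, not from the original work.

```python
from typing import Any, Callable, List, Tuple


def covered_prefix(ranges: List[Tuple[int, int]], min_value: int) -> int:
    """Largest v such that every integer in [min_value, v] lies in some
    punctuated range; min_value - 1 if no prefix is covered yet."""
    frontier = min_value - 1
    for lo, hi in sorted(ranges):
        if lo > frontier + 1:
            break  # a gap: keys between frontier and lo may still arrive
        frontier = max(frontier, hi)
    return frontier


def sort_pass(ts, ps, key, min_value):
    """Pass(Sort_A, Ts, Ps): buffered tuples whose key lies in the fully
    punctuated prefix, returned in sorted order."""
    frontier = covered_prefix(ps, min_value)
    return sorted((t for t in ts if key(t) <= frontier), key=key)


def sort_purge(ts, ps, key, min_value):
    """Purge(Sort_A, Ts, Ps) = Pass(Sort_A, Ts, Ps)."""
    return sort_pass(ts, ps, key, min_value)


class PunctuatedSort:
    def __init__(self, key: Callable[[Any], int], min_value: int):
        self.key, self.min_value = key, min_value
        self.state: list = []  # Ts: tuples received but not yet output
        self.puncts: List[Tuple[int, int]] = []  # Ps: ranges of sort keys

    def on_tuple(self, t) -> list:
        self.state.append(t)
        return []  # a tuple alone never extends the punctuated prefix

    def on_punctuation(self, lo: int, hi: int) -> list:
        self.puncts.append((lo, hi))
        out = sort_pass(self.state, self.puncts, self.key, self.min_value)
        # Purge = Pass: everything just output is dropped from the state.
        purged = sort_purge(self.state, self.puncts, self.key, self.min_value)
        self.state = [t for t in self.state if t not in purged]
        return out
```

A gap in the punctuation (for example, keys 0 to 4 and 6 to 9 punctuated but not 5) holds back every tuple after the gap; once the gap is closed, those tuples are output and purged together.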

Symmetric Hash Join

The traditional hash join algorithm runs in two phases. During the first phase, all tuples are read from one of the two inputs into an in-memory hash table. During the second phase, each tuple t from the second input is hashed and compared with the tuples from the first input that hash to the same value. Each tuple that matches t on the join attribute is combined with t and output. As we have discussed, blocking on one input until it has been read entirely, as hash join does on its first input, is inappropriate for unbounded data streams: an unbounded input is never read entirely, so no results would ever be produced.
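A minimal Python sketch of the two phases, assuming tuples are Python tuples, the join is an equi-join described by key functions, and a joined result is the concatenation of the two tuples. The function `hash_join` and its parameter names are hypothetical.

```python
from collections import defaultdict


def hash_join(build, probe, build_key, probe_key):
    """Classic two-phase hash join (equi-join); yields concatenated tuples."""
    # Phase 1: read the entire build input into an in-memory hash table.
    # Python's dict resolves hash collisions, so each bucket holds exactly
    # the build tuples with one join value.
    table = defaultdict(list)
    for r in build:
        table[build_key(r)].append(r)
    # Phase 2: hash each probe tuple and combine it with every build tuple
    # that has the same join value.
    for s in probe:
        for r in table.get(probe_key(s), []):
            yield r + s
```

Because phase 1 consumes the whole `build` iterable before the first result is yielded, an endless build input would never produce output, which is the blocking behavior described above.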

Symmetric hash join (Wilschut et al.) is an enhancement to the traditional hash join that outputs results while reading from both inputs. It maintains a hash table for each input. Each arriving tuple is inserted into the hash table for its own input and then matched against the tuples in the other input's hash table; for each match, the joined tuple is passed on. This algorithm has the desirable property that it returns results while it is still reading data. However, it accumulates a lot of state: every tuple it reads stays in its hash table, so on unbounded inputs the tables grow without bound.
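The following Python is a minimal sketch of symmetric hash join under the same assumptions (equi-join through key functions, results formed by concatenation). Inputs are numbered 1 and 2 as in the text; `SymmetricHashJoin`, `insert`, and `state_size` are hypothetical names. Nothing is ever removed from the hash tables, which is the state growth noted above.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class SymmetricHashJoin:
    """Equi-join with one hash table per input; emits results as tuples
    arrive on either input."""

    def __init__(self, key1: Callable[[tuple], Any], key2: Callable[[tuple], Any]):
        self.keys = {1: key1, 2: key2}
        # tables[i] maps a join value to the tuples from input i with that value
        self.tables: Dict[int, Dict[Any, List[tuple]]] = {
            1: defaultdict(list), 2: defaultdict(list)}

    def insert(self, side: int, t: tuple) -> List[tuple]:
        """Process tuple t from input `side` (1 or 2); return joined tuples."""
        k = self.keys[side](t)
        self.tables[side][k].append(t)  # cache t in its own input's table
        partners = self.tables[3 - side].get(k, [])  # probe the other table
        # Joined tuples are laid out as (input-1 part) + (input-2 part).
        return [t + m if side == 1 else m + t for m in partners]

    def state_size(self) -> int:
        """Total number of cached tuples across both hash tables."""
        return sum(len(b) for tbl in self.tables.values() for b in tbl.values())
```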

We can enhance this operator so that it uses punctuation to discard some of its state. Punctuation from one input can be matched against the tuples in the other input's hash table to find tuples that will never be joined again; those tuples can be discarded.

Formally, the purge invariants for the symmetric hash join are:

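$$\mathrm{Purge}_1(\mathrm{Join}_A, Ts_1, Ps_1, Ts_2, Ps_2) = \{\, t_1 \in Ts_1 \mid \forall t_2,\ \mathrm{joinable}(A, t_1, t_2) \Rightarrow \mathrm{SetMatch}(t_2, Ps_2) \,\}$$
$$\mathrm{Purge}_2(\mathrm{Join}_A, Ts_1, Ps_1, Ts_2, Ps_2) = \{\, t_2 \in Ts_2 \mid \forall t_1,\ \mathrm{joinable}(A, t_1, t_2) \Rightarrow \mathrm{SetMatch}(t_1, Ps_1) \,\}$$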
Each purge invariant takes the join operator with join attributes $A$ and considers the tuples from its own input. Focusing on $\mathrm{Purge}_1$, a tuple $t_1$ from the first input can be discarded if every tuple $t_2$ that could be joined with $t_1$ on $A$ matches punctuation from the second input. In that case, no tuple that could still join with $t_1$ will arrive on the second input.
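A minimal Python sketch of the two purge invariants applied inside a symmetric hash join. It assumes an equi-join on a single attribute and punctuations that constrain only that attribute, written as inclusive ranges $(lo, hi)$. Under that assumption, every tuple joinable with join value $v$ matches the punctuation exactly when $v$ lies in some punctuated range, so the universally quantified condition reduces to a range check. The class and method names are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple


class PunctuatedSymmetricHashJoin:
    """Symmetric hash join applying Purge_1 and Purge_2 (sketch; assumes an
    equi-join on one attribute and range punctuations on that attribute)."""

    def __init__(self, key1: Callable[[tuple], Any], key2: Callable[[tuple], Any]):
        self.keys = {1: key1, 2: key2}
        self.tables: Dict[int, Dict[Any, List[tuple]]] = {
            1: defaultdict(list), 2: defaultdict(list)}
        self.puncts: Dict[int, List[Tuple[Any, Any]]] = {1: [], 2: []}  # Ps_1, Ps_2

    @staticmethod
    def _covered(v: Any, ranges: List[Tuple[Any, Any]]) -> bool:
        # Every tuple joinable on value v matches Ps iff v lies in some range.
        return any(lo <= v <= hi for lo, hi in ranges)

    def purge(self, n: int) -> List[tuple]:
        """Purge_n: cached tuples of input n that can never join again."""
        ps_other = self.puncts[3 - n]
        return [t for k, bucket in self.tables[n].items()
                if self._covered(k, ps_other) for t in bucket]

    def insert(self, side: int, t: tuple) -> List[tuple]:
        k = self.keys[side](t)
        other = 3 - side
        # Join t with every cached partner from the other input.
        results = [t + m if side == 1 else m + t
                   for m in self.tables[other].get(k, [])]
        # Cache t only if it is not already purgeable, i.e. the other input
        # may still deliver a tuple with join value k.
        if not self._covered(k, self.puncts[other]):
            self.tables[side][k].append(t)
        return results

    def punctuate(self, side: int, lo: Any, hi: Any) -> List[tuple]:
        """Punctuation on input `side`: no more tuples with join value in
        [lo, hi]. Purges the other input's table; returns purged tuples."""
        self.puncts[side].append((lo, hi))
        n = 3 - side
        purged = self.purge(n)
        table = self.tables[n]
        for k in [k for k in table if self._covered(k, self.puncts[side])]:
            del table[k]
        return purged

    def state_size(self) -> int:
        return sum(len(b) for tbl in self.tables.values() for b in tbl.values())
```

Punctuation on input 2 shrinks input 1's table and vice versa. A newly arrived tuple whose join value is already punctuated on the other input is joined with its cached partners but never stored, since it is immediately in the purge set.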

Last modified by Pete Tucker on 26 August 2005.