1 /**
2 Copyright: Copyright (c) 2018 Frank McSherry
3 License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Port of DataFrog to D.
7 
8 A lightweight Datalog engine in Rust
9 
10 The intended design is that one has static `Relation` types that are sets of
11 tuples, and `Variable` types that represent monotonically increasing sets of
12 tuples.
13 
14 The types are mostly wrappers around `Vec<Tuple>` indicating sorted-ness, and
15 the intent is that this code can be dropped in the middle of an otherwise
16 normal Rust program, run to completion, and then the results extracted as
17 vectors again.
18 */
19 module datacat;
20 
21 import logger = std.experimental.logger;
22 import std.algorithm;
23 import std.array;
24 import std.ascii;
25 import std.conv;
26 import std.file;
27 import std.path;
28 import std.process;
29 import std.range;
30 import std.stdio;
import std.string;
32 import std.traits;
33 import std.typecons : Tuple;
34 
35 public import std.typecons : tuple;
36 
37 public import datacat.join;
38 public import datacat.range;
39 
40 version (unittest) import unit_threaded;
41 
/// Tuple type with two named fields: `key` of type `T` and `value` of type `V`.
template KVTuple(T, V) {
    alias KVTuple = Tuple!(T, "key", V, "value");
}
43 
/// Builds a tuple whose two fields are named `key` and `value`.
///
/// Params:
///  k = stored in the `key` field
///  v = stored in the `value` field
///
/// Returns: the named tuple produced by `std.typecons.tuple!("key", "value")`.
auto kvTuple(K, V)(auto ref K k, auto ref V v) {
    import std.typecons : tuple;

    auto pair = tuple!("key", "value")(k, v);
    return pair;
}
50 
/// Builds a tuple with a single field named `key`.
///
/// Params:
///  k = stored in the `key` field
///
/// Returns: the named tuple produced by `std.typecons.tuple!("key")`.
auto kvTuple(K)(auto ref K k) {
    import std.typecons : tuple;

    auto keyOnly = tuple!("key")(k);
    return keyOnly;
}
57 
/// A static, ordered list of key-value pairs.
///
/// A relation represents a fixed set of key-value pairs. In many places in a
/// Datalog computation we want to be sure that certain relations are not able
/// to vary (for example, in antijoins).
///
/// Both constructors sort their input and drop duplicates, so `elements` is
/// sorted and distinct after construction.
struct Relation(TupleT) {
    /// Convenient alias to retrieve the tuple type.
    alias TT = TupleT;

    /// Sorted list of distinct tuples.
    TupleT[] elements;
    alias elements this;

    /// Builds a relation by converting every item of `values` to `TupleT`.
    ///
    /// When `TupleT` has both a `key` and a `value` field each item is indexed
    /// as `a[0]` (key) and `a[1]` (value). When it only has a `key` field the
    /// item itself is passed to the `TupleT` constructor. Any other `TupleT`
    /// is rejected at compile time.
    ///
    /// Params:
    ///  values = range of items to convert
    ///
    /// Returns: a relation holding the converted tuples, sorted and distinct.
    static auto from(T)(T values) {
        import std.algorithm : map;

        static if (hasKeyValueFields!TupleT) {
            return Relation!(TupleT)(values.map!(a => TupleT(a[0], a[1])));
        } else static if (hasKeyField!TupleT) {
            return Relation!(TupleT)(values.map!(a => TupleT(a)));
        } else {
            static assert(0,
                    "Mismatch between Relations (" ~ TupleT.stringof
                    ~ ") and provided type " ~ T.stringof);
        }
    }

    /// Constructs a relation from an input range of `TupleT`.
    ///
    /// Params:
    ///  other = tuples in any order, duplicates allowed
    this(T)(T other) if (isInputRange!T && is(ElementType!T == TupleT)) {
        import std.array : appender;

        auto app = appender!(TupleT[])();
        other.copy(app);
        sort(app.data);

        // Reserve room for the worst case (no duplicates), then trim the part
        // of the target that `copy` reports as left unfilled.
        elements.length = app.data.length;
        elements.length -= app.data.uniq.copy(elements).length;
    }

    /// Constructs a relation from a slice of (possibly const) `TupleT`.
    ///
    /// Params:
    ///  other = tuples in any order, duplicates allowed
    this(T)(const(T)[] other) if (is(T == TupleT)) {
        import std.array : appender;

        auto app = appender!(TupleT[])();
        app.reserve(other.length);
        other.copy(app);
        sort(app.data);

        // `elements` must be sized before it is used as the copy target;
        // otherwise the distinct tuples would be written into a zero-length
        // slice and the subtraction below would start from length 0.
        elements.length = app.data.length;
        elements.length -= app.data.uniq.copy(elements).length;
    }

    /// Merges two relations into their union.
    ///
    /// The union is stored in this relation. If this relation is empty it
    /// takes over `other.elements` without copying, so both then refer to the
    /// same slice.
    ///
    /// Params:
    ///  other = value with an `elements` member whose element type is `TupleT`
    ///
    /// Returns: a copy of this relation after the merge.
    // TODO: compare the performance to the rust implementation
    auto merge(T)(T other)
            if (hasMember!(T, "elements") && is(ElementType!(typeof(other.elements)) == TupleT)) {
        // If one of the element lists is zero-length, we don't need to do any work
        if (elements.length == 0) {
            elements = other.elements;
            return this;
        } else if (other.elements.length == 0) {
            return this;
        }

        const len = elements.length;
        elements ~= other.elements;
        // The first `len` tuples are already sorted; only the appended tail
        // has to be merged in.
        completeSort(assumeSorted(elements[0 .. len]), elements[len .. $]);
        // Compact the distinct tuples to the front and cut off the rest.
        elements.length -= elements.uniq.copy(elements).length;
        return this;
    }

    /// Returns: true when the relation holds no tuples.
    bool empty() const {
        return elements.length == 0;
    }

    /// Drops all tuples by resetting `elements` to `null`.
    void clear() {
        elements = null;
    }

    import std.range : isOutputRange;
    import std.format : FormatSpec;

    /// Returns: the text produced by the sink-based `toString` overload.
    string toString() {
        import std.exception : assumeUnique;
        import std.format : FormatSpec;

        char[] buf;
        buf.reserve(100);
        auto fmt = FormatSpec!char("%s");
        toString((const(char)[] s) { buf ~= s; }, fmt);
        // `buf` is local and not referenced after this point.
        auto trustedUnique(T)(T t) @trusted {
            return assumeUnique(t);
        }

        return trustedUnique(buf);
    }

    /// Writes the relation as `[` followed by `[key,value], ` (or `[key], `
    /// when `TupleT` has no `value` member) per tuple, followed by `]`.
    ///
    /// Params:
    ///  w = sink that receives the text
    ///  fmt = format specification; not consulted by this implementation
    void toString(Writer, Char)(scope Writer w, FormatSpec!Char fmt) const {
        import std.format : formattedWrite;
        import std.range.primitives : put;

        put(w, "[");
        foreach (e; elements) {
            static if (__traits(hasMember, TT, "value"))
                formattedWrite(w, "[%s,%s], ", e.key, e.value);
            else
                formattedWrite(w, "[%s], ", e.key);
        }
        put(w, "]");
    }
}
166 
/// Resolves to the `Relation` type for the given field types.
///
/// Two arguments give a relation of `key`/`value` tuples, a single argument
/// gives a relation of `key`-only tuples. Any other count is a compile error.
template relation(Args...) {
    import std.typecons : Tuple;
    import std.variant : Variant;

    static if (Args.length == 2) {
        // key and value
        alias relation = Relation!(Tuple!(Args[0], "key", Args[1], "value"));
    } else static if (Args.length == 1) {
        // key only
        alias relation = Relation!(Tuple!(Args[0], "key"));
    } else {
        import std.conv : to;

        static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided");
    }
}
182 
@("shall merge two relations")
unittest {
    // arrange
    auto lhs = relation!(int, int).from([[1, 0], [2, -1], [5, -20]]);
    auto rhs = relation!(int, int).from([[3, 0], [5, -10], [7, -20]]);

    // act
    lhs.merge(rhs);

    // assert: the union is sorted, tuples with equal keys are ordered by value
    lhs.should == [tuple(1, 0), tuple(2, -1), tuple(3, 0), tuple(5, -20),
        tuple(5, -10), tuple(7, -20)];
}
192 
@("shall create a sorted relation from unsorted elements")
unittest {
    // arrange + act: the input is deliberately out of order
    auto rel = relation!(int, int).from([[3, -1], [2, -3], [5, -24]]);

    // assert
    rel.should == [tuple(2, -3), tuple(3, -1), tuple(5, -24)];
}
198 
/// True iff `T` has a member named `key`.
package template hasKeyField(T) {
    enum bool hasKeyField = hasMember!(T, "key");
}

/// True iff `T` has a member named `value`.
package template hasValueField(T) {
    enum bool hasValueField = hasMember!(T, "value");
}

/// True iff `T` has both a `key` and a `value` member.
package template hasKeyValueFields(T) {
    enum bool hasKeyValueFields = hasKeyField!T && hasValueField!T;
}

/// True iff both `T0` and `T1` have a `key` member and those members have
/// exactly the same type.
package template isSameKeyType(T0, T1) {
    static if (hasKeyField!T0 && hasKeyField!T1)
        enum bool isSameKeyType = is(typeof(T0.key) == typeof(T1.key));
    else
        enum bool isSameKeyType = false;
}
211 
@("shall check that the keys are the same type")
unittest {
    // Both keys are int: the trait must hold.
    auto intKey0 = kvTuple(1, "a0");
    auto intKey1 = kvTuple(2, "a1");

    static if (!isSameKeyType!(typeof(intKey0), typeof(intKey1)))
        static assert(0, "isSameKeyType: expected to pass because the key types are the same");

    // int key versus double key: the trait must not hold.
    auto intKey = kvTuple(3, "b0");
    auto doubleKey = kvTuple(1.1, "b1");

    static if (isSameKeyType!(typeof(intKey), typeof(doubleKey)))
        static assert(0, "isSameKeyType: expected to fail because the key types are different");
}
226 
/// True iff `T` implicitly converts to `VariableTrait`, i.e. it is a type that
/// can report whether it has changed since it was last asked.
package template isVariable(T) {
    enum bool isVariable = is(T : VariableTrait);
}
230 
/// An iterative context for recursive evaluation.
///
/// An `Iteration` keeps a list of the variables created through it and polls
/// them for progress. Once none of them reports a change the computation
/// should be done.
struct Iteration {
    /// The variables monitored by this context.
    VariableTrait[] variables;

    /// Reports whether any of the monitored variables have changed since
    /// the most recent call.
    bool changed() {
        bool anyChanged = false;
        foreach (v; variables) {
            // `v.changed()` is evaluated first so that every variable is
            // polled on each call, even after one has already reported a
            // change.
            anyChanged = v.changed() || anyChanged;
        }
        return anyChanged;
    }

    /// Creates a new named variable associated with the iterative context.
    ///
    /// Params:
    ///  s = name given to the variable
    ///
    /// Returns: the new variable, holding tuples with a `T0` key and a `T1` value.
    scope auto variable(T0, T1)(string s) {
        import std.typecons : Tuple;

        alias KV = Tuple!(T0, "key", T1, "value");

        auto created = new Variable!KV(s);
        variables ~= created;
        return created;
    }

    /// Creates a new named variable associated with the iterative context.
    ///
    /// This variable will not be maintained distinctly, and may advertise tuples as
    /// recent multiple times (perhaps unbounded many times).
    scope auto variableInDistinct(T0, T1)(string s) {
        auto created = this.variable!(T0, T1)(s);
        created.distinct = false;
        return created;
    }

    /// Returns: a range that continue until all variables stop changing.
    IteratorRange!(typeof(this)) range() {
        return IteratorRange!(typeof(this))(&this);
    }
}
276 
/// True iff `T` has both a `key` and a `value` member.
template isTuple(T) {
    enum bool isTuple = hasMember!(T, "key") && hasMember!(T, "value");
}
279 
/// Contract for anything that can be polled for progress.
interface VariableTrait {
    /// Returns: true when the variable has changed since the previous call.
    bool changed();
}
285 
/// A monotonically increasing set of `Tuple`s.
///
/// There are three stages in the lifecycle of a tuple:
///
///   1. A tuple is added to `this.toAdd`, but is not yet visible externally.
///   2. Newly added tuples are then promoted to `this.recent` for one iteration.
///   3. After one iteration, recent tuples are moved to `this.stable` for posterity.
///
/// Each time `this.changed()` is called, the `recent` relation is folded into `stable`,
/// and the `toAdd` relations are merged, potentially deduplicated against `stable`, and
/// then made `recent`. This way, across calls to `changed()` all added tuples are in
/// `recent` at least once and eventually all are in `stable`.
///
/// A `Variable` may optionally be instructed not to de-duplicate its tuples, for reasons
/// of performance. Such a variable cannot be relied on to terminate iterative computation,
/// and it is important that any cycle of derivations have at least one de-duplicating
/// variable on it.
///
/// `TupleT` must have a `key` and a `value` member; this is enforced by the
/// `isTuple` constraint.
final class Variable(TupleT) : VariableTrait if (isTuple!TupleT) {
    import std.range : isInputRange, ElementType, isOutputRange;

    /// Convenient alias to retrieve the tuple type.
    alias TT = TupleT;

    alias This = typeof(this);

    version (unittest) {
        /// Used for testing purpose to ensure both paths produce the same result.
        bool forceFastPath;
    }

    /// Should the variable be maintained distinctly.
    bool distinct = true;

    /// A useful name for the variable.
    string name;

    /// A list of relations whose union are the accepted tuples.
    Relation!TupleT[] stable;

    /// A list of recent tuples, still to be processed.
    Relation!TupleT recent;

    /// A list of future tuples, to be introduced.
    Relation!TupleT[] toAdd;

    /// Creates an unnamed variable.
    this() {
    }

    /// Creates a variable with the given `name`.
    this(string name) {
        this.name = name;
    }

    /// Adds tuples that result from joining `input1` and `input2`.
    ///
    /// The work is delegated to `datacat.join.join`, with this variable passed
    /// as the third argument.
    ///
    /// # Examples
    ///
    /// This example starts a collection with the pairs (x, x+1) and (x+1, x) for x in 0 .. 10.
    /// It then adds pairs (y, z) for which (x, y) and (x, z) are present. Because the initial
    /// pairs are symmetric, this should result in all pairs (x, y) for x and y in 0 .. 11.
    ///
    /// ```
    /// Iteration iter;
    /// auto variable = iter.variable!(int, int)("source");
    /// variable.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    /// variable.insert(iota(10).map!(x => kvTuple(x + 1, x)));
    ///
    /// while (iter.changed) {
    ///     static auto helper(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
    ///         return kvTuple(v1, v2);
    ///     }
    ///
    ///     variable.fromJoin!(helper)(variable, variable);
    /// }
    ///
    /// auto result = variable.complete;
    /// assert(result.length == 121);
    /// ```
    void fromJoin(alias Fn, Input1T, Input2T)(Input1T input1, Input2T input2) {
        import datacat.join;

        join!Fn(input1, input2, this);
    }

    /// Adds tuples from `input1` whose key is not present in `input2`.
    ///
    /// The work is delegated to `datacat.join.antiJoin`, with this variable
    /// passed as the third argument.
    ///
    /// # Examples
    ///
    /// This example starts a collection with the pairs (x, x+1) for x in 0 .. 10. It then
    /// adds any pairs (x+1,x) for which x is not a multiple of three. That excludes four
    /// pairs (for 0, 3, 6, and 9) which should leave us with 16 total pairs.
    ///
    /// ```
    /// Iteration iter;
    /// auto variable = iter.variable!(int, int)("source");
    /// variable.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    /// auto relation_ = relation!(int).from(iota(10).filter!(x => x % 3 == 0)
    ///         .map!kvTuple);
    ///
    /// while (iter.changed) {
    ///     static auto helper(T0, T1)(T0 k, T1 v) {
    ///         return kvTuple(v, k);
    ///     }
    ///
    ///     variable.fromAntiJoin!(helper)(variable, relation_);
    /// }
    ///
    /// auto result = variable.complete;
    /// assert(result.length == 16);
    /// ```
    void fromAntiJoin(alias Fn, Input1T, Input2T)(Input1T input1, Input2T input2) {
        import datacat.join;

        antiJoin!Fn(input1, input2, this);
    }

    /// Adds tuples that result from mapping `input`.
    ///
    /// The work is delegated to `datacat.map.mapInto`, with this variable
    /// passed as the second argument.
    ///
    /// # Examples
    ///
    /// This example starts a collection with the pairs (x, x) for x in 0 .. 10. It then
    /// repeatedly adds any pairs (x, z) for (x, y) in the collection, where z is the Collatz
    /// step for y: it is y/2 if y is even, and 3*y + 1 if y is odd. This produces all of the
    /// pairs (x, y) where x visits y as part of its Collatz journey.
    ///
    /// ```
    /// Iteration iter;
    /// auto variable = iter.variable!(int, int)("source");
    /// variable.insert(iota(10).map!(x => kvTuple(x, x)));
    ///
    /// while (iter.changed) {
    ///     static auto helper(KV)(KV a) {
    ///         if (a.value % 2 == 0)
    ///             return kvTuple(a.key, a.value / 2);
    ///         else
    ///             return kvTuple(a.key, 3 * a.value + 1);
    ///     }
    ///
    ///     variable.fromMap!(helper)(variable);
    /// }
    ///
    /// auto result = variable.complete;
    /// assert(result.length == 74);
    /// ```
    void fromMap(alias Fn, Input1T)(Input1T input) {
        import datacat.map;

        mapInto!Fn(input, this);
    }

    /// Inserts a relation into the variable.
    ///
    /// This is most commonly used to load initial values into a variable.
    /// it is not obvious that it should be commonly used otherwise, but
    /// it should not be harmful.
    ///
    /// Empty input is ignored; nothing is queued in `toAdd` for it.
    void insert(Relation!TupleT relation) {
        if (!relation.empty) {
            toAdd ~= relation;
        }
    }

    /// ditto
    void insert(TupleT[] relation) {
        if (relation.length != 0) {
            toAdd ~= Relation!TupleT(relation);
        }
    }

    /// ditto
    void insert(T)(T relation) if (isInputRange!T && is(ElementType!T == TupleT)) {
        // `empty` is available on every input range, whereas `length` is not
        // (for example on `filter` results), so this matches the constraint.
        if (!relation.empty) {
            toAdd ~= Relation!TupleT(relation);
        }
    }

    /// Consumes the variable and returns a relation.
    ///
    /// This method removes the ability for the variable to develop, and
    /// flattens all internal tuples down to one relation. The method
    /// asserts that iteration has completed, in that `this.recent` and
    /// `this.toAdd` should both be empty.
    ///
    /// Returns: the union of all batches in `stable`; `stable` is reset to `null`.
    Relation!TupleT complete() {
        assert(recent.empty);
        assert(toAdd.empty);

        typeof(return) result;
        foreach (batch; stable) {
            result.merge(batch);
        }
        stable = null;

        return result;
    }

    /// Advances the variable by one step.
    ///
    /// `recent` is folded into `stable`, then everything queued in `toAdd` is
    /// merged and becomes the new `recent`. When `distinct` is set, tuples
    /// already present in `stable` are removed from it first.
    ///
    /// Returns: true when the new `recent` is non-empty.
    override bool changed() {
        import std.array : popBack, back, appender;

        // 1. Merge self.recent into self.stable.
        if (!recent.empty) {
            // Absorb trailing stable batches that are at most twice as large
            // as `recent` before appending.
            while (!stable.empty && stable.back.length <= 2 * recent.length) {
                auto last = stable[$ - 1];
                stable.popBack;
                recent.merge(last);
            }
            stable ~= recent;
            recent.clear;
        }

        if (toAdd.empty)
            return false;

        // 2. Move this.toAdd into this.recent.
        auto to_add = () { auto a = toAdd[$ - 1]; toAdd.popBack; return a; }();

        // 2b. merge the rest of this.toAdd into to_add
        foreach (to_add_more; toAdd) {
            to_add.merge(to_add_more);
        }
        toAdd = null;

        // 2c. Restrict `to_add` to tuples not in `self.stable`.
        if (distinct) {
            // `batch` is a by-value loop variable; narrowing it below does not
            // modify the relation stored in `stable`.
            foreach (batch; stable) {
                auto retained = appender!(TupleT[])();
                // NOTE(review): `gallop` lives in datacat.join; presumably it
                // skips the leading tuples for which the predicate holds, like
                // the while loop in `slowPath` - confirm there.
                void fastPath() {
                    foreach (elem; to_add) {
                        import datacat.join : gallop;

                        batch = batch.gallop!(y => y < elem);
                        if (batch.length == 0 || batch[0] != elem)
                            retained.put(elem);
                    }
                }

                void slowPath() {
                    version (unittest) {
                        if (forceFastPath) {
                            fastPath;
                            return;
                        }
                    }

                    foreach (elem; to_add) {
                        // Drop the tuples of `batch` that sort before `elem`.
                        while (batch.length > 0 && batch[0] < elem) {
                            batch = batch[1 .. $];
                        }
                        // Keep `elem` unless `batch` now starts with it.
                        if (batch.length == 0 || batch[0] != elem)
                            retained.put(elem);
                    }
                }

                // Only gallop when the batch is large relative to `to_add`.
                if (batch.length > 4 * to_add.length) {
                    fastPath;
                } else {
                    slowPath;
                }
                to_add = retained.data;
            }
        }
        recent = to_add;

        return !recent.empty;
    }

    /// Returns: total elements in stable.
    size_t countStable() {
        return stable.map!"a.length".sum;
    }

    /// Writes the optional name followed by the `distinct` flag and the
    /// `stable`, `recent` and `toAdd` contents to `w`.
    void toString(Writer)(ref Writer w) if (isOutputRange!(Writer, char)) {
        import std.format : formattedWrite;

        if (!name.empty)
            formattedWrite(w, "name:%s ", name);
        // NOTE(review): "distinc" looks like a typo for "distinct"; it is left
        // as is because it is part of the emitted text.
        formattedWrite(w, "distinc:%s stable:%s recent:%s toAdd:%s", distinct,
                stable, recent, toAdd);
    }
}
561 
/// Resolves to the `Variable` type for the given field types.
///
/// Two arguments select tuples with a `key` and a `value` field, a single
/// argument selects tuples with only a `key` field. Any other count is a
/// compile error.
template Variable(Args...) {
    import std.typecons : Tuple;
    import std.variant : Variant;

    static if (Args.length == 2) {
        // key and value
        alias Variable = Variable!(Tuple!(Args[0], "key", Args[1], "value"));
    } else static if (Args.length == 1) {
        // key only
        alias Variable = Variable!(Tuple!(Args[0], "key"));
    } else {
        import std.conv : to;

        static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided");
    }
}
577 
@("shall complete a variable")
unittest {
    // arrange: two batches that share the tuple (1, 10)
    auto var = new Variable!(int, int);
    var.insert(relation!(int, int).from([[1, 10], [5, 51]]));
    var.insert(relation!(int, int).from([[1, 10], [5, 52]]));

    // act: step until the variable reports no further change
    while (var.changed) {
    }

    // assert: the shared tuple appears once
    var.complete.should == [kvTuple(1, 10), kvTuple(5, 51), kvTuple(5, 52)];
}
592 
@("shall progress a variable by moving newly added to the recent state")
unittest {
    // arrange: freshly inserted tuples are only queued in toAdd
    auto var = new Variable!(int, int);
    var.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]]));
    var.toAdd.empty.should == false;
    var.recent.empty.should == true;
    var.stable.empty.should == true;

    // act
    var.changed.shouldBeTrue;

    // assert: one step moves them to recent, nothing is stable yet
    var.toAdd.empty.should == true;
    var.recent.empty.should == false;
    var.stable.empty.should == true;
}
610 
@("shall progress from toAdd to stable after two `changed`")
unittest {
    // arrange
    auto var = new Variable!(int, int);
    var.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]]));

    // act: first step toAdd -> recent, second step recent -> stable
    var.changed.shouldBeTrue;
    var.changed.shouldBeFalse;

    // assert
    var.toAdd.empty.should == true;
    var.recent.empty.should == true;
    var.stable.empty.should == false;
}
626 
@("shall join two variables to produce all pairs (x,y) in the provided range")
unittest {
    import std.algorithm : map;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto pairs = iter.variable!(int, int)("source");
    // forward edges: [[0,1],[1,2],[2,3],]
    pairs.insert(iota(3).map!(x => kvTuple(x, x + 1)));
    // backward edges: [[1,0],[2,1],[3,2],]
    pairs.insert(iota(3).map!(x => kvTuple(x + 1, x)));

    // act
    while (iter.changed) {
        // Pairs up the two values that share a key; the key is dropped.
        static auto pairValues(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
            return kvTuple(v1, v2);
        }

        pairs.fromJoin!(pairValues)(pairs, pairs);
    }

    auto joined = pairs.complete;

    // assert
    joined.should == [[0, 0], [0, 1], [0, 2], [0, 3], [1, 0], [1, 1], [1, 2],
        [1, 3], [2, 0], [2, 1], [2, 2], [2, 3], [3, 0], [3, 1], [3, 2], [3, 3]].map!(
            a => kvTuple(a[0], a[1]));
}
656 
@("shall anti-join two variables to produce only those pairs that are not multiples of 3")
unittest {
    import std.algorithm : map, filter;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto pairs = iter.variable!(int, int)("source");
    pairs.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    // keys to exclude: 0, 3, 6 and 9
    auto multiplesOfThree = relation!(int).from(iota(10).filter!(x => x % 3 == 0)
            .map!kvTuple);

    // act
    while (iter.changed) {
        // Emits the reversed pair.
        static auto swap(T0, T1)(T0 k, T1 v) {
            return kvTuple(v, k);
        }

        pairs.fromAntiJoin!(swap)(pairs, multiplesOfThree);
    }

    auto remaining = pairs.complete;

    // assert
    remaining.length.should == 16;
    remaining.should == [[0, 1], [1, 2], [2, 1], [2, 3], [3, 2], [3, 4], [4, 5],
        [5, 4], [5, 6], [6, 5], [6, 7], [7, 8], [8, 7], [8, 9], [9, 8], [9, 10],].map!(
            a => kvTuple(a[0], a[1]));
}
686 
@("shall be the tuples that result from applying a function on the input")
unittest {
    import std.algorithm : map, filter;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto journey = iter.variable!(int, int)("source");
    journey.insert(iota(10).map!(x => kvTuple(x, x)));

    // act
    while (iter.changed) {
        // One Collatz step on the value; the key is kept.
        static auto collatzStep(KV)(KV a) {
            if (a.value % 2 == 0)
                return kvTuple(a.key, a.value / 2);
            else
                return kvTuple(a.key, 3 * a.value + 1);
        }

        journey.fromMap!(collatzStep)(journey);
    }

    auto visited = journey.complete;

    // assert
    visited.length.should == 74;
}
714 
@("shall be chunks in stable that have a size about 2x of recent")
unittest {
    import std.algorithm : map, count;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto pairs = iter.variable!(int, int)("source");
    pairs.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    pairs.insert(iota(10).map!(x => kvTuple(x + 1, x)));

    // act
    while (iter.changed) {
        static auto pairValues(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
            return kvTuple(v1, v2);
        }

        pairs.fromJoin!(pairValues)(pairs, pairs);
    }

    // assert: the sizes of the batches left in stable
    pairs.stable.map!(a => a.count).should == [91, 30];
}
738 
@("shall produce the same result between the fast and slow path when forcing distinct facts")
unittest {
    import std.algorithm : map, count;
    import std.range : iota;

    // arrange: two variables with identical input, one forced onto the fast path
    Iteration iter;
    auto fast = iter.variable!(int, int)("fast");
    fast.forceFastPath = true;
    auto slow = iter.variable!(int, int)("slow");
    foreach (var; [fast, slow]) {
        var.insert(iota(10).map!(x => kvTuple(x, x + 1)));
        var.insert(iota(10).map!(x => kvTuple(x + 1, x)));
    }

    // act
    while (iter.changed) {
        static auto pairValues(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
            return kvTuple(v1, v2);
        }

        fast.fromJoin!(pairValues)(fast, fast);
        slow.fromJoin!(pairValues)(slow, slow);
    }

    // assert
    fast.complete.should == slow.complete;
}
767 
@("shall count the elements in the nested arrays in stable")
unittest {
    // arrange: two stable batches of two tuples each
    auto counted = new Variable!(int, int);
    counted.stable ~= relation!(int, int)([kvTuple(3, 2), kvTuple(4, 2)]);
    counted.stable ~= relation!(int, int)([kvTuple(3, 2), kvTuple(4, 2)]);

    // assert: the count is summed per batch
    counted.countStable.should == 4;
}