1 /**
2 Copyright: Copyright (c) 2018 Frank McSherry
3 License: APACHE-2.0
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Port of DataFrog (Datalog engine in Rust) to D.
7 
8 The intended design is that one has static `Relation` types that are sets of
9 tuples, and `Variable` types that represent monotonically increasing sets of
10 tuples.
11 
The types are mostly wrappers around `TupleT[]` indicating sorted-ness, and
the intent is that this code can be dropped in the middle of an otherwise
normal D program, run to completion, and then the results extracted as
arrays again.
16 */
17 module datacat;
18 
19 import logger = std.experimental.logger;
20 import std.traits : hasMember;
21 import std.typecons : Tuple;
22 import std.parallelism : TaskPool, taskPool;
23 
24 public import datacat.join : fromJoin, fromAntiJoin;
25 public import datacat.map : fromMap;
26 public import datacat.range;
27 
28 version (unittest) {
29     import std.typecons : tuple;
30     import unit_threaded;
31 }
32 
33 @safe:
34 
/// Tuple with the named fields `key` and `value`.
alias KVTuple(K, V) = Tuple!(K, "key", V, "value");
/// Tuple with the single named field `key`.
alias KVTuple(K) = Tuple!(K, "key");

/** Build a key/value tuple from the two arguments.
 *
 * Params:
 *  k = stored in the `key` field.
 *  v = stored in the `value` field.
 *
 * Returns: a `KVTuple!(K, V)`.
 */
auto kvTuple(K, V)(auto ref K k, auto ref V v) {
    alias Pair = KVTuple!(K, V);
    return Pair(k, v);
}

/** Build a key-only tuple.
 *
 * Params:
 *  k = stored in the `key` field.
 *
 * Returns: a `KVTuple!(K)`.
 */
auto kvTuple(K)(auto ref K k) {
    alias Single = KVTuple!(K);
    return Single(k);
}
47 
/// A static, ordered list of tuples.
///
/// A relation is a fixed set of tuples. Datalog computations use it in the
/// places where the data must not vary (for example, in antijoins).
struct Relation(TupleT) {
    import std.range : isInputRange, ElementType;

    /// The tuple type stored in the relation.
    alias TT = TupleT;

    /// Sorted list of distinct tuples.
    TupleT[] elements;
    alias elements this;

    /** Create a `Relation` from a range of plain values.
     *
     * When `TupleT` has both a key and a value field each item is indexed
     * (`item[0]` is the key, `item[1]` the value). When `TupleT` only has a
     * key field each item is passed to the tuple constructor as is.
     *
     * Params:
     *  values = range of items to convert to `TupleT`.
     *
     * Returns: a `Relation`.
     */
    static auto from(T, ThreadStrategy TS = ThreadStrategy.single)(T values) {
        import std.algorithm : map;

        static if (hasKeyValueFields!TupleT) {
            return Relation!TupleT(values.map!(item => TupleT(item[0], item[1])));
        } else static if (hasKeyField!TupleT) {
            return Relation!TupleT(values.map!(item => TupleT(item)));
        } else {
            static assert(0,
                    "Mismatch between Relations (" ~ TupleT.stringof
                    ~ ") and provided type " ~ T.stringof);
        }
    }

    /** Create an instance from a range of tuples.
     *
     * The input is copied, sorted (unless its type is a `SortedRange`) and
     * stripped from adjacent duplicates.
     *
     * Params:
     *  other = the source to pull elements from.
     *  args = the `TaskPool` to sort with when `TS` is `ThreadStrategy.parallel`, otherwise nothing.
     */
    this(ThreadStrategy TS = ThreadStrategy.single, T, ARGS...)(T other, auto ref ARGS args)
            if (isInputRange!T && is(ElementType!T == TupleT)
                && (TS == ThreadStrategy.parallel && ARGS.length == 1
                && is(ARGS[0] == TaskPool) || TS == ThreadStrategy.single)) {
        import std.algorithm : copy, sort, uniq;
        import std.array : appender;
        import std.range : SortedRange, hasLength;

        auto collected = appender!(TupleT[])();

        static if (hasLength!T)
            collected.reserve(other.length);

        other.copy(collected);

        static if (is(T : SortedRange!U, U)) {
            // the source type already guarantees the order
        } else static if (TS == ThreadStrategy.parallel) {
            // quicksort adapted from the example in std.parallelism
            static void parallelSort(E)(E[] data, TaskPool pool) @safe {
                import std.parallelism : task;
                import std.algorithm : swap, partition;

                // Small slices are not worth a task, sort them serially.
                if (data.length < 100) {
                    static import std.algorithm;

                    std.algorithm.sort(data);
                    return;
                }

                // Move the middle element last and use it as the pivot.
                swap(data[$ / 2], data[$ - 1]);
                auto pivot = data[$ - 1];
                bool lessThanPivot(E elem) {
                    return elem < pivot;
                }

                auto upper = partition!lessThanPivot(data[0 .. $ - 1]);
                // Put the pivot between the two partitions.
                swap(data[$ - upper.length - 1], data[$ - 1]);

                auto lower = data[0 .. $ - upper.length - 1];
                upper = data[$ - upper.length .. $];

                // One half is sorted by the pool, the other by this thread.
                auto upperTask = task!parallelSort(upper, pool);
                pool.put(upperTask);
                parallelSort(lower, pool);
                upperTask.yieldForce;
            }

            TaskPool pool = args[0];
            parallelSort(collected.data, pool);
        } else {
            sort(collected.data);
        }

        // `copy` returns the part of the target that was not written to,
        // which is exactly the number of removed duplicates.
        elements.length = collected.data.length;
        auto unused = collected.data.uniq.copy(elements);
        elements = elements[0 .. $ - unused.length];
    }

    /// Merges `other` into this relation so it holds the union of both.
    ///
    /// Returns: a copy of this relation after the merge.
    auto merge(T)(T other)
            if (hasMember!(T, "elements") && is(ElementType!(typeof(other.elements)) == TupleT)) {
        import std.array : appender;

        // An empty side means there is nothing to interleave.
        if (elements.length == 0) {
            elements = other.elements;
            return this;
        }
        if (other.elements.length == 0)
            return this;

        auto rhs = other.elements;

        // Let `elements` be the list that starts with the lowest tuple.
        if (elements[0] > rhs[0]) {
            rhs = elements;
            elements = other.elements;
        }

        // Fast path: everything in `rhs` sorts after everything in `elements`.
        if (elements[$ - 1] < rhs[0]) {
            elements ~= rhs;
            return this;
        }

        auto merged = appender!(TupleT[])();
        merged.reserve(elements.length + rhs.length);

        merged.put(elements[0]);
        if (elements[0] == rhs[0])
            rhs = rhs[1 .. $];

        foreach (e; elements[1 .. $]) {
            // everything in rhs that sorts before `e` goes first
            while (rhs.length != 0 && !(rhs[0] >= e)) {
                merged.put(rhs[0]);
                rhs = rhs[1 .. $];
            }
            // tuples equal to `e` are dropped, `e` itself represents them
            while (rhs.length != 0 && !(rhs[0] > e))
                rhs = rhs[1 .. $];
            merged.put(e);
        }

        // Whatever remains in rhs sorts after all of `elements`.
        foreach (rest; rhs)
            merged.put(rest);

        elements = merged.data;
        return this;
    }

    /// Returns: true when the relation holds no tuples.
    bool empty() const {
        return !elements.length;
    }

    /// Drop all tuples.
    void clear() {
        elements = null;
    }

    import std.range : isOutputRange;
    import std.format : FormatSpec;

    /// Returns: the relation formatted by the sink based `toString`.
    string toString() {
        import std.exception : assumeUnique;

        char[] text;
        text.reserve(100);
        auto spec = FormatSpec!char("%s");
        toString((const(char)[] part) { text ~= part; }, spec);

        // `text` is local and never escapes in mutable form.
        static string seal(char[] raw) @trusted {
            return assumeUnique(raw);
        }

        return seal(text);
    }

    /// Write the tuples to `w` as `[[key,value], ...]` or `[[key], ...]`.
    void toString(Writer, Char)(scope Writer w, FormatSpec!Char fmt) const {
        import std.format : formattedWrite;
        import std.range.primitives : put;

        enum hasValue = __traits(hasMember, TT, "value");

        put(w, "[");
        foreach (e; elements) {
            static if (hasValue)
                formattedWrite(w, "[%s,%s], ", e.key, e.value);
            else
                formattedWrite(w, "[%s], ", e.key);
        }
        put(w, "]");
    }
}
247 
/// Create a `Relation` type whose tuple is built from the provided types
/// (`Args`): one type gives a key-only tuple, two types a key/value tuple.
template relation(Args...) {
    static if (Args.length == 1) {
        alias relation = Relation!(KVTuple!(Args[0]));
    } else static if (Args.length == 2) {
        alias relation = Relation!(KVTuple!(Args[0], Args[1]));
    } else {
        import std.conv : to;

        static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided");
    }
}
263 
@("shall create a Relation using the parallel sort strategy")
unittest {
    import std.algorithm : map;

    alias Rel = Relation!(KVTuple!(int, int));

    // a non-array range forces the generic constructor to be used
    auto source = [kvTuple(1, 2)].map!"a";

    Rel rel;
    rel.__ctor!(ThreadStrategy.parallel)(source, taskPool);
}
271 
@("shall merge two relations")
unittest {
    // arrange: the two relations overlap on the key 5 but with different values
    auto lhs = relation!(int, int).from([[1, 0], [2, -1], [5, -20]]);
    auto rhs = relation!(int, int).from([[3, 0], [5, -10], [7, -20]]);

    // act
    lhs.merge(rhs);

    // assert
    auto expected = [
        tuple(1, 0), tuple(2, -1), tuple(3, 0), tuple(5, -20), tuple(5, -10),
        tuple(7, -20)
    ];
    lhs.should == expected;
}
281 
@("shall create a sorted relation from unsorted elements")
unittest {
    auto unsorted = [[3, -1], [2, -3], [5, -24]];

    auto rel = relation!(int, int).from(unsorted);

    auto expected = [tuple(2, -3), tuple(3, -1), tuple(5, -24)];
    rel.should == expected;
}
287 
/// True iff `T` has a member named `key`.
package template hasKeyField(T) {
    enum bool hasKeyField = hasMember!(T, "key");
}

/// True iff `T` has a member named `value`.
package template hasValueField(T) {
    enum bool hasValueField = hasMember!(T, "value");
}

/// True iff `T` has both the members `key` and `value`.
package template hasKeyValueFields(T) {
    static if (hasKeyField!T)
        enum bool hasKeyValueFields = hasValueField!T;
    else
        enum bool hasKeyValueFields = false;
}

/// True iff both `T0` and `T1` have a `key` member and those are of the same type.
package template isSameKeyType(T0, T1) {
    static if (hasKeyField!T0 && hasKeyField!T1)
        enum bool isSameKeyType = is(typeof(T0.key) == typeof(T1.key));
    else
        enum bool isSameKeyType = false;
}
300 
@("shall check that the keys are the same type")
unittest {
    // both keys are int
    auto sameLhs = kvTuple(1, "a0");
    auto sameRhs = kvTuple(2, "a1");

    static assert(isSameKeyType!(typeof(sameLhs), typeof(sameRhs)),
            "isSameKeyType: expected to pass because the key types are the same");

    // int key compared to a double key
    auto intKey = kvTuple(3, "b0");
    auto doubleKey = kvTuple(1.1, "b1");

    static assert(!isSameKeyType!(typeof(intKey), typeof(doubleKey)),
            "isSameKeyType: expected to fail because the key types are different");
}
315 
/// True iff `T` implicitly converts to `VariableTrait`, i.e. it is a type
/// that can report whether it has changed since it was last asked.
package template isVariable(T) {
    enum bool isVariable = is(T : VariableTrait);
}
319 
/// Selects how relations are sorted when they are created.
enum ThreadStrategy {
    /// Sort in the calling thread.
    single,
    /// Sort with the help of a `TaskPool`.
    parallel,
}
324 
/// Iteration context for `ThreadStrategy.single`.
alias Iteration = IterationImpl!(ThreadStrategy.single);
/// Iteration context for `ThreadStrategy.parallel`; it carries a `TaskPool`.
alias ParallelIteration = IterationImpl!(ThreadStrategy.parallel);

/** Make an iteration context of the requested `Kind`.
 *
 * The kind affects all `Variable`s created through the `variable` method.
 *
 * Params:
 *  tpool = pool used by a parallel iteration. When it is `null` the global
 *          `taskPool` from std.parallelism is used. Not used for
 *          `ThreadStrategy.single`.
 *
 * Returns: a `ParallelIteration` for `ThreadStrategy.parallel`, otherwise an `Iteration`.
 */
auto makeIteration(ThreadStrategy Kind)(TaskPool tpool = null) {
    static if (Kind == ThreadStrategy.parallel) {
        import std.parallelism : taskPool;

        auto pool = tpool is null ? taskPool : tpool;
        return ParallelIteration(pool);
    } else {
        return Iteration();
    }
}
347 
/// An iterative context for recursive evaluation.
///
/// An `Iteration` tracks monotonic variables, and monitors their progress.
/// It can inform the user if they have ceased changing, at which point the
/// computation should be done.
struct IterationImpl(ThreadStrategy Kind) {
    static if (Kind == ThreadStrategy.parallel) {
        /// Pool passed on to every variable created by this iteration.
        TaskPool taskPool;
        invariant {
            assert(taskPool !is null, "the taskpool is required to be initialized for a parallel");
        }
    }

    /// The variables that `changed` asks for progress.
    VariableTrait[] variables;

    /// Reports whether any of the monitored variables have changed since
    /// the most recent call.
    bool changed() {
        bool anyChanged = false;

        // Every variable must be asked, even after one has reported a
        // change: `Variable.changed` also moves the variable's tuples one
        // stage forward, so short-circuiting would leave some behind.
        foreach (v; variables) {
            if (v.changed)
                anyChanged = true;
        }

        return anyChanged;
    }

    /// Creates a new named variable associated with the iterative context.
    ///
    /// Params:
    ///  s = name of the variable.
    ///
    /// Returns: the new variable, which is also monitored by `changed`.
    scope auto variable(T0, T1)(string s) {
        static if (Kind == ThreadStrategy.single) {
            auto v = new Variable!(KVTuple!(T0, T1), Kind)(s);
        } else {
            auto v = new Variable!(KVTuple!(T0, T1), Kind)(s, taskPool);
        }
        variables ~= v;
        return v;
    }

    /// Creates a new named variable associated with the iterative context.
    ///
    /// This variable will not be maintained distinctly, and may advertise tuples as
    /// recent multiple times (perhaps unbounded many times).
    ///
    /// Params:
    ///  s = name of the variable.
    ///
    /// Returns: the new variable, which is also monitored by `changed`.
    scope auto variableInDistinct(T0, T1)(string s) {
        static if (Kind == ThreadStrategy.single) {
            auto v = new Variable!(KVTuple!(T0, T1), Kind)(s);
        } else {
            auto v = new Variable!(KVTuple!(T0, T1), Kind)(s, taskPool);
        }
        v.distinct = false;
        // Register it like `variable` does; otherwise `changed` never
        // advances it and its tuples never leave `toAdd`.
        variables ~= v;
        return v;
    }

    /// Returns: a range that continue until all variables stop changing.
    IteratorRange!(typeof(this)) range() {
        return typeof(return)(&this);
    }
}
409 
/// True iff `T` has both a `key` and a `value` member.
template isTuple(T) {
    static if (hasMember!(T, "key"))
        enum bool isTuple = hasMember!(T, "value");
    else
        enum bool isTuple = false;
}
412 
/// Implemented by types that can tell whether they have changed.
interface VariableTrait {
    /// Returns: true if the variable has changed since it was last asked.
    bool changed();
}
418 
/// A monotonically increasing set of `Tuple`s.
///
/// There are three stages in the lifecycle of a tuple:
///
///   1. A tuple is added to `this.toAdd`, but is not yet visible externally.
///   2. Newly added tuples are then promoted to `this.recent` for one iteration.
///   3. After one iteration, recent tuples are moved to `this.stable` for posterity.
///
/// Each time `this.changed()` is called, the `recent` relation is folded into `stable`,
/// and the `toAdd` relations are merged, potentially deduplicated against `stable`, and
/// then made `recent`. This way, across calls to `changed()` all added tuples are in
/// `recent` at least once and eventually all are in `stable`.
///
/// A `Variable` may optionally be instructed not to de-duplicate its tuples, for reasons
/// of performance. Such a variable cannot be relied on to terminate iterative computation,
/// and it is important that any cycle of derivations have at least one de-duplicating
/// variable on it.
// dfmt off
final class Variable(TupleT, ThreadStrategy TS = ThreadStrategy.single) : VariableTrait if (isTuple!TupleT) {
    import std.range : isInputRange, ElementType, isOutputRange;

    static if (TS == ThreadStrategy.parallel)
        /// Pool used to sort the relations created by `insert`.
        TaskPool taskPool;

    /// Convenient aliases to retrieve properties about the variable
    alias TT = TupleT;
    alias ThisTS = TS;

    version (unittest) {
        /// Used for testing purpose to ensure both paths produce the same result.
        bool forceFastPath;
    }

    /// Should the variable be maintained distinctly.
    bool distinct = true;

    /// A useful name for the variable.
    string name;

    /// A list of relations whose union are the accepted tuples.
    Relation!TupleT[] stable;

    /// A list of recent tuples, still to be processed.
    Relation!TupleT recent;

    /// A list of future tuples, to be introduced.
    Relation!TupleT[] toAdd;

    static if (TS == ThreadStrategy.single) {
        this() {
        }

        /// Params:
        ///  name = a useful name for the variable.
        this(string name) {
            this.name = name;
        }
    } else {
        /// Params:
        ///  tp = pool used to sort inserted tuples.
        this(TaskPool tp) {
            this.taskPool = tp;
        }

        /// Params:
        ///  name = a useful name for the variable.
        ///  tp = pool used to sort inserted tuples.
        this(string name, TaskPool tp) {
            this.name = name;
            this.taskPool = tp;
        }
    }

    /// Inserts a relation into the variable.
    ///
    /// This is most commonly used to load initial values into a variable.
    /// it is not obvious that it should be commonly used otherwise, but
    /// it should not be harmful.
    ///
    /// Empty relations are ignored.
    void insert(Relation!TupleT relation) {
        if (!relation.empty)
            toAdd ~= relation;
    }

    /// ditto
    void insert(T)(T relation) if (is(T : TupleT[]) || isInputRange!T && is(ElementType!T == TupleT)) {
        import std.range : hasLength;

        // Cheap early exit when the size is known up front.
        static if (hasLength!T) {
            if (relation.length == 0)
                return;
        }

        Relation!TupleT rel;
        static if (TS == ThreadStrategy.parallel)
            rel.__ctor!(TS)(relation, taskPool);
        else
            rel.__ctor!(TS)(relation);

        // A range without a length may still have been empty; treat it the
        // same way as `insert(Relation!TupleT)` does.
        if (!rel.empty)
            toAdd ~= rel;
    }

    /// Inserts tuples given as nested arrays, `[key, value]` per item.
    ///
    /// Each item must hold at least two elements.
    void insert(T)(T[][] relation) {
        import std.algorithm : map;

        if (relation.length == 0)
            return;

        // Build the variable's own tuple type so the key/value types only
        // need to be constructible from `T`.
        this.insert(relation.map!(a => TupleT(a[0], a[1])));
    }

    /// Consumes the variable and returns a relation.
    ///
    /// This method removes the ability for the variable to develop, and
    /// flattens all internal tuples down to one relation. The method
    /// asserts that iteration has completed, in that `recent` and
    /// `toAdd` should both be empty.
    ///
    /// Returns: the union of all relations in `stable`.
    Relation!TupleT complete() {
        import std.array : empty;

        assert(recent.empty);
        assert(toAdd.empty);

        typeof(return) result;
        foreach (batch; stable) {
            result.merge(batch);
        }
        stable = null;

        return result;
    }

    /// Advance the tuples one stage (`recent` -> `stable`, `toAdd` -> `recent`).
    ///
    /// Returns: true if there are tuples in `recent` after the call.
    override bool changed() {
        import std.array : popBack, back, appender, empty;

        // 1. Merge this.recent into this.stable.
        if (!recent.empty) {
            // Batches that are at most twice the size of recent are folded
            // into recent before it is stored as the last batch.
            while (!stable.empty && stable.back.length <= 2 * recent.length) {
                auto last = stable[$ - 1];
                stable.popBack;
                recent.merge(last);
            }
            stable ~= recent;
            recent.clear;
        }

        if (toAdd.empty)
            return false;

        // 2. Move this.toAdd into this.recent.
        auto to_add = toAdd[$ - 1];
        toAdd.popBack;

        // 2b. merge the rest of this.toAdd into to_add
        foreach (to_add_more; toAdd) {
            to_add.merge(to_add_more);
        }
        toAdd = null;

        // 2c. Restrict `to_add` to tuples not in `this.stable`.
        if (distinct) {
            foreach (batch; stable) {
                auto retained = appender!(TupleT[])();

                // `batch` is a copy, both paths consume it from the front
                // while walking the sorted `to_add`.
                void fastPath() {
                    import datacat.join : gallop;

                    foreach (elem; to_add) {
                        batch = batch.gallop!(y => y < elem);
                        if (batch.length == 0 || batch[0] != elem)
                            retained.put(elem);
                    }
                }

                void slowPath() {
                    version (unittest) {
                        if (forceFastPath) {
                            fastPath;
                            return;
                        }
                    }

                    foreach (elem; to_add) {
                        while (batch.length > 0 && batch[0] < elem) {
                            batch = batch[1 .. $];
                        }
                        if (batch.length == 0 || batch[0] != elem)
                            retained.put(elem);
                    }
                }

                // Galloping is used when the batch is much larger than what is added.
                if (batch.length > 4 * to_add.length) {
                    fastPath;
                } else {
                    slowPath;
                }
                to_add = retained.data;
            }
        }
        recent = to_add;

        return !recent.empty;
    }

    /// Returns: total elements in stable.
    size_t countStable() {
        import std.algorithm : map, sum;

        return stable.map!"a.length".sum;
    }

    /// Write the name (if any), the distinct flag and the three stages to `w`.
    void toString(Writer)(ref Writer w) if (isOutputRange!(Writer, char)) {
        import std.format : formattedWrite;

        // `length` is used instead of `empty` because no import visible
        // here provides `empty` for strings.
        if (name.length != 0)
            formattedWrite(w, "name:%s ", name);
        formattedWrite(w, "distinct:%s stable:%s recent:%s toAdd:%s", distinct,
                stable, recent, toAdd);
    }
}
631 // dfmt on
632 
/// Create a Variable type with a tuple of the provided types (`Args`).
///
/// NOTE(review): the class template `Variable` is constrained by `isTuple`,
/// which requires both `key` and `value`; the single type form therefore
/// looks unusable — confirm before relying on it.
template Variable(Args...) {
    static if (Args.length == 1) {
        alias Variable = Variable!(KVTuple!(Args[0]));
    } else static if (Args.length == 2) {
        alias Variable = Variable!(KVTuple!(Args[0], Args[1]));
    } else {
        import std.conv : to;

        static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided");
    }
}
648 
@("shall complete a variable")
unittest {
    // arrange: the tuple (1, 10) is present in both relations
    auto var = new Variable!(int, int);
    foreach (input; [[[1, 10], [5, 51]], [[1, 10], [5, 52]]])
        var.insert(relation!(int, int).from(input));

    // act: iterate until nothing is left to process
    while (var.changed) {
    }

    // assert
    auto expected = [kvTuple(1, 10), kvTuple(5, 51), kvTuple(5, 52)];
    var.complete.should == expected;
}
663 
@("shall progress a variable by moving newly added to the recent state")
unittest {
    import std.array : empty;

    // arrange
    auto var = new Variable!(int, int);
    var.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]]));

    // the tuples wait in toAdd until the first call to changed
    var.toAdd.empty.should == false;
    var.recent.empty.should == true;
    var.stable.empty.should == true;

    // act
    var.changed.shouldBeTrue;

    // assert: everything moved one stage forward
    var.toAdd.empty.should == true;
    var.recent.empty.should == false;
    var.stable.empty.should == true;
}
683 
@("shall progress from toAdd to stable after two `changed`")
unittest {
    import std.array : empty;

    // arrange
    auto var = new Variable!(int, int);
    var.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]]));

    // act: first call promotes to recent, second call to stable
    var.changed.shouldBeTrue;
    var.changed.shouldBeFalse;

    // assert
    var.toAdd.empty.should == true;
    var.recent.empty.should == true;
    var.stable.empty.should == false;
}
701 
@("shall be chunks in stable that have a size about 2x of recent")
unittest {
    import std.algorithm : map, count;
    import std.range : iota;

    // arrange: edges i -> i+1 and i+1 -> i
    Iteration iter;
    auto source = iter.variable!(int, int)("source");
    source.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    source.insert(iota(10).map!(x => kvTuple(x + 1, x)));

    // act: join the variable with itself until a fixed point is reached
    while (iter.changed) {
        source.fromJoin!((k, v1, v2) => kvTuple(v1, v2))(source, source);
    }

    // assert
    auto batchSizes = source.stable.map!(a => a.count);
    batchSizes.should == [91, 30];
}
721 
@("shall produce the same result between the fast and slow path when forcing distinct facts")
unittest {
    import std.algorithm : map, count;
    import std.range : iota;

    // joins on the key and keeps the two values as the new tuple
    static auto helper(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
        return kvTuple(v1, v2);
    }

    // arrange: two variables with identical input, one forced to the fast path
    Iteration iter;
    auto fast = iter.variable!(int, int)("fast");
    fast.forceFastPath = true;
    auto slow = iter.variable!(int, int)("slow");
    foreach (var; [fast, slow]) {
        var.insert(iota(10).map!(x => kvTuple(x, x + 1)));
        var.insert(iota(10).map!(x => kvTuple(x + 1, x)));
    }

    // act
    while (iter.changed) {
        fast.fromJoin!(helper)(fast, fast);
        slow.fromJoin!(helper)(slow, slow);
    }

    // assert
    fast.complete.should == slow.complete;
}
750 
@("shall produce the same result between the single and multithreaded Iteration")
unittest {
    import std.algorithm : map, count;
    import std.range : iota;

    // joins on the key and keeps the two values as the new tuple
    static auto helper(T0, T1, T2)(T0 k, T1 v1, T2 v2) {
        return kvTuple(v1, v2);
    }

    // arrange: the same input for a single threaded and a parallel iteration
    auto singleIter = makeIteration!(ThreadStrategy.single);
    auto single = singleIter.variable!(int, int)("fast");
    single.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    single.insert(iota(10).map!(x => kvTuple(x + 1, x)));

    auto parallelIter = makeIteration!(ThreadStrategy.parallel);
    auto parallel = parallelIter.variable!(int, int)("slow");
    parallel.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    parallel.insert(iota(10).map!(x => kvTuple(x + 1, x)));

    // act
    while (singleIter.changed) {
        single.fromJoin!(helper)(single, single);
    }
    while (parallelIter.changed) {
        parallel.fromJoin!(helper)(parallel, parallel);
    }

    // assert
    single.complete.should == parallel.complete;
}
780 
@("shall count the elements in the nested arrays in stable")
unittest {
    auto var = new Variable!(int, int);

    // two batches with two tuples each
    foreach (_; 0 .. 2)
        var.stable ~= relation!(int, int)([kvTuple(3, 2), kvTuple(4, 2)]);

    var.countStable.should == 4;
}