1 /** 2 Copyright: Copyright (c) 2018 Frank McSherry 3 License: APACHE-2.0 4 Author: Joakim BrännströmJoakim Brännström (joakim.brannstrom@gmx.com) 5 6 Port of DataFrog (Datalog engine in Rust) to D. 7 8 The intended design is that one has static `Relation` types that are sets of 9 tuples, and `Variable` types that represent monotonically increasing sets of 10 tuples. 11 12 The types are mostly wrappers around `Vec<Tuple>` indicating sorted-ness, and 13 the intent is that this code can be dropped in the middle of an otherwise 14 normal Rust program, run to completion, and then the results extracted as 15 vectors again. 16 */ 17 module datacat; 18 19 import logger = std.experimental.logger; 20 import std.traits : hasMember; 21 import std.typecons : Tuple; 22 import std.parallelism : TaskPool, taskPool; 23 24 public import datacat.join : fromJoin, fromAntiJoin; 25 public import datacat.map : fromMap; 26 public import datacat.range; 27 28 version (unittest) { 29 import std.typecons : tuple; 30 import unit_threaded; 31 } 32 33 @safe: 34 35 alias KVTuple(K, V) = Tuple!(K, "key", V, "value"); 36 alias KVTuple(K) = Tuple!(K, "key"); 37 38 /// Convenient function for creating a key/value tuple. 39 auto kvTuple(K, V)(auto ref K k, auto ref V v) { 40 return KVTuple!(K, V)(k, v); 41 } 42 43 /// ditto 44 auto kvTuple(K)(auto ref K k) { 45 return KVTuple!(K)(k); 46 } 47 48 /// A static, ordered list of key-value pairs. 49 /// 50 /// A relation represents a fixed set of key-value pairs. In many places in a 51 /// Datalog computation we want to be sure that certain relations are not able 52 /// to vary (for example, in antijoins). 53 struct Relation(TupleT) { 54 import std.range : isInputRange, ElementType; 55 56 /// Convenient alias to retrieve the tuple type. 57 alias TT = TupleT; 58 59 /// Sorted list of distinct tuples. 60 TupleT[] elements; 61 alias elements this; 62 63 /** Convenient function to create a `Relation` from an array containing 64 * values with the length 2. 65 * 66 * Returns: a `Relation`. 67 */ 68 static auto from(T, ThreadStrategy TS = ThreadStrategy.single)(T values) { 69 import std.algorithm : map; 70 71 static if (hasKeyValueFields!TupleT) { 72 return Relation!(TupleT)(values.map!(a => TupleT(a[0], a[1]))); 73 } else static if (hasKeyField!TupleT) { 74 return Relation!(TupleT)(values.map!(a => TupleT(a))); 75 } else { 76 static assert(0, 77 "Mismatch between Relations (" ~ TupleT.stringof 78 ~ ") and provided type " ~ T.stringof); 79 } 80 } 81 82 /** Create an instance. 83 * 84 * If the input is sorted no further sorting is done. 85 * 86 * Params: 87 * other = the source to pull elements from. 88 */ 89 this(ThreadStrategy TS = ThreadStrategy.single, T, ARGS...)(T other, auto ref ARGS args) 90 if (isInputRange!T && is(ElementType!T == TupleT) 91 && (TS == ThreadStrategy.parallel && ARGS.length == 1 92 && is(ARGS[0] == TaskPool) || TS == ThreadStrategy.single)) { 93 import std.algorithm : copy, sort, until, uniq; 94 import std.array : appender; 95 import std.range : SortedRange, hasLength; 96 97 auto app = appender!(TupleT[])(); 98 99 static if (hasLength!T) 100 app.reserve(other.length); 101 102 other.copy(app); 103 104 static if (is(T : SortedRange!U, U)) { 105 // do nothing 106 } else static if (TS == ThreadStrategy.parallel) { 107 // code copied from std.parallelism 108 static void parallelSort(T)(T[] data, TaskPool pool) @safe { 109 import std.parallelism : task; 110 import std.algorithm : swap, partition; 111 112 // Sort small subarrays serially. 113 if (data.length < 100) { 114 static import std.algorithm; 115 116 std.algorithm.sort(data); 117 return; 118 } 119 120 // Partition the array. 121 swap(data[$ / 2], data[$ - 1]); 122 auto pivot = data[$ - 1]; 123 bool lessThanPivot(T elem) { 124 return elem < pivot; 125 } 126 127 auto greaterEqual = partition!lessThanPivot(data[0 .. $ - 1]); 128 swap(data[$ - greaterEqual.length - 1], data[$ - 1]); 129 130 auto less = data[0 .. $ - greaterEqual.length - 1]; 131 greaterEqual = data[$ - greaterEqual.length .. $]; 132 133 // Execute both recursion branches in parallel. 134 auto recurseTask = task!parallelSort(greaterEqual, pool); 135 pool.put(recurseTask); 136 parallelSort(less, pool); 137 recurseTask.yieldForce; 138 } 139 140 TaskPool pool = args[0]; 141 parallelSort(app.data, pool); 142 } else { 143 sort(app.data); 144 } 145 146 elements.length = app.data.length; 147 elements.length -= app.data.uniq.copy(elements).length; 148 } 149 150 /// Merges two relations into their union. 151 auto merge(T)(T other) 152 if (hasMember!(T, "elements") && is(ElementType!(typeof(other.elements)) == TupleT)) { 153 import std.algorithm : until; 154 import std.array : appender, empty, popFront, front; 155 156 // If one of the element lists is zero-length, we don't need to do any work 157 if (elements.length == 0) { 158 elements = other.elements; 159 return this; 160 } else if (other.elements.length == 0) { 161 return this; 162 } 163 164 auto elements2 = other.elements; 165 166 // Make sure that elements starts with the lower element 167 if (elements[0] > elements2[0]) { 168 elements2 = elements; 169 elements = other.elements; 170 } 171 172 // Fast path for when all the new elements are after the exiting ones 173 if (elements[$ - 1] < elements2[0]) { 174 elements ~= elements2; 175 return this; 176 } 177 178 const len = elements.length; 179 180 auto elem = appender!(TupleT[])(); 181 elem.reserve(len + elements2.length); 182 183 elem.put(elements[0]); 184 elements.popFront; 185 if (elem.data[0] == elements2[0]) { 186 elements2.popFront; 187 } 188 189 foreach (e; elements) { 190 foreach (e2; elements2[].until!(a => a >= e)) { 191 elem.put(e2); 192 elements2.popFront; 193 } 194 foreach (_; elements2[].until!(a => a > e)) 195 elements2.popFront; 196 elem.put(e); 197 } 198 199 // Finish draining second list 200 foreach (e; elements2) 201 elem.put(e); 202 203 elements = elem.data; 204 return this; 205 } 206 207 bool empty() const { 208 return elements.length == 0; 209 } 210 211 void clear() { 212 elements = null; 213 } 214 215 import std.range : isOutputRange; 216 import std.format : FormatSpec; 217 218 string toString() { 219 import std.exception : assumeUnique; 220 import std.format : FormatSpec; 221 222 char[] buf; 223 buf.reserve(100); 224 auto fmt = FormatSpec!char("%s"); 225 toString((const(char)[] s) { buf ~= s; }, fmt); 226 auto trustedUnique(T)(T t) @trusted { 227 return assumeUnique(t); 228 } 229 230 return trustedUnique(buf); 231 } 232 233 void toString(Writer, Char)(scope Writer w, FormatSpec!Char fmt) const { 234 import std.format : formattedWrite; 235 import std.range.primitives : put; 236 237 put(w, "["); 238 foreach (e; elements) { 239 static if (__traits(hasMember, TT, "value")) 240 formattedWrite(w, "[%s,%s], ", e.key, e.value); 241 else 242 formattedWrite(w, "[%s], ", e.key); 243 } 244 put(w, "]"); 245 } 246 } 247 248 /// Create a Relation type with a tuple of the provided types (`Args`). 249 template relation(Args...) { 250 import std.typecons : Tuple; 251 import std.variant : Variant; 252 253 static if (Args.length == 1) { 254 alias relation = Relation!(Tuple!(Args[0], "key")); 255 } else static if (Args.length == 2) { 256 alias relation = Relation!(Tuple!(Args[0], "key", Args[1], "value")); 257 } else { 258 import std.conv : to; 259 260 static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided"); 261 } 262 } 263 264 @("shall create a Relation using the parallel sort strategy") 265 unittest { 266 import std.algorithm : map; 267 268 Relation!(KVTuple!(int, int)) a; 269 a.__ctor!(ThreadStrategy.parallel)([kvTuple(1, 2)].map!"a", taskPool); 270 } 271 272 @("shall merge two relations") 273 unittest { 274 auto a = relation!(int, int).from([[1, 0], [2, -1], [5, -20]]); 275 auto b = relation!(int, int).from([[3, 0], [5, -10], [7, -20]]); 276 277 a.merge(b); 278 a.should == [tuple(1, 0), tuple(2, -1), tuple(3, 0), tuple(5, -20), 279 tuple(5, -10), tuple(7, -20)]; 280 } 281 282 @("shall create a sorted relation from unsorted elements") 283 unittest { 284 auto a = relation!(int, int).from([[3, -1], [2, -3], [5, -24]]); 285 a.should == [tuple(2, -3), tuple(3, -1), tuple(5, -24)]; 286 } 287 288 /// True iff `T` has a key field. 289 package enum hasKeyField(T) = hasMember!(T, "key"); 290 291 /// True iff `T` has a value field. 292 package enum hasValueField(T) = hasMember!(T, "value"); 293 294 /// True iff `T` has the fields key and value. 295 package enum hasKeyValueFields(T) = hasKeyField!T && hasValueField!T; 296 297 /// True iff `T0` and `T1` keys are the same type. 298 package enum isSameKeyType(T0, T1) = hasKeyField!T0 && hasKeyField!T1 299 && is(typeof(T0.key) == typeof(T1.key)); 300 301 @("shall check that the keys are the same type") 302 unittest { 303 auto a0 = kvTuple(1, "a0"); 304 auto a1 = kvTuple(2, "a1"); 305 306 static if (!isSameKeyType!(typeof(a0), typeof(a1))) 307 static assert(0, "isSameKeyType: expected to pass because the key types are the same"); 308 309 auto b0 = kvTuple(3, "b0"); 310 auto b1 = kvTuple(1.1, "b1"); 311 312 static if (isSameKeyType!(typeof(b0), typeof(b1))) 313 static assert(0, "isSameKeyType: expected to fail because the key types are different"); 314 } 315 316 /// A type that can report on whether it has changed. 317 /// changed = Reports whether the variable has changed since it was last asked. 318 package enum isVariable(T) = is(T : VariableTrait); 319 320 enum ThreadStrategy { 321 single, 322 parallel 323 } 324 325 alias Iteration = IterationImpl!(ThreadStrategy.single); 326 alias ParallelIteration = IterationImpl!(ThreadStrategy.parallel); 327 328 /** Make an `Iteration`. 329 * 330 * It affects all `Variable`s created through the `variable` method. 331 * 332 * Returns: a parallel `Iteration` 333 */ 334 auto makeIteration(ThreadStrategy Kind)(TaskPool tpool = null) { 335 import std.parallelism : taskPool; 336 337 static if (Kind == ThreadStrategy.parallel) { 338 return ParallelIteration(() { 339 if (tpool is null) 340 return taskPool; 341 else 342 return tpool; 343 }()); 344 } else 345 return Iteration(); 346 } 347 348 /// An iterative context for recursive evaluation. 349 /// 350 /// An `Iteration` tracks monotonic variables, and monitors their progress. 351 /// It can inform the user if they have ceased changing, at which point the 352 /// computation should be done. 353 struct IterationImpl(ThreadStrategy Kind) { 354 static if (Kind == ThreadStrategy.parallel) { 355 TaskPool taskPool; 356 invariant { 357 assert(taskPool !is null, "the taskpool is required to be initialized for a parallel"); 358 } 359 } 360 361 VariableTrait[] variables; 362 363 /// Reports whether any of the monitored variables have changed since 364 /// the most recent call. 365 bool changed() { 366 import std.algorithm : map; 367 368 bool r = false; 369 foreach (a; variables.map!"a.changed") { 370 if (a) 371 r = true; 372 } 373 374 return r; 375 //TODO why didnt this work? 376 //return variables.reduce!((a, b) => a.changed || b.changed); 377 } 378 379 /// Creates a new named variable associated with the iterative context. 380 scope auto variable(T0, T1)(string s) { 381 static if (Kind == ThreadStrategy.single) { 382 auto v = new Variable!(KVTuple!(T0, T1), Kind)(s); 383 } else { 384 auto v = new Variable!(KVTuple!(T0, T1), Kind)(s, taskPool); 385 } 386 variables ~= v; 387 return v; 388 } 389 390 /// Creates a new named variable associated with the iterative context. 391 /// 392 /// This variable will not be maintained distinctly, and may advertise tuples as 393 /// recent multiple times (perhaps unbounded many times). 394 scope auto variableInDistinct(T0, T1)(string s) { 395 static if (Kind == ThreadStrategy.single) { 396 auto v = new Variable!(KVTuple!(T0, T1), Kind)(s); 397 } else { 398 auto v = new Variable!(KVTuple!(T0, T1), Kind)(s, taskPool); 399 } 400 v.distinct = false; 401 return v; 402 } 403 404 /// Returns: a range that continue until all variables stop changing. 405 IteratorRange!(typeof(this)) range() { 406 return typeof(return)(&this); 407 } 408 } 409 410 /// A type that has a key and value member. 411 enum isTuple(T) = hasMember!(T, "key") && hasMember!(T, "value"); 412 413 /// A type that can report on whether it has changed. 414 interface VariableTrait { 415 /// Reports whether the variable has changed since it was last asked. 416 bool changed(); 417 } 418 419 /// An monotonically increasing set of `Tuple`s. 420 /// 421 /// There are three stages in the lifecycle of a tuple: 422 /// 423 /// 1. A tuple is added to `this.toAdd`, but is not yet visible externally. 424 /// 2. Newly added tuples are then promoted to `this.recent` for one iteration. 425 /// 3. After one iteration, recent tuples are moved to `this.stable` for posterity. 426 /// 427 /// Each time `this.changed()` is called, the `recent` relation is folded into `stable`, 428 /// and the `toAdd` relations are merged, potentially deduplicated against `stable`, and 429 /// then made `recent`. This way, across calls to `changed()` all added tuples are in 430 /// `recent` at least once and eventually all are in `stable`. 431 /// 432 /// A `Variable` may optionally be instructed not to de-duplicate its tuples, for reasons 433 /// of performance. Such a variable cannot be relied on to terminate iterative computation, 434 /// and it is important that any cycle of derivations have at least one de-duplicating 435 /// variable on it. 436 /// TODO: tuple should be constrainted to something with Key/Value. 437 // dfmt off 438 final class Variable(TupleT, ThreadStrategy TS = ThreadStrategy.single) : VariableTrait if (isTuple!TupleT) { 439 import std.range : isInputRange, ElementType, isOutputRange; 440 441 static if (TS == ThreadStrategy.parallel) 442 TaskPool taskPool; 443 444 /// Convenient aliases to retrieve properties about the variable 445 alias TT = TupleT; 446 alias ThisTS = TS; 447 448 version (unittest) { 449 /// Used for testing purpose to ensure both paths produce the same result. 450 bool forceFastPath; 451 } 452 453 /// Should the variable be maintained distinctly. 454 bool distinct = true; 455 456 /// A useful name for the variable. 457 string name; 458 459 /// A list of relations whose union are the accepted tuples. 460 Relation!TupleT[] stable; 461 462 /// A list of recent tuples, still to be processed. 463 Relation!TupleT recent; 464 465 /// A list of future tuples, to be introduced. 466 Relation!TupleT[] toAdd; 467 468 static if (TS == ThreadStrategy.single) { 469 this() { 470 } 471 472 this(string name) { 473 this.name = name; 474 } 475 } else { 476 this(TaskPool tp) { 477 this.taskPool = tp; 478 } 479 480 this(string name, TaskPool tp) { 481 this.name = name; 482 this.taskPool = tp; 483 } 484 } 485 486 /// Inserts a relation into the variable. 487 /// 488 /// This is most commonly used to load initial values into a variable. 489 /// it is not obvious that it should be commonly used otherwise, but 490 /// it should not be harmful. 491 void insert(Relation!TupleT relation) { 492 if (!relation.empty) 493 toAdd ~= relation; 494 } 495 496 /// ditto 497 void insert(T)(T relation) if (is(T : TupleT[]) || isInputRange!T && is(ElementType!T == TupleT)) { 498 import std.range : hasLength; 499 500 static if (hasLength!T) { 501 if (relation.length == 0) 502 return; 503 } 504 505 Relation!TupleT rel; 506 static if (TS == ThreadStrategy.parallel) 507 rel.__ctor!(TS)(relation, taskPool); 508 else 509 rel.__ctor!(TS)(relation); 510 511 toAdd ~= rel; 512 } 513 514 /// ditto 515 void insert(T)(T[][] relation) { 516 import std.algorithm : map; 517 518 if (relation.length == 0) 519 return; 520 521 this.insert(relation.map!(a => KVTuple!(T,T)(a[0], a[1]))); 522 } 523 524 /// Consumes the variable and returns a relation. 525 /// 526 /// This method removes the ability for the variable to develop, and 527 /// flattens all internal tuples down to one relation. The method 528 /// asserts that iteration has completed, in that `self.recent` and 529 /// `self.to_add` should both be empty. 530 Relation!TupleT complete() { 531 import std.array : empty; 532 533 assert(recent.empty); 534 assert(toAdd.empty); 535 536 typeof(return) result; 537 foreach (batch; stable) { 538 result.merge(batch); 539 } 540 stable = null; 541 542 return result; 543 } 544 545 override bool changed() { 546 import std.array : popBack, back, appender, empty; 547 548 // 1. Merge self.recent into self.stable. 549 if (!recent.empty) { 550 while (!stable.empty && stable.back.length <= 2 * recent.length) { 551 auto last = stable[$ - 1]; 552 stable.popBack; 553 recent.merge(last); 554 } 555 stable ~= recent; 556 recent.clear; 557 } 558 559 if (toAdd.empty) 560 return false; 561 562 // 2. Move this.toAdd into this.recent. 563 auto to_add = () { auto a = toAdd[$ - 1]; toAdd.popBack; return a; }(); 564 565 // 2b. merge the rest of this.toAdd into to_add 566 foreach (to_add_more; toAdd) { 567 to_add.merge(to_add_more); 568 } 569 toAdd = null; 570 571 // 2c. Restrict `to_add` to tuples not in `self.stable`. 572 if (distinct) { 573 foreach (batch; stable) { 574 auto retained = appender!(TupleT[])(); 575 void fastPath() { 576 foreach (elem; to_add) { 577 import datacat.join : gallop; 578 579 batch = batch.gallop!(y => y < elem); 580 if (batch.length == 0 || batch[0] != elem) 581 retained.put(elem); 582 } 583 } 584 585 void slowPath() { 586 version (unittest) { 587 if (forceFastPath) { 588 fastPath; 589 return; 590 } 591 } 592 593 foreach (elem; to_add) { 594 while (batch.length > 0 && batch[0] < elem) { 595 batch = batch[1 .. $]; 596 } 597 if (batch.length == 0 || batch[0] != elem) 598 retained.put(elem); 599 } 600 } 601 602 if (batch.length > 4 * to_add.length) { 603 fastPath; 604 } else { 605 slowPath; 606 } 607 to_add = retained.data; 608 } 609 } 610 recent = to_add; 611 612 return !recent.empty; 613 } 614 615 /// Returns: total elements in stable. 616 size_t countStable() { 617 import std.algorithm : map, sum; 618 619 return stable.map!"a.length".sum; 620 } 621 622 void toString(Writer)(ref Writer w) if (isOutputRange!(Writer, char)) { 623 import std.format : formattedWrite; 624 625 if (!name.empty) 626 formattedWrite(w, "name:%s ", name); 627 formattedWrite(w, "distinc:%s stable:%s recent:%s toAdd:%s", distinct, 628 stable, recent, toAdd); 629 } 630 } 631 // dfmt on 632 633 /// Create a Variable type with a tuple of the provided types (`Args`). 634 template Variable(Args...) { 635 import std.typecons : Tuple; 636 import std.variant : Variant; 637 638 static if (Args.length == 1) { 639 alias Variable = Variable!(Tuple!(Args[0], "key")); 640 } else static if (Args.length == 2) { 641 alias Variable = Variable!(Tuple!(Args[0], "key", Args[1], "value")); 642 } else { 643 import std.conv : to; 644 645 static assert(0, "1 or 2 parameters required. " ~ Args.length.to!string ~ " provided"); 646 } 647 } 648 649 @("shall complete a variable") 650 unittest { 651 // arrange 652 auto a = new Variable!(int, int); 653 a.insert(relation!(int, int).from([[1, 10], [5, 51]])); 654 a.insert(relation!(int, int).from([[1, 10], [5, 52]])); 655 656 // act 657 while (a.changed) { 658 } 659 660 // assert 661 a.complete.should == [kvTuple(1, 10), kvTuple(5, 51), kvTuple(5, 52)]; 662 } 663 664 @("shall progress a variable by moving newly added to the recent state") 665 unittest { 666 import std.array : empty; 667 668 // arrange 669 auto a = new Variable!(int, int); 670 a.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]])); 671 a.toAdd.empty.should == false; 672 a.recent.empty.should == true; 673 a.stable.empty.should == true; 674 675 // act 676 a.changed.shouldBeTrue; 677 678 // assert 679 a.toAdd.empty.should == true; 680 a.recent.empty.should == false; 681 a.stable.empty.should == true; 682 } 683 684 @("shall progress from toAdd to stable after two `changed`") 685 unittest { 686 import std.array : empty; 687 688 // arrange 689 auto a = new Variable!(int, int); 690 a.insert(relation!(int, int).from([[1, 10], [2, 20], [5, 50]])); 691 692 // act 693 a.changed.shouldBeTrue; 694 a.changed.shouldBeFalse; 695 696 // assert 697 a.toAdd.empty.should == true; 698 a.recent.empty.should == true; 699 a.stable.empty.should == false; 700 } 701 702 @("shall be chunks in stable that have a size about 2x of recent") 703 unittest { 704 import std.algorithm : map, count; 705 import std.range : iota; 706 707 // arrange 708 Iteration iter; 709 auto variable = iter.variable!(int, int)("source"); 710 variable.insert(iota(10).map!(x => kvTuple(x, x + 1))); 711 variable.insert(iota(10).map!(x => kvTuple(x + 1, x))); 712 713 // act 714 while (iter.changed) { 715 variable.fromJoin!((k, v1, v2) => kvTuple(v1, v2))(variable, variable); 716 } 717 718 // assert 719 variable.stable.map!(a => a.count).should == [91, 30]; 720 } 721 722 @("shall produce the same result between the fast and slow path when forcing distinct facts") 723 unittest { 724 import std.algorithm : map, count; 725 import std.range : iota; 726 727 // arrange 728 Iteration iter; 729 auto fast = iter.variable!(int, int)("fast"); 730 fast.forceFastPath = true; 731 auto slow = iter.variable!(int, int)("slow"); 732 foreach (a; [fast, slow]) { 733 a.insert(iota(10).map!(x => kvTuple(x, x + 1))); 734 a.insert(iota(10).map!(x => kvTuple(x + 1, x))); 735 } 736 737 // act 738 while (iter.changed) { 739 static auto helper(T0, T1, T2)(T0 k, T1 v1, T2 v2) { 740 return kvTuple(v1, v2); 741 } 742 743 fast.fromJoin!(helper)(fast, fast); 744 slow.fromJoin!(helper)(slow, slow); 745 } 746 747 // assert 748 fast.complete.should == slow.complete; 749 } 750 751 @("shall produce the same result between the single and multithreaded Iteration") 752 unittest { 753 import std.algorithm : map, count; 754 import std.range : iota; 755 756 // arrange 757 auto iter_s = makeIteration!(ThreadStrategy.single); 758 auto single = iter_s.variable!(int, int)("fast"); 759 single.insert(iota(10).map!(x => kvTuple(x, x + 1))); 760 single.insert(iota(10).map!(x => kvTuple(x + 1, x))); 761 762 auto iter_p = makeIteration!(ThreadStrategy.parallel); 763 auto parallel = iter_p.variable!(int, int)("slow"); 764 parallel.insert(iota(10).map!(x => kvTuple(x, x + 1))); 765 parallel.insert(iota(10).map!(x => kvTuple(x + 1, x))); 766 767 // act 768 static auto helper(T0, T1, T2)(T0 k, T1 v1, T2 v2) { 769 return kvTuple(v1, v2); 770 } 771 772 while (iter_s.changed) 773 single.fromJoin!(helper)(single, single); 774 while (iter_p.changed) 775 parallel.fromJoin!(helper)(parallel, parallel); 776 777 // assert 778 single.complete.should == parallel.complete; 779 } 780 781 @("shall count the elements in the nested arrays in stable") 782 unittest { 783 auto var = new Variable!(int, int); 784 var.stable ~= relation!(int, int)([kvTuple(3, 2), kvTuple(4, 2)]); 785 var.stable ~= relation!(int, int)([kvTuple(3, 2), kvTuple(4, 2)]); 786 787 var.countStable.should == 4; 788 }