1 /**
2 Copyright: Copyright (c) 2018 Frank McSherry
3 License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Port of DataFrog to D.
7 
8 Functionality for joining Variables.
9 */
10 module datacat.join;
11 
12 import logger = std.experimental.logger;
13 import std.traits : hasMember;
14 
15 version (unittest) {
16     import unit_threaded;
17     import datacat : Relation, Iteration, kvTuple, relation;
18 }
19 
20 // TODO: change Input1T and Input2T to KeyT, Val1T, Val2T.
21 // Add condition that logicFn(ref Key, ref Val1, ref Val2)->Result
22 // Add condition that OutputT!Result, the Result is the same as the return type of logicFn.
/** Join `input1` with `input2` on their keys and insert the results into `output`.
 *
 * For every pair of tuples with equal keys `logicFn(key, value1, value2)` is
 * called and its return value is collected. Three combinations are joined:
 * the recent tuples of `input1` with every stable batch of `input2`, every
 * stable batch of `input1` with the recent tuples of `input2`, and the two
 * recent sets with each other. Stable batches are never joined with each other.
 *
 * NOTE(review): earlier documentation mentioned a task pool and a
 * `ThreadStrategy`; nothing in this function uses either.
 *
 * Params:
 *  logicFn = callable producing one `OutputT.TT` from (key, value1, value2)
 *  input1 = left side of the join; must provide `recent` and `stable`
 *  input2 = right side of the join; must provide `recent` and `stable`
 *  output = the result of the join; receives all collected tuples via `insert`
 */
private void join(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender;

    auto results = appender!(OutputT.TT[]);

    alias fn = (k, v1, v2) { return results.put(logicFn(k, v1, v2)); };

    // `joinHelper` may advance the slices it is handed. Each call therefore
    // gets its own scratch copy of the recent tuples, otherwise the first
    // batch would consume them and later batches would join against nothing.
    auto recent1 = input1.recent;
    foreach (batch; input2.stable) {
        auto scratch1 = recent1;
        joinHelper!fn(scratch1, batch);
    }

    auto recent2 = input2.recent;
    foreach (batch; input1.stable) {
        auto scratch2 = recent2;
        joinHelper!fn(batch, scratch2);
    }

    // Last use of both recent slices, so they can be handed over directly.
    joinHelper!fn(recent1, recent2);

    output.insert(results.data);
}
50 
/** Joins `i1` with `i2` and inserts the produced tuples into `self`.
 *
 * `Args[0]` is the callable that turns each match (key, value1, value2) into
 * the tuple that is added to `self`.
 */
template fromJoin(Args...) if (Args.length == 1) {
    auto fromJoin(Self, I1, I2)(Self self, I1 i1, I2 i2) {
        import std.functional : unaryFun;

        // The user supplied callable, routed through `unaryFun`.
        alias combine = unaryFun!(Args[0]);

        return join!combine(i1, i2, self);
    }
}
61 
/**
 * This example starts a collection with the pairs (x, x+1) and (x+1, x) for x in 0 .. 3.
 * It then adds pairs (y, z) for which (x, y) and (x, z) are present. Because the initial
 * pairs are symmetric, this should result in all pairs (x, y) for x and y in 0 .. 4.
 */
@("shall join two variables to produce all pairs (x,y) in the provided range")
unittest {
    import std.algorithm : map;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert(iota(3).map!(x => kvTuple(x, x + 1)));
    // [[0,1],[1,2],[2,3],]
    variable.insert(iota(3).map!(x => kvTuple(x + 1, x)));
    // [[1,0],[2,1],[3,2],]

    // act
    // Self-join the variable on its key until `iter.changed` reports false.
    while (iter.changed) {
        variable.fromJoin!((k, v1, v2) => kvTuple(v1, v2))(variable, variable);
    }

    auto result = variable.complete;

    // assert
    // Every combination of 0 .. 4 with itself: 16 pairs.
    result.should == [[0, 0], [0, 1], [0, 2], [0, 3], [1, 0], [1, 1], [1, 2],
        [1, 3], [2, 0], [2, 1], [2, 2], [2, 3], [3, 0], [3, 1], [3, 2], [3, 3]].map!(
            a => kvTuple(a[0], a[1]));
}
92 
/** Insert into `output` one tuple for every recent tuple of `input1` whose key is absent from `input2`.
 *
 * Each surviving tuple is transformed with `logicFn(key, value)` before it is
 * collected. `input2` is walked forward with `gallop` while iterating over
 * `input1.recent`, so both are expected to be ordered by key
 * (TODO confirm against the `Relation`/`Variable` implementation).
 *
 * NOTE(review): earlier documentation mentioned a task pool and a
 * `ThreadStrategy`; nothing in this function uses either.
 *
 * Params:
 *  logicFn = callable producing one `OutputT.TT` from (key, value)
 *  input1 = source of the candidate tuples; must provide `recent`
 *  input2 = the keys to exclude; must support `input2[]`
 *  output = the result of the join; receives all collected tuples via `insert`
 */
private void antiJoin(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender, empty;

    auto results = appender!(OutputT.TT[]);
    auto tuples2 = input2[];

    foreach (kv; input1.recent) {
        // Skip everything in `input2` that sorts before the current key.
        tuples2 = tuples2.gallop!(k => k.key < kv.key);

        // The key is absent when `input2` is exhausted or its next key differs.
        // An exhausted `input2` must count as "absent", otherwise keys larger
        // than every key in `input2` are dropped.
        if (tuples2.empty || tuples2[0].key != kv.key)
            results.put(logicFn(kv.key, kv.value));
    }

    output.insert(results.data);
}
115 
/** Anti-joins `i1` against `i2` and inserts the produced tuples into `self`.
 *
 * `Args[0]` is the callable that turns each kept (key, value) of `i1` into
 * the tuple that is added to `self`.
 */
template fromAntiJoin(Args...) if (Args.length == 1) {
    auto fromAntiJoin(Self, I1, I2)(Self self, I1 i1, I2 i2) {
        import std.functional : unaryFun;

        // The user supplied callable, routed through `unaryFun`.
        alias transform = unaryFun!(Args[0]);

        return antiJoin!transform(i1, i2, self);
    }
}
126 
/**
 * This example starts a collection with the pairs (x, x+1) for x in 0 .. 10. It then
 * adds the swapped pair (x+1, x) for every x that is not a multiple of three, i.e.
 * whose key is absent from `relation_`. That excludes four pairs (for 0, 3, 6, and 9)
 * which should leave us with 16 total pairs.
 */
@("shall anti-join two variables to produce only those pairs that are not multiples of 3")
unittest {
    import std.algorithm : map, filter;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    // Keys to exclude: the multiples of three in 0 .. 10.
    auto relation_ = relation!(int).from(iota(10).filter!(x => x % 3 == 0)
            .map!kvTuple);

    // act
    // Repeat the anti-join until `iter.changed` reports false.
    while (iter.changed) {
        variable.fromAntiJoin!((k, v) => kvTuple(v, k))(variable, relation_);
    }

    auto result = variable.complete;

    // assert
    // The 10 initial pairs plus 6 swapped pairs.
    result.should == relation!(int, int).from([[0, 1], [1, 2], [2, 1], [2, 3],
            [3, 2], [3, 4], [4, 5], [5, 4], [5, 6], [6, 5], [6, 7], [7, 8], [8,
            7], [8, 9], [9, 8], [9, 10],]);
    //.map!(a => kvTuple(a[0], a[1]));
    result.length.should == 16;
}
158 
// TODO: add constraint for CmpT, Fn(&T)->bool
/** Skip the leading run of elements in `slice` that satisfy `cmp`.
 *
 * The search first advances with doubling steps and then narrows down with
 * halving steps. This only finds the right position when `cmp` holds for a
 * prefix of `slice` and for nothing after it, e.g. a sorted slice combined
 * with a "less than" predicate.
 *
 * Params:
 *  cmp = predicate applied to single elements of `slice`
 *  slice = value supporting `length`, indexing and `[a .. $]` slicing
 *
 * Returns: the remainder of `slice` starting at the first element for which
 * `cmp` is false. `slice` is returned unchanged when it is empty or when
 * `cmp(slice[0])` is false.
 */
SliceT gallop(alias cmp, SliceT)(SliceT slice) {
    // Nothing to skip: empty slice, or the first element is already past the run.
    if (slice.length == 0 || !cmp(slice[0]))
        return slice;

    // `size_t` instead of `int`: a signed 32-bit step wraps negative when the
    // slice holds more than 2^30 elements, which ends the doubling early and
    // skips the narrowing phase entirely.
    size_t step = 1;

    // Gallop forward with doubling steps while the probed element still matches.
    while (step < slice.length && cmp(slice[step])) {
        slice = slice[step .. $];
        step <<= 1;
    }

    // Narrow down with halving steps; slice[0] satisfies `cmp` throughout.
    step >>= 1;
    while (step > 0) {
        if (step < slice.length && cmp(slice[step]))
            slice = slice[step .. $];
        step >>= 1;
    }

    slice = slice[1 .. $]; // advance one, as we always stayed < value
    return slice;
}
184 
185 private:
186 
/** Walk two slices in lock-step and call `logicFn` for every pair of elements with equal keys.
 *
 * Runs of non-matching keys are skipped with `gallop`, so both slices are
 * expected to be ordered by key (TODO confirm against the `Relation`
 * implementation). For a key present in both slices `logicFn` is called for
 * the full cross-product of the values stored under that key.
 *
 * The slices are taken by value on purpose: the function advances its own
 * copies while it works, and a caller that reuses the same slice for several
 * calls must still see it unconsumed afterwards.
 *
 * Params:
 *  logicFn = call repeatedly with key, value1, value2.
 *  slice1 = left side; elements must provide `key` and `value`
 *  slice2 = right side; elements must provide `key` and `value`
 */
void joinHelper(alias logicFn, Slice1T, Slice2T)(Slice1T slice1, Slice2T slice2) {
    import std.algorithm : count, until;

    while (!slice1.empty && !slice2.empty) {
        auto e1 = slice1[0];
        auto e2 = slice2[0];

        if (e1.key < e2.key) {
            // slice1 is behind: skip ahead to the first key that is not smaller.
            slice1 = slice1.gallop!((x) { return x.key < e2.key; });
        } else if (e1.key > e2.key) {
            // slice2 is behind: skip ahead to the first key that is not smaller.
            slice2 = slice2.gallop!((x) { return x.key < e1.key; });
        } else {
            // Determine the number of matching keys in each slice.
            const cnt1 = slice1.until!(x => x.key != e1.key).count;
            const cnt2 = slice2.until!(x => x.key != e2.key).count;

            // Produce results from the cross-product of matches.
            foreach (index1; 0 .. cnt1) {
                foreach (index2; 0 .. cnt2) {
                    logicFn(e1.key, slice1[index1].value, slice2[index2].value);
                }
            }

            // Advance slices past this key.
            slice1 = slice1[cnt1 .. $];
            slice2 = slice2[cnt2 .. $];
        }
    }
}