1 /**
2 Copyright: Copyright (c) 2018 Frank McSherry
3 License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Port of DataFrog to D.
7 
8 Functionality for joining Variables.
9 */
10 module datacat.join;
11 
12 import logger = std.experimental.logger;
13 import std.traits;
14 
15 import datacat : Variable, Relation, hasKeyField, hasValueField,
16     hasKeyValueFields;
17 
// TODO: change Input1T and Input2T to KeyT, Val1T, Val2T.
// Add condition that logicFn(ref Key, ref Val1, ref Val2)->Result
// Add condition that the return type of logicFn is OutputT.TT.
/** Join `input1` and `input2` on their keys and insert the results of
 * `logicFn` into `output`.
 *
 * Only combinations that involve at least one recent tuple are produced:
 * recent1 x stable2, stable1 x recent2 and recent1 x recent2.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value1, value2)` for every pair of tuples
 *      (one from each input) that share a key; its return value is collected.
 *  input1 = left input; its `recent` and `stable` tuples are read.
 *  input2 = right input; its `recent` and `stable` tuples are read.
 *  output = receives all collected results, inserted as one `Relation`.
 */
void join(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1, Input2T input2, OutputT output) {
    import std.array : appender;

    auto results = appender!(OutputT.TT[]);

    alias fn = (k, v1, v2) { return results.put(logicFn(k, v1, v2)); };

    // `joinHelper` may advance the slices it is handed (it used to take them
    // by `ref`), so every call gets its own copy of the recent tuples.
    // Otherwise later batches would only see what the previous call left over.
    auto recent1 = input1.recent;
    foreach (batch; input2.stable) {
        auto r1 = recent1;
        joinHelper!fn(r1, batch);
    }

    auto recent2 = input2.recent;
    foreach (batch; input1.stable) {
        auto r2 = recent2;
        joinHelper!fn(batch, r2);
    }

    // Last use of both; they may be consumed here.
    joinHelper!fn(recent1, recent2);

    output.insert(results.data.Relation!(OutputT.TT));
}
44 
/** Insert into `output` the result of `logicFn` for every recent tuple of
 * `input1` whose key does not occur in `input2`.
 *
 * `input1` itself is not modified.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value)` for each surviving tuple; its
 *      return value is collected.
 *  input1 = the recent tuples of this input are filtered.
 *  input2 = tuples whose `key`s are excluded.
 *  output = receives all collected results, inserted as one `Relation`.
 *
 * Assumes `input1.recent` and `input2` are sorted by key. The forward-only
 * `gallop` search relies on it.
 */
void antiJoin(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender, empty;

    auto results = appender!(OutputT.TT[]);
    auto tuples2 = input2[];

    foreach (kv; input1.recent) {
        // Skip every excluded key smaller than the current one.
        tuples2 = tuples2.gallop!(k => k.key < kv.key);
        // Keep the tuple unless the next excluded key equals it. An exhausted
        // `tuples2` means no remaining excluded key can match.
        if (tuples2.empty || tuples2[0].key != kv.key)
            results.put(logicFn(kv.key, kv.value));
    }

    output.insert(results.data.Relation!(OutputT.TT));
}
61 
// TODO: constrain `cmp` to a unary predicate over the elements of SliceT.
/** Skip the leading elements of `slice` for which `cmp` holds.
 *
 * The search first doubles its step (exponential search) and then halves it
 * (binary search). The number of `cmp` calls is therefore logarithmic in the
 * number of skipped elements.
 *
 * Params:
 *  cmp = unary predicate. It must be true for a (possibly empty) prefix of
 *      `slice` and false for the rest, e.g. `x => x.key < needle` on a
 *      key-sorted slice.
 *  slice = the sliceable range to search.
 *
 * Returns: the suffix of `slice` that starts at the first element for which
 *  `cmp` is false, or an empty slice if `cmp` holds for every element.
 */
SliceT gallop(alias cmp, SliceT)(SliceT slice) {
    // if empty slice, or already >= element, return
    if (slice.length > 0 && cmp(slice[0])) {
        // size_t, not int: an int step would overflow (and turn negative) on
        // slices longer than int.max elements.
        size_t step = 1;
        while (step < slice.length && cmp(slice[step])) {
            slice = slice[step .. $];
            step <<= 1;
        }

        step >>= 1;
        while (step > 0) {
            if (step < slice.length && cmp(slice[step]))
                slice = slice[step .. $];
            step >>= 1;
        }

        slice = slice[1 .. $]; // advance one, as we always stayed < value
    }

    return slice;
}
87 
88 private:
89 
/**
 * Merge-join two key-sorted slices, calling `logicFn` for every pair of
 * tuples that share a key.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value1, value2)` once for each element of
 *      the cross product of the tuples in `slice1` and `slice2` with equal keys.
 *  slice1 = tuples with `key`/`value` fields, assumed sorted by key.
 *  slice2 = tuples with `key`/`value` fields, assumed sorted by key.
 *
 * The slices are taken by value. The helper advances its own copies while
 * merging, so a caller can reuse the same slice against several batches.
 */
void joinHelper(alias logicFn, Slice1T, Slice2T)(Slice1T slice1, Slice2T slice2) {
    import std.algorithm : count, until;
    import std.array : empty;

    while (!slice1.empty && !slice2.empty) {
        auto e1 = slice1[0];
        auto e2 = slice2[0];

        if (e1.key < e2.key) {
            // Skip every tuple of slice1 whose key is below e2.key.
            slice1 = slice1.gallop!((x) { return x.key < e2.key; });
        } else if (e1.key > e2.key) {
            // Skip every tuple of slice2 whose key is below e1.key.
            slice2 = slice2.gallop!((x) { return x.key < e1.key; });
        } else {
            // Determine the number of matching keys in each slice.
            const cnt1 = slice1.until!(x => x.key != e1.key).count;
            const cnt2 = slice2.until!(x => x.key != e2.key).count;

            // Produce results from the cross-product of matches.
            foreach (index1; 0 .. cnt1) {
                foreach (index2; 0 .. cnt2) {
                    logicFn(e1.key, slice1[index1].value, slice2[index2].value);
                }
            }

            // Advance slices past this key.
            slice1 = slice1[cnt1 .. $];
            slice2 = slice2[cnt2 .. $];
        }
    }
}