1 /**
2 Copyright: Copyright (c) 2018 Frank McSherry
3 License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5
6 Port of DataFrog to D.
7
8 Functionality for joining Variables.
9 */
10 module datacat.join;
11
12 import logger = std.experimental.logger;
13 import std.traits;
14
15 import datacat : Variable, Relation, hasKeyField, hasValueField,
16 hasKeyValueFields;
17
18 // TODO: change Input1T and Input2T to KeyT, Val1T, Val2T.
19 // Add condition that logicFn(ref Key, ref Val1, ref Val2)->Result
20 // Add condition that OutputT!Result, the Result is the same as the return type of logicFn.
/** Joins the tuples of `input1` and `input2` on their key and stores the results in `output`.
 *
 * Three combinations are evaluated, in this order: the recent tuples of
 * `input1` against every stable batch of `input2`, every stable batch of
 * `input1` against the recent tuples of `input2`, and finally recent against
 * recent. Stable batches are never joined with each other.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value1, value2)` for each pair of tuples with an equal key; the returned value becomes one result tuple
 *  input1 = provides `recent`, `stable` and the tuple type `TT`
 *  input2 = provides `recent` and `stable`
 *  output = the result of the cross product between input1 and input2 by applying logicFn
 */
void join(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1, Input2T input2, OutputT output) {
    import std.array : appender;

    // The collected tuples end up in `output`, so use its tuple type when it
    // exposes one; otherwise keep the tuple type of `input1`.
    static if (is(OutputT.TT))
        alias ResultT = OutputT.TT;
    else
        alias ResultT = Input1T.TT;

    auto results = appender!(ResultT[]);

    alias fn = (k, v1, v2) { return results.put(logicFn(k, v1, v2)); };

    auto recent1 = input1.recent;
    auto recent2 = input2.recent;

    // Every join pass works on its own copy of the recent tuples so that a
    // slice advanced during one pass cannot shorten the input of the next.
    foreach (batch; input2.stable) {
        auto lhs = recent1;
        joinHelper!fn(lhs, batch);
    }

    foreach (batch; input1.stable) {
        auto rhs = recent2;
        joinHelper!fn(batch, rhs);
    }

    {
        auto lhs = recent1;
        auto rhs = recent2;
        joinHelper!fn(lhs, rhs);
    }

    output.insert(results.data.Relation!(ResultT));
}
44
/** Moves all recent tuples from `input1` whose key is not present in `input2` into `output`.
 *
 * `input2` is walked forward only, using `gallop`, while the recent tuples of
 * `input1` are visited in order.
 * NOTE(review): this presumably requires both inputs to be sorted by key -
 * confirm against the callers.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value)` for every recent tuple of `input1` without a matching key in `input2`; the returned value becomes one result tuple
 *  input1 = provides `recent` and the tuple type `TT`
 *  input2 = sliced with `[]`; its elements are compared through their `key`
 *  output = receives the collected results through `insert`
 */
void antiJoin(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender, empty;

    // The collected tuples end up in `output`, so use its tuple type when it
    // exposes one; otherwise keep the tuple type of `input1`.
    static if (is(OutputT.TT))
        alias ResultT = OutputT.TT;
    else
        alias ResultT = Input1T.TT;

    auto results = appender!(ResultT[]);
    auto tuples2 = input2[];

    foreach (kv; input1.recent) {
        // Skip everything in input2 with a key smaller than the current one.
        tuples2 = tuples2.gallop!(k => k.key < kv.key);
        // An exhausted `tuples2` cannot contain the key either, so the tuple
        // is kept in that case as well.
        if (tuples2.empty || tuples2[0].key != kv.key)
            results.put(logicFn(kv.key, kv.value));
    }

    output.insert(results.data.Relation!(ResultT));
}
61
// TODO: add constraint for CmpT, Fn(&T)->bool
/** Drops the leading elements of `slice` for which `cmp` is true.
 *
 * The search first doubles its step while `cmp` still holds at the probed
 * position and then halves the step again to close in on the boundary. The
 * result is only meaningful when `cmp` is true for a prefix of `slice` and
 * false for everything after it.
 *
 * Params:
 *  cmp = predicate called with a single element of `slice`
 *  slice = the elements to search; must support `length`, indexing and slicing
 *
 * Returns: `slice` unchanged when it is empty or `cmp` is false for its first
 * element, otherwise the remainder of `slice` after the skipped elements.
 */
SliceT gallop(alias cmp, SliceT)(SliceT slice) {
    // if empty slice, or already >= element, return
    if (slice.length > 0 && cmp(slice[0])) {
        // size_t matches the width of an array length. The step is only
        // doubled after `step < slice.length` held for the remaining slice,
        // so the doubled value never exceeds the original length and cannot
        // overflow.
        size_t step = 1;
        while (step < slice.length && cmp(slice[step])) {
            slice = slice[step .. $];
            step = step << 1;
        }

        // Narrow down with decreasing steps until the boundary is found.
        step = step >> 1;
        while (step > 0) {
            if (step < slice.length && cmp(slice[step])) {
                slice = slice[step .. $];
            }
            step = step >> 1;
        }

        slice = slice[1 .. $]; // advance one, as we always stayed < value
    }

    return slice;
}
87
88 private:
89
/** Calls `logicFn` for every pair of elements from `slice1` and `slice2` that share a key.
 *
 * Both slices are walked forward together: the side with the smaller key is
 * advanced with `gallop`, and for equal keys the cross product of the two
 * runs of matching elements is reported.
 * NOTE(review): this presumably requires both slices to be sorted by key -
 * confirm against the callers.
 *
 * The slices are taken by value, so the caller's variables are left untouched
 * and can be joined again afterwards.
 *
 * Params:
 *  logicFn = call repeatedly with key, value1, value2.
 *  slice1 = elements exposing `key` and `value`; supplies value1
 *  slice2 = elements exposing `key` and `value`; supplies value2
 */
void joinHelper(alias logicFn, Slice1T, Slice2T)(Slice1T slice1, Slice2T slice2) {
    import std.algorithm : count, until;
    // Makes `.empty` available for built-in arrays; a member `empty` on the
    // slice type still takes precedence.
    import std.range.primitives : empty;

    while (!slice1.empty && !slice2.empty) {
        auto e1 = slice1[0];
        auto e2 = slice2[0];

        if (e1.key < e2.key) {
            // slice1 is behind: skip ahead to the first key >= e2.key.
            slice1 = slice1.gallop!((x) { return x.key < e2.key; });
        } else if (e1.key > e2.key) {
            // slice2 is behind: skip ahead to the first key >= e1.key.
            slice2 = slice2.gallop!((x) { return x.key < e1.key; });
        } else {
            // Determine the number of matching keys in each slice.
            const cnt1 = slice1.until!(x => x.key != e1.key).count;
            const cnt2 = slice2.until!(x => x.key != e2.key).count;

            // Produce results from the cross-product of matches.
            foreach (index1; 0 .. cnt1) {
                foreach (index2; 0 .. cnt2) {
                    logicFn(e1.key, slice1[index1].value, slice2[index2].value);
                }
            }

            // Advance slices past this key.
            slice1 = slice1[cnt1 .. $];
            slice2 = slice2[cnt2 .. $];
        }
    }
}