/**
Copyright: Copyright (c) 2018 Frank McSherry
License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Port of DataFrog to D.

Functionality for joining Variables.
*/
module datacat.join;

import logger = std.experimental.logger;
import std.traits : hasMember;

version (unittest) {
    import unit_threaded;
    import datacat : Relation, Iteration, kvTuple, relation;
}

// TODO: change Input1T and Input2T to KeyT, Val1T, Val2T.
// Add condition that logicFn(ref Key, ref Val1, ref Val2)->Result
// Add condition that OutputT!Result, the Result is the same as the return type of logicFn.
/** Join `input1` and `input2` on their keys and insert the results in `output`.
 *
 * Three passes are made so that every produced combination involves at least
 * one recent tuple:
 *  - the recent tuples of `input1` against every stable batch of `input2`,
 *  - every stable batch of `input1` against the recent tuples of `input2`,
 *  - the recent tuples of `input1` against the recent tuples of `input2`.
 *
 * For every pair of tuples with equal keys `logicFn(key, value1, value2)` is
 * called and its return value is collected. All collected values are handed to
 * `output.insert` in one call at the end.
 *
 * NOTE(review): earlier documentation stated that the task pool of `output` is
 * used when the `ThreadStrategy` is parallel. Nothing in this function does
 * that; confirm whether it is still intended.
 *
 * Params:
 *  logicFn = callable taking (key, value1, value2) and returning an `OutputT.TT`
 *  input1 = left side; must provide `recent` and an iterable `stable`
 *  input2 = right side; must provide `recent` and an iterable `stable`
 *  output = the result of the join; must provide `TT` and `insert`
 */
private void join(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender;

    auto results = appender!(OutputT.TT[]);

    alias fn = (k, v1, v2) { return results.put(logicFn(k, v1, v2)); };

    // `joinHelper` receives its arguments by value, so `recent1` and `recent2`
    // stay intact and can be reused for every pass below.
    auto recent1 = input1.recent;
    foreach (batch; input2.stable)
        joinHelper!fn(recent1, batch);

    auto recent2 = input2.recent;
    foreach (batch; input1.stable)
        joinHelper!fn(batch, recent2);

    joinHelper!fn(recent1, recent2);

    output.insert(results.data);
}

/** Adds tuples that result from joining `input1` and `input2`.
 *
 * The single template argument is the logic function, called as
 * `fn(key, value1, value2)` for every pair of tuples with equal keys. The
 * results are inserted into `self`.
 */
template fromJoin(Args...) if (Args.length == 1) {
    auto fromJoin(Self, I1, I2)(Self self, I1 i1, I2 i2) {
        import std.functional : unaryFun;

        alias fn_ = unaryFun!(Args[0]);
        return join!(fn_)(i1, i2, self);
    }
}

/**
 * This example starts a collection with the pairs (x, x+1) and (x+1, x) for x in 0 .. 3.
 * It then adds pairs (y, z) for which (x, y) and (x, z) are present. Because the initial
 * pairs are symmetric, this should result in all pairs (x, y) for x and y in 0 .. 4.
 */
@("shall join two variables to produce all pairs (x,y) in the provided range")
unittest {
    import std.algorithm : map;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert(iota(3).map!(x => kvTuple(x, x + 1)));
    // [[0,1],[1,2],[2,3],]
    variable.insert(iota(3).map!(x => kvTuple(x + 1, x)));
    // [[1,0],[2,1],[3,2],]

    // act
    while (iter.changed) {
        variable.fromJoin!((k, v1, v2) => kvTuple(v1, v2))(variable, variable);
    }

    auto result = variable.complete;

    // assert
    result.should == [[0, 0], [0, 1], [0, 2], [0, 3], [1, 0], [1, 1], [1, 2],
        [1, 3], [2, 0], [2, 1], [2, 2], [2, 3], [3, 0], [3, 1], [3, 2], [3, 3]].map!(
            a => kvTuple(a[0], a[1]));
}

/**
 * Regression: the pairs (20, 30) and (30, 20) can only be derived by joining
 * the tuples (10, 20) and (10, 30) with each other, and both of those are
 * produced in the same round. The join of recent tuples against recent tuples
 * must therefore still see all recent tuples after the joins against the
 * stable batches have been done.
 */
@("shall join recent tuples with each other after joining them against stable batches")
unittest {
    import std.algorithm : map;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert([[1, 10], [1, 20], [2, 10], [2, 30], [40, 99]].map!(
            a => kvTuple(a[0], a[1])));

    // act
    while (iter.changed) {
        variable.fromJoin!((k, v1, v2) => kvTuple(v1, v2))(variable, variable);
    }

    auto result = variable.complete;

    // assert
    result.should == [[1, 10], [1, 20], [2, 10], [2, 30], [10, 10], [10, 20],
        [10, 30], [20, 10], [20, 20], [20, 30], [30, 10], [30, 20], [30, 30],
        [40, 99], [99, 99]].map!(a => kvTuple(a[0], a[1]));
}

/** Applies `logicFn` to all recent tuples from `input1` whose key is not present in
 * `input2` and inserts the results into `output`.
 *
 * `input2[]` is walked forward with `gallop` while iterating `input1.recent`,
 * which relies on both being ordered by key.
 * (assumes both are sorted ascending by key - TODO confirm against the callers)
 *
 * A tuple is kept both when `input2` holds a different key at the current
 * position and when `input2` has been exhausted; in the latter case every
 * remaining key of `input1` is greater than all keys in `input2`.
 *
 * NOTE(review): earlier documentation stated that the task pool of `output` is
 * used when the `ThreadStrategy` is parallel. Nothing in this function does
 * that; confirm whether it is still intended.
 *
 * Params:
 *  logicFn = callable taking (key, value) and returning an `OutputT.TT`
 *  input1 = source of the tuples; must provide `recent`
 *  input2 = the keys to exclude; must support `input2[]` with elements having `key`
 *  output = the result of the join; must provide `TT` and `insert`
 */
private void antiJoin(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender, empty;

    auto results = appender!(OutputT.TT[]);
    auto tuples2 = input2[];

    foreach (kv; input1.recent) {
        // skip every key in input2 that is smaller than the current key.
        tuples2 = tuples2.gallop!(k => k.key < kv.key);
        // an exhausted tuples2 means that the key can't be present in input2.
        if (tuples2.empty || tuples2[0].key != kv.key)
            results.put(logicFn(kv.key, kv.value));
    }

    output.insert(results.data);
}

/** Adds tuples from `input1` whose key is not present in `input2`.
 *
 * The single template argument is the logic function, called as
 * `fn(key, value)` for every recent tuple of `input1` that has no matching key
 * in `input2`. The results are inserted into `self`.
 */
template fromAntiJoin(Args...) if (Args.length == 1) {
    auto fromAntiJoin(Self, I1, I2)(Self self, I1 i1, I2 i2) {
        import std.functional : unaryFun;

        alias fn_ = unaryFun!(Args[0]);
        return antiJoin!(fn_)(i1, i2, self);
    }
}

/**
 * This example starts a collection with the pairs (x, x+1) for x in 0 .. 10. It then
 * adds any pairs (x+1,x) for which x is not a multiple of three. That excludes four
 * pairs (for 0, 3, 6, and 9) which should leave us with 16 total pairs.
 */
@("shall anti-join two variables to produce only those pairs that are not multiples of 3")
unittest {
    import std.algorithm : map, filter;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert(iota(10).map!(x => kvTuple(x, x + 1)));
    auto relation_ = relation!(int).from(iota(10).filter!(x => x % 3 == 0)
            .map!kvTuple);

    // act
    while (iter.changed) {
        variable.fromAntiJoin!((k, v) => kvTuple(v, k))(variable, relation_);
    }

    auto result = variable.complete;

    // assert
    result.should == relation!(int, int).from([[0, 1], [1, 2], [2, 1], [2, 3],
            [3, 2], [3, 4], [4, 5], [5, 4], [5, 6], [6, 5], [6, 7], [7, 8], [8,
            7], [8, 9], [9, 8], [9, 10],]);
    result.length.should == 16;
}

/**
 * Regression: the relation only contains the key 0. The tuples with key 1 and 2
 * are greater than every key in the relation and must still be anti-joined.
 */
@("shall anti-join tuples whose key is greater than every key in the relation")
unittest {
    import std.algorithm : map;
    import std.range : iota;

    // arrange
    Iteration iter;
    auto variable = iter.variable!(int, int)("source");
    variable.insert(iota(3).map!(x => kvTuple(x, x + 1)));
    auto relation_ = relation!(int).from(iota(1).map!kvTuple);

    // act
    while (iter.changed) {
        variable.fromAntiJoin!((k, v) => kvTuple(v, k))(variable, relation_);
    }

    auto result = variable.complete;

    // assert
    result.should == relation!(int, int).from([[0, 1], [1, 2], [2, 1], [2, 3], [3, 2]]);
    result.length.should == 5;
}

// TODO: add constraint for CmpT, Fn(&T)->bool
/** Skip the leading elements of `slice` for which `cmp` is true.
 *
 * The search first advances with exponentially growing steps for as long as
 * `cmp` holds at the probed position and then narrows down with shrinking
 * steps. This assumes that `cmp` is true for a prefix of `slice` and false for
 * the rest, e.g. `x => x < value` on a sorted slice.
 *
 * Params:
 *  cmp = predicate called with one element, returns true for elements to skip
 *  slice = the slice to search; must support `length`, indexing and slicing
 *
 * Returns: the remainder of `slice` after the skipped elements. `slice` is
 * returned as is when it is empty or `cmp` is false for its first element.
 */
SliceT gallop(alias cmp, SliceT)(SliceT slice) {
    // if empty slice, or already >= element, return
    if (slice.length > 0 && cmp(slice[0])) {
        // size_t, the type of the length it is compared against. A signed
        // 32-bit step turns negative when doubled past 2^30, which stopped the
        // search early on very long slices.
        size_t step = 1;
        while (step < slice.length && cmp(slice[step])) {
            slice = slice[step .. $];
            step <<= 1;
        }

        step >>= 1;
        while (step > 0) {
            if (step < slice.length && cmp(slice[step])) {
                slice = slice[step .. $];
            }
            step >>= 1;
        }

        slice = slice[1 .. $]; // advance one, as we always stayed < value
    }

    return slice;
}

private:

/** Call `logicFn` for every pair of elements in `slice1` and `slice2` that have equal keys.
 *
 * Both slices are walked from the front; whichever has the smaller key at its
 * front is advanced with `gallop` until the keys match.
 * (assumes both slices are sorted ascending by key - TODO confirm against the callers)
 *
 * The slices are taken by value on purpose. They are used as cursors and are
 * advanced until one of them is empty, which must not be visible to a caller
 * that passes the same slice to more than one call.
 *
 * Params:
 *  logicFn = call repeatedly with key, value1, value2.
 *  slice1 = left side of the join
 *  slice2 = right side of the join
 */
void joinHelper(alias logicFn, Slice1T, Slice2T)(Slice1T slice1, Slice2T slice2) {
    import std.algorithm : count, until;
    // only used when the slice type has no `empty` member, e.g. built-in arrays.
    import std.range.primitives : empty;

    while (!slice1.empty && !slice2.empty) {
        auto e1 = slice1[0];
        auto e2 = slice2[0];

        if (e1.key < e2.key) {
            slice1 = slice1.gallop!((x) { return x.key < e2.key; });
        } else if (e1.key > e2.key) {
            slice2 = slice2.gallop!((x) { return x.key < e1.key; });
        } else {
            // Determine the number of matching keys in each slice.
            const cnt1 = slice1.until!(x => x.key != e1.key).count;
            const cnt2 = slice2.until!(x => x.key != e2.key).count;

            // Produce results from the cross-product of matches.
            foreach (index1; 0 .. cnt1) {
                foreach (index2; 0 .. cnt2) {
                    logicFn(e1.key, slice1[index1].value, slice2[index2].value);
                }
            }

            // Advance slices past this key.
            slice1 = slice1[cnt1 .. $];
            slice2 = slice2[cnt2 .. $];
        }
    }
}