/**
Copyright: Copyright (c) 2018 Frank McSherry
License: MIT
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Port of DataFrog to D.

Functionality for joining Variables.
*/
module datacat.join;

import logger = std.experimental.logger;
import std.traits;

import datacat : Variable, Relation, hasKeyField, hasValueField,
    hasKeyValueFields;

// TODO: change Input1T and Input2T to KeyT, Val1T, Val2T.
// Add condition that logicFn(ref Key, ref Val1, ref Val2)->Result
// Add condition that OutputT!Result, the Result is the same as the return type of logicFn.
/** Join the tuples of `input1` and `input2` that share a key and insert
 * `logicFn(key, value1, value2)` for every matching pair into `output`.
 *
 * Only pairs involving at least one recent tuple are produced. These are:
 * - recent1 joined with each stable batch of input2,
 * - each stable batch of input1 joined with recent2,
 * - recent1 joined with recent2.
 *
 * Stable tuples are never joined with other stable tuples.
 *
 * Params:
 *  logicFn = called as `logicFn(key, value1, value2)`; each result is
 *      collected as an `Input1T.TT`.
 *  input1 = left-hand input; its `recent` and `stable` tuples are read.
 *  input2 = right-hand input; its `recent` and `stable` tuples are read.
 *  output = receives all produced results as one `Relation!(Input1T.TT)`.
 */
void join(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1, Input2T input2, OutputT output) {
    import std.array : appender;

    auto results = appender!(Input1T.TT[]);

    alias fn = (k, v1, v2) { return results.put(logicFn(k, v1, v2)); };

    // Each joinHelper call gets its own copy of the recent slice. Advancing the
    // slice while merging one batch must not affect the next batch. Otherwise
    // keys already passed over would never be matched against later batches.
    auto recent1 = input1.recent;
    foreach (batch; input2.stable) {
        auto r1 = recent1;
        joinHelper!fn(r1, batch);
    }

    auto recent2 = input2.recent;
    foreach (batch; input1.stable) {
        auto r2 = recent2;
        joinHelper!fn(batch, r2);
    }

    // Last use of recent1/recent2, so no copies are needed here.
    joinHelper!fn(recent1, recent2);

    output.insert(results.data.Relation!(Input1T.TT));
}

/** For every recent tuple of `input1` whose key is not present in `input2`,
 * insert `logicFn(key, value)` into `output`.
 */
void antiJoin(alias logicFn, Input1T, Input2T, OutputT)(Input1T input1,
        Input2T input2, OutputT output) {
    import std.array : appender, empty;

    auto results = appender!(Input1T.TT[]);
    auto tuples2 = input2[];

    foreach (kv; input1.recent) {
        // The cursor into input2 only moves forward. That is only correct if
        // input1.recent and input2 are both sorted by key. This is presumably
        // guaranteed by the callers' types; verify.
        tuples2 = tuples2.gallop!(k => k.key < kv.key);
        // The key is absent when input2 is exhausted OR its next key differs.
        // An exhausted input2 must still emit the tuple.
        if (tuples2.empty || tuples2[0].key != kv.key)
            results.put(logicFn(kv.key, kv.value));
    }

    output.insert(results.data.Relation!(Input1T.TT));
}

// TODO: add constraint for CmpT, Fn(&T)->bool
/** Exponential ("galloping") search. Skips the leading elements of `slice`
 * for which `cmp` returns true.
 *
 * `cmp` must be monotone over `slice`: true for some prefix and false after
 * it. An example is `x => x.key < target` on a slice sorted by key.
 *
 * Params:
 *  cmp = unary predicate applied to elements of `slice`.
 *  slice = the slice to search.
 *
 * Returns: the suffix of `slice` starting at the first element for which
 * `cmp` is false. It is empty if there is no such element, and equals
 * `slice` if `slice` is empty or `cmp(slice[0])` is false.
 */
SliceT gallop(alias cmp, SliceT)(SliceT slice) {
    // if empty slice, or already >= element, return
    if (slice.length > 0 && cmp(slice[0])) {
        // size_t matches slice.length, so doubling cannot wrap the way an
        // int does for slices longer than int.max / 2.
        size_t step = 1;
        while (step < slice.length && cmp(slice[step])) {
            slice = slice[step .. $];
            step <<= 1;
        }

        // Binary-search back down with halving steps.
        step >>= 1;
        while (step > 0) {
            if (step < slice.length && cmp(slice[step])) {
                slice = slice[step .. $];
            }
            step >>= 1;
        }

        slice = slice[1 .. $]; // advance one, as we always stayed < value
    }

    return slice;
}

private:

/** Merge-join two key-sorted slices and call `logicFn` for every pair of
 * elements that share a key.
 *
 * The slices are taken by value. The merge advances local copies only, so
 * the caller's slices are left untouched and can be reused against other
 * batches.
 *
 * Params:
 *  logicFn = called repeatedly as `logicFn(key, value1, value2)`.
 *  slice1 = left input; assumed sorted by `key`.
 *  slice2 = right input; assumed sorted by `key`.
 */
void joinHelper(alias logicFn, Slice1T, Slice2T)(Slice1T slice1, Slice2T slice2) {
    import std.algorithm : count, until;
    import std.range.primitives : empty;

    while (!slice1.empty && !slice2.empty) {
        auto e1 = slice1[0];
        auto e2 = slice2[0];

        if (e1.key < e2.key) {
            // Skip everything in slice1 whose key is below e2's key.
            slice1 = slice1.gallop!((x) { return x.key < e2.key; });
        } else if (e1.key > e2.key) {
            // Skip everything in slice2 whose key is below e1's key.
            slice2 = slice2.gallop!((x) { return x.key < e1.key; });
        } else {
            // Determine the number of matching keys in each slice.
            const cnt1 = slice1.until!(x => x.key != e1.key).count;
            const cnt2 = slice2.until!(x => x.key != e2.key).count;

            // Produce results from the cross-product of matches.
            foreach (index1; 0 .. cnt1) {
                foreach (index2; 0 .. cnt2) {
                    logicFn(e1.key, slice1[index1].value, slice2[index2].value);
                }
            }

            // Advance slices past this key.
            slice1 = slice1[cnt1 .. $];
            slice2 = slice2[cnt2 .. $];
        }
    }
}