blob: 94270af777daa3d2031d2b5768a8f5ff4f5a3a5f [file] [log] [blame] [edit]
//! Join functionality.
use super::{Relation, Variable};
use std::cell::Ref;
use std::ops::Deref;
/// Implements `join`: pairs up tuples of `input1` and `input2` that share a
/// key, maps each pair through `logic`, and inserts the results into `output`.
///
/// `input1` has to be a variable, whereas `input2` may be a variable or a
/// relation. A relation never has "recent" tuples, so calling this with two
/// relations would be guaranteed to do nothing; use `join_into_relation`
/// for that case.
///
/// Three combinations are joined, in this order:
/// 1. recent tuples of `input1` with every stable batch of `input2`,
/// 2. every stable batch of `input1` with the recent tuples of `input2`,
/// 3. recent tuples of `input1` with recent tuples of `input2`.
///
/// Stable-with-stable pairs are deliberately not visited here.
pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
    input1: &Variable<(Key, Val1)>,
    input2: impl JoinInput<'me, (Key, Val2)>,
    output: &Variable<Result>,
    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
) {
    let recent1 = input1.recent();
    let recent2 = input2.recent();
    let mut joined = Vec::new();

    {
        // Inner scope: `emit` mutably borrows `joined`, and that borrow has
        // to end before `joined` is handed over to `Relation::from_vec`.
        let mut emit = |key: &Key, left: &Val1, right: &Val2| {
            let tuple = logic(key, left, right);
            joined.push(tuple);
        };

        input2
            .stable()
            .iter()
            .for_each(|stable2| join_helper(&recent1, &stable2, &mut emit));

        input1
            .stable()
            .iter()
            .for_each(|stable1| join_helper(&stable1, &recent2, &mut emit));

        join_helper(&recent1, &recent2, &mut emit);
    }

    output.insert(Relation::from_vec(joined));
}
/// Joins two relations on their shared key and returns the result as a new
/// relation.
///
/// Every pair of tuples with equal keys is passed to `logic`; the values it
/// returns are collected and turned into a `Relation` via `Relation::from_vec`.
pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
    input1: &Relation<(Key, Val1)>,
    input2: &Relation<(Key, Val2)>,
    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
) -> Relation<Result> {
    let left = &input1.elements[..];
    let right = &input2.elements[..];

    let mut joined = Vec::new();
    join_helper(left, right, |key, val1, val2| joined.push(logic(key, val1, val2)));

    Relation::from_vec(joined)
}
/// Builds a relation from the recent tuples of `input1` whose key does not
/// occur in `input2`.
///
/// Each surviving `(key, value)` tuple is mapped through `logic`, and the
/// mapped values are returned as a new `Relation`.
///
/// The cursor into `input2` only ever moves forward, so this presumably
/// relies on both the recent tuples and `input2` being sorted ascending by
/// key. NOTE(review): confirm that `Relation` guarantees this ordering.
pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
    input1: impl JoinInput<'me, (Key, Val)>,
    input2: &Relation<Key>,
    mut logic: impl FnMut(&Key, &Val) -> Result,
) -> Relation<Result> {
    // Suffix of `input2` that has not yet been ruled out.
    let mut remaining = &input2[..];
    let mut kept = Vec::new();

    for (key, val) in input1.recent().iter() {
        // Skip every key in `input2` that is strictly smaller than `key`.
        remaining = gallop(remaining, |candidate| candidate < key);

        // If the next remaining key is not `key` itself, `key` is absent.
        if remaining.first() != Some(key) {
            kept.push(logic(key, val));
        }
    }

    Relation::from_vec(kept)
}
/// Merge-joins two slices of `(key, value)` tuples and calls `result` once
/// for every pair of tuples that share a key.
///
/// Both slices are walked front to back. When the leading keys differ, the
/// slice with the smaller key is advanced with `gallop`; when they are equal,
/// the cross-product of the two equal-key groups is reported and both groups
/// are skipped. This only finds all matches if both slices are sorted
/// ascending by key.
fn join_helper<K: Ord, V1, V2>(
    mut slice1: &[(K, V1)],
    mut slice2: &[(K, V2)],
    mut result: impl FnMut(&K, &V1, &V2),
) {
    use std::cmp::Ordering;

    // Stop as soon as either side runs out of tuples.
    while let (Some(head1), Some(head2)) = (slice1.first(), slice2.first()) {
        let (key1, key2) = (&head1.0, &head2.0);

        match key1.cmp(key2) {
            Ordering::Less => {
                // Left key is behind: skip left tuples below the right key.
                slice1 = gallop(slice1, |entry| entry.0 < *key2);
            }
            Ordering::Equal => {
                // Length of the leading group of equal keys on each side.
                let run1 = slice1.iter().take_while(|entry| entry.0 == *key1).count();
                let run2 = slice2.iter().take_while(|entry| entry.0 == *key2).count();

                let (group1, rest1) = slice1.split_at(run1);
                let (group2, rest2) = slice2.split_at(run2);

                // Report the cross-product of the two groups.
                for (_, val1) in group1 {
                    for (_, val2) in group2 {
                        result(key1, val1, val2);
                    }
                }

                // Continue after this key on both sides.
                slice1 = rest1;
                slice2 = rest2;
            }
            Ordering::Greater => {
                // Right key is behind: skip right tuples below the left key.
                slice2 = gallop(slice2, |entry| entry.0 < *key1);
            }
        }
    }
}
/// Skips the leading elements of `slice` for which `cmp` returns `true` and
/// returns the remaining suffix.
///
/// The search first probes at doubling distances and then narrows down with
/// halving distances, so `cmp` is not evaluated on every element. For the
/// result to be the exact boundary, `cmp` should be `true` for a prefix of
/// the slice and `false` for everything after it.
///
/// If the slice is empty, or `cmp` is `false` for the first element, the
/// slice is returned unchanged.
pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
    // `true` iff `index` is in bounds and `cmp` accepts the element there.
    // An out-of-bounds index never invokes `cmp`.
    let mut holds_at =
        |window: &[T], index: usize| window.get(index).map_or(false, |item| cmp(item));

    // Nothing to skip: empty input, or the head already fails `cmp`.
    if !holds_at(slice, 0) {
        return slice;
    }

    // Expansion: jump ahead by doubling strides while the probe still passes.
    let mut stride = 1;
    while holds_at(slice, stride) {
        slice = &slice[stride..];
        stride <<= 1;
    }

    // Contraction: retry with halving strides to close in on the boundary.
    stride >>= 1;
    while stride > 0 {
        if holds_at(slice, stride) {
            slice = &slice[stride..];
        }
        stride >>= 1;
    }

    // The head still satisfies `cmp` here, so step past it.
    &slice[1..]
}
/// An input that can be used with `from_join`; either a `Variable` or a `Relation`.
///
/// Both implementations in this file are on shared references
/// (`&'me Variable<Tuple>` and `&'me Relation<Tuple>`), which satisfies the
/// `Copy` supertrait and lets `recent` and `stable` take `self` by value.
pub trait JoinInput<'me, Tuple: Ord>: Copy {
/// If we are on iteration N of the loop, these are the tuples
/// added on iteration N-1. (For a `Relation`, this is always an
/// empty slice.)
///
/// Must dereference to a slice of tuples.
type RecentTuples: Deref<Target = [Tuple]>;
/// If we are on iteration N of the loop, these are the tuples
/// added on iteration N - 2 or before. (For a `Relation`, this is
/// just `self`.)
///
/// Must dereference to a slice of relations, one entry per batch.
type StableTuples: Deref<Target = [Relation<Tuple>]>;
/// Get the set of recent tuples.
fn recent(self) -> Self::RecentTuples;
/// Get the set of stable tuples.
fn stable(self) -> Self::StableTuples;
}
/// A borrowed `Variable` as a join input: both views are handed out as
/// `Ref` guards over the variable's `recent` and `stable` cells.
impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> {
    type RecentTuples = Ref<'me, [Tuple]>;
    type StableTuples = Ref<'me, [Relation<Tuple>]>;

    /// Borrows the `recent` cell and narrows the guard to the tuple slice
    /// of the relation stored in it.
    fn recent(self) -> Self::RecentTuples {
        let guard = self.recent.borrow();
        Ref::map(guard, |relation| &relation.elements[..])
    }

    /// Borrows the `stable` cell and narrows the guard to a slice of the
    /// batches stored in it.
    fn stable(self) -> Self::StableTuples {
        let guard = self.stable.borrow();
        Ref::map(guard, |batches| &batches[..])
    }
}
/// A borrowed `Relation` as a join input: it has no recent tuples, and the
/// relation itself is its one and only stable batch.
impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> {
    type RecentTuples = &'me [Tuple];
    type StableTuples = &'me [Relation<Tuple>];

    /// Always yields an empty slice.
    fn recent(self) -> Self::RecentTuples {
        // The default value of `&[T]` is the empty slice.
        Default::default()
    }

    /// Views `self` as a one-element slice of relations.
    fn stable(self) -> Self::StableTuples {
        use std::slice::from_ref;

        from_ref(self)
    }
}