Cancel

分片级联(Fractional Cascading)

Algorithm

·

April 17, 2023

本文主要参考 wiki ,对其中一些内容进行了拓展,对个别错误进行了修正。

分片级联是一种同时对多个有序列表进行二分查找的加速技术。

假设有 $k$ 个有序列表,给定一个元素 $q$ ,查询在每个有序列表上的位置。

思路

原始做法

依次在每个有序列表上执行二分查找,假设每个列表的平均长度为 $n$ ,总长度 $N=kn$ , 则时间复杂度为 $O(k\ \text{log}\ n)$ 。

改进的做法

可以提前处理下这 $k$ 个有序列表,为其中每个元素计算在每个列表的位置,然后把这些元素和对应的在每个列表的位置合成一个大的有序列表,查找的时候只需要在这个大的有序列表上进行一次二分查找就可以了。

根据查找到的元素对应的统计信息,直接得到全部 $k$ 个有序列表上的位置。

例如如下 $k=4$ 个有序列表的

L\i 0 1 2 3 4
1 24 64 65 80 93
2 23 25 26    
3 13 44 62 66  
4 11 35 46 79 81

得到

L[0,6,12] L[1,7,13] L[2,8,14] L[3,9,15] l[4,10,16] l[5,11,17]
11[0,0,0,0] 13[0,0,0,1] 23[0,1,1,1] 24[0,1,1,1] 25[1,1,1,1] 26[1,2,1,1]
35[1,3,1,1] 44[1,3,1,2] 46[1,3,2,2] 62[1,3,2,3] 64[1,3,3,3] 65[2,3,3,3]
66[3,3,3,3] 79[3,3,4,3]        

具体说明这个统计信息,以 $46[1,3,2,2]$ 为例,表明 $46$ 在 $L_1$ 的位置为 1 ,在 $L_2$ 的位置为 $3$ ,在 $L_3$ 的位置为 $2$ , 在 $L_4$ 的位置为 $2$ 。

这样如果我们查找 $q=45$ ,发现落到 $46$ 的位置,就直接使用 $46$ 的索引统计 $q=45$ 的索引结果即可。

这种做法,花费 $kn \cdot (k-1)\log n$ 的预处理时间,和 $kN$ 1的空间,以实现 $O(\text{log}N)$ 时间复杂度的单次查询。

这样,单次查询的时间复杂度,从 $O(k\log n)$ 变为 $O(\log kn)$ , 相当于把 $k$ 放进了 log 里,性能得到了提升。

但是空间上 $kN$ 是非常的昂贵。

分片级联

那么如何在尽量保持改进的查询时间复杂度的情况下降低空间复杂度呢? 这才引出来,我们分片 · 级联的技术。

分片指得是分片取样(fractional sampling),级联(cascading)指得是并不同时保存所有 $k$ 个列表的索引信息,而只是保存紧邻下一级的索引信息,然后一层层进行连锁。2

具体说就是:

  1. 构建一个预处理的列表 $M$ ,它的每一级 $M_i$ 都是由对应 $L_i$ 和下一级的 $M_{i+1}$ 中的元素构成,$M_i$ 存储元素本身以及每个元素在 $L_i$ 上的索引位置和 $M_{i+1}$ 上索引位置,这样一直到最后一级 $M_k$ ,因为没有下一级,所以只保存 $L_k$ 的元素,这体现了级联;
  2. 次一级的 $M_{i+1}$ 里面不是每个元素都提升到 $M_i$ 里,而是两两取样3,

例如上述例子为

M\I 0 1 2 3 4 5 6 7
1 24[0,1] 25[1,1] 35[1,3] 64[1,5] 65[2,5] 79[3,5] 80[3,6] 93[4,6]
2 23[0,1] 25[1,1] 26[2,1] 35[3,1] 62[3,3] 79[3,5]    
3 13[0,1] 35[1,1] 44[1,2] 62[2,3] 66[3,3] 79[4,3]    
4 11[0,0] 35[1,0] 46[2,0] 79[3,0] 81[4,0]      

对于前面的,$q=45$ 的例子,假设取样的分片大小为 $f=2$

  1. 对 $M_1$ ,进行二分查找,发现最小的 $\geqslant 45$ 的是 $64[1,5]$ ,于是对于 $L_1$ 的结果是 $1$,然后从 $M_2[5]$ 开始,直接根据所属分片的级联的信息,就像拉拉链一样,最多经过 $f-1$ 次比较就可以得到结果;

  2. 然后比较 $M_2[5]$ 和它的前一项 $M_2[4]$ 4发现 $M_2[5]=62[3,3]$ 是最小的 $\geqslant q$ 的一项,于是 $L_2$ 的结果是 $3$;

  3. 比较 $M_3[3]$ 和 $M_3[2]$,发现 $M_3[3] =62[2,3]$ 是最小的 $\geqslant q$ 的项,于是 $L_3$ 的结果是 $2$;

  4. 比较 $M_4[3]$ 和 $M_4[2]$ ,发现 $M_4[2]=46[2,0]$ 是最小的 $\geqslant q$ 的项,于是 $L_4$ 的结果是 $4$;

利用合并两个有序列表的方法处理每个 $L_i$ 和 $M_{i+1}$ 的分片,使用 $(k-1)n$ 的预处理时间,消耗的内存小于 $2N$ 个单元,对于数字类型来说,也就是 $3 * 2N = 6N$ 的空间复杂度。

空间复杂度证明

预处理列表 M 每一层的空间(单元):

\[|M_i| = {\displaystyle n \sum_{j=1}^{k-i+1} (\frac{1}{f})^{j-1}}\]

总共空间是(单元):

\[\begin{array}{l} {\displaystyle \sum_{i=1}^k|M_i|} &= {\displaystyle n\sum_{i=1}^k \sum_{j=1}^{k-i+1} (\frac{1}{f})^{j-1}}\\ &\lt {\displaystyle n \sum_{i=1}^k \sum_{j=1}^{k+1} (\frac{1}{f})^{j-1}}\\ &= {\displaystyle n \sum_{i=1}^k 1 + (\frac{1}{f}) + (\frac{1}{f})^2 ...,+(\frac{1}{f})^k}\\ &\lt 2n{\displaystyle \sum_{i=1}^k 1}\\ &= 2N \end{array}\]

时间复杂度分析

由于 $M_1 \lt 2n$ ,因此单次查询的时间复杂度为 $O(\text{log}n + (f-1)k)$ 。

由于除了 log ,还多了一个 $k$ , 于是情况就比较微妙了 。

  1. 一方面,当 $k \ll n$ 时,分片级联的性能好于原始办法。

  2. 但另一方面,如果 $k$ 相对 $n$ 较大,那分片级联相比于原始算法就没有优势,甚至更慢。

  3. 而在几乎所有的情况下,只要 $k$ 不是特别小,改进算法还是最快的,是快好几倍的5 。因为很明显,对于 $\text{log}N$ ,即使 $N$ 取一个 u64 能表示的最大值,也就是相当于 $k=64$ 的大小,因此只要稍微大一点的 $k$ 都会使得查询性能不如改进算法

我们可以通过一个图表把这个问题看得更清楚,取一个固定的 $N$ ,让 $k$ 增长,观察

  1. 原始做法-raw
  2. 改进做法-best
  3. $f=2$ 的分片级联-fc2
  4. $f=4$ 的分片级联-fc4

的运行时间的变化:

我们发现当 $k$ 从一个较小的值增长的时候,原始算法的性能会变得有些优于 $f=4$ 的分片级联。

但如果拉长 $k$ 的取值话,发现分片级联还是明显优于原始做法。

实现

源代码

数据结构

pub struct FractionalCascading<'a, T, const F: usize = 2> {
    m: Vec<Vec<METype<T>>>,
    l: &'a [&'a [T]],
}

type METype<T> = (T, usize, usize);

对于 $q$ 的二分查找返回的结果是, $\text{min}({x \in L,\ x >= q})$

构建

分片级联,对于输入有一些动态时才能检查的条件:

  1. $k > 0$;
  2. 输入的 $k$ 个列表每个列表都不为空且都是有序的;
  3. 分片大小 $f > 0$

构建 M 时:

  1. 采用 merge-sort 里面的 merge 操作合并 $L_i$ 和 $M_{i+1}$ ;
  2. 对于列表中重复的元素只保留一份,因为标准的二分搜索不会区分不同位置的重复元素,留着也没用;
  3. 同时注意如果 $M_{i+1}$ 的长度不能恰好被分片 $f$ 整除,需要把最后的尾元素补上,这样使得所有的完整的或者不完整的分片都有一个代表,就是分片的最大值,简化了比较逻辑
impl<'a, T: Ord, const F: usize> FractionalCascading<'a, T, F> {
    pub fn new(l: &'a [&[T]]) -> Self
    where
        T: Clone,
    {
        assert!(l.len() > 0);
        assert!(l.iter().all(|i| i.is_sorted()));
        assert!(F > 0);

        debug_assert!(l.iter().all(|i| !i.is_empty() && i.is_sorted()));

        let m = Self::build_m(l);

        Self { m, l }
    }

    fn build_m(l: &[&[T]]) -> Vec<Vec<METype<T>>>
    where
        T: Clone,
    {
        let k = l.len();
        let mut m = vec![vec![]; k];

        /* Init m_k*/

        let l_k = l[k - 1];
        let m_k = &mut m[k - 1];

        for (i, v) in l_k.iter().cloned().enumerate() {
            m_k.push((v, Ok(i), 0));
        }

        /* on guard */

        if k == 1 {
            return m;
        }

        for m_i in (0..=k - 2).rev() {
            let l1 = l[m_i];
            let m2 = &m[m_i + 1];
            let mut m1 =
                Vec::with_capacity(l1.len() + m2.len().div_ceil(F));

            /* merge two sorted vec */

            let mut i = 0;
            let mut j = 1;

            macro_rules! nxt_j {
                ($j:ident) => {
                    // end
                    if $j == m2.len() - 1 {
                        j + 1
                    } else if $j + F > m2.len() - 1 {
                        m2.len() - 1
                    } else {
                        $j + F
                    }
                };
            }

            while i < l1.len() && j < m2.len() {
                match l1[i].cmp(&m2[j].0) {
                    Less => {
                        m1.push((l1[i].clone(), Ok(i), j));

                        i += 1;
                    }
                    Equal => {
                        m1.push((l1[i].clone(), Ok(i), j));

                        /* skip dup */

                        let dup = &l1[i];

                        while i < l1.len() && &l1[i] == dup {
                            i += 1;
                        }

                        while j < m2.len() && &m2[j].0 == dup {
                            j = nxt_j!(j);
                        }
                    }
                    Greater => {
                        m1.push((m2[j].0.clone(), Err(i), j));

                        j = nxt_j!(j);
                    }
                }
            }

            while i < l1.len() {
                m1.push((l1[i].clone(), Ok(i), m2.len()));

                i += 1;
            }

            while j < m2.len() {
                m1.push((m2[j].0.clone(), Err(l1.len()), j));

                j = nxt_j!(j);
            }

            m[m_i] = m1;
        }

        m
    }
}

查询

按照 Rust 的 API 风格,查找元素时用 Result<usize, usize> 类型区分相等的情况和最小大于的情况。

查询

主流程:

  1. 在 m[0] 上执行二分查找,找到“拉链头”;
  2. 然后在后续的 m[1..m.len()] 上“拉拉链”,在每一级上,都会在分片大小的范围内找到结果

考虑重复元素的情况,实际上前面讲的“区分相等的情况和最小大于的情况”的说法并不严谨,实际 API 返回值的要求应该是相等和插入后保持有序两种情况 ,这意味着:

  1. 如果存在重复的最小大于的元素,不能随便选一个返回它的索引位置,而是应该返回最左边的那一个;
  2. 在极端的最坏情况下,这可能会拖累性能到线性
impl<'a, T: Ord, const F: usize> FractionalCascading<'a, T, F> {    
	pub fn find<Q: Borrow<T>>(&self, k: &Q) -> Vec<Result<usize, usize>> {
        self.find_(k.borrow(), Self::find_handle_approx_result)
    }
    
    fn find_(&self, k: &T, handler: FindHandler<'a, T, F>) -> Vec<Result<usize, usize>> {
        let mut res = vec![Err(0); self.m.len()];

        let mut j;

        // 1. assign res[0]
        //
        // 2. m_idx
        //
        match self.m[0].binary_search_by_key(&k.borrow(), |x| &x.0) {
            Ok(idx) => {
                res[0] = self.m[0][idx].1;
                j = self.m[0][idx].2;
            }
            Err(idx) => {
                if idx == self.m[0].len() {
                    res[0] = Err(self.l[0].len());
                    j = self.m[0][idx - 1].2;
                } else {
                    res[0] = self.m[0][idx].1.and_then(|x| Err(x));
                    j = self.m[0][idx].2;
                }
            }
        }

        for i in 1..self.m.len() {
            let elected;

            if j == 0 {
                elected = j;
            } else if j < self.m[i].len() {
                let mut cand = j;

                while cand > 0 && k.borrow() < &self.m[i][cand].0 {
                    cand -= 1;
                }

                if cand < j && k.borrow() > &self.m[i][cand].0 {
                    cand += 1;
                }

                elected = cand;
            } else {
                elected = j - 1;
            }

            res[i] = handler(self, k, i, elected);

            j = self.m[i][elected].2;
        }

        res
    }
    
    #[inline(always)]
    fn find_handle_approx_result(
        &self,
        k: &T,
        i: usize,
        j: usize,
    ) -> Result<usize, usize>
    {
        let dup = &self.m[i][j].0;

        match k.borrow().cmp(&self.m[i][j].0) {
            Less => {
                let mut idx = unpack_result!(self.m[i][j].1);

                // go back (the worst case down to O(nk))
                while idx > 0 && &self.l[i][idx - 1] == dup {
                    idx -= 1;
                }

                Err(idx)
            }
            Equal => {
                self.m[i][j].1
            }
            Greater => {
                let mut idx = unpack_result!(self.m[i][j].1);

                // go forward (the worst case down to O(nk))
                while idx < self.l[i].len() - 1 && &self.l[i][idx + 1] == dup {
                    idx += 1;
                }

                if idx == self.l[i].len() - 1 && dup == &self.l[i][idx] {
                    idx += 1;
                }

                Err(idx)
            }
        }
    }
}

type FindHandler<'a, T, const N: usize> = fn(
    &FractionalCascading<'a, T, N>,
    &T,
    usize,
    usize
) -> Result<usize, usize>;

快速查询

如果不考虑对重复元素的插入顺序的情况,可以直接这样写:

impl<'a, T: Ord, const F: usize> FractionalCascading<'a, T, F> {
    /// Ignore duplicated case
    pub fn quick_find<Q: Borrow<T>>(
        &self,
        k: &Q,
    ) -> Vec<Result<usize, usize>> {
        self.find_(k.borrow(), Self::quick_find_handle_approx_result)
    }

    #[inline(always)]
    fn quick_find_handle_approx_result(
        &self,
        k: &T,
        i: usize,
        j: usize,
    ) -> Result<usize, usize>
    {
        match k.cmp(&self.m[i][j].0) {
            Less => {
                self.m[i][j].1.and_then(|x| Err(x))
            }
            Equal => {
                self.m[i][j].1
            }
            Greater => {
                self.m[i][j].1.and_then(|x| Err(x + 1))
            }
        }
    }   
}

后续

动态

关于动态分片级联的问题,也就是目标数组被修改时的更新。

这将完全抛弃数组结构而采用经典地采用树型结构,就像前文介绍过的在 B+ 树的数组实现明显区别于和 TreeMap 实现一样,实现的具体细节将完全不同。

由于介绍的细节繁杂,也没有详细的示例,深入下去必然耗费大量时间,且已经偏离了主要介绍的内容,因此等需要的时候再来回顾吧。

应用

计算几何方面的问题

注解

  1. 假设元素类型也是和索引位置一样的类型 ↩

  2. 总感觉分片级联就像斐波那契堆一样,对复杂度采取逐级抵抗的办法,应该进行可以一些类比 ↩

  3. 两两取样也就是分片的大小为 $2$ ,当然也可以选择其他的分片大小 ↩

  4. 因为分片大小是 $2$ ,所以比较两项,如果分片大小是 $4$ 那就比较 $4$ 项 ↩

  5. 可以视作对应分片 $f=1$ 的情况,虽然此时耗费多一半的空间 ↩

© minghu6

·

theme

Simplex theme logo

by golas