Cancel

质数筛子拓展

Algorithm

·

September 23, 2024

前言

介绍一种特殊形式的筛子–增量筛子以及给出使用分段技术的筛子的并行化版本的实现代码。

增量筛子

增量筛子(incremental sieve),就是一种可以无限(内存空间限制内)获取质数的筛子,在不确定质数范围的情境下,配合函数式编程的风格,有它独特的作用。

E. 筛增量筛(√)

可以预先计算 $k$ 个质数,用前 $k$ 个质数筛完 $[0, p_k^2]$ 范围内的质数,如果还要继续筛选,就以 $k=p_k^2$,进行下一轮筛选。

pub fn e_sieve_inf() -> impl Iterator<Item = usize> {
    std::iter::from_coroutine(
        #[coroutine]
        move || {
            let mut p0 = 2;
            let mut pris = vec![2, 3];

            yield 2;
            yield 3;

            loop {
                // ~p0^2
                let p1 = pris.last().unwrap().clone();
                let round = e_seg_sieve_0(
                    &pris,
                    p0 * p0,
                    p1 * p1 - p0 * p0
                )
                .collect::<Vec<usize>>();

                for i in 0..round.len() {
                    yield round[i];
                }

                pris.extend(round);
                p0 = p1;
            }
        },
    )
}

算法评价

性能在增量筛里勉强还算可用。

这只是把普通筛子算法在接口上改造成增量筛子的一个实例,理论上所有依赖预先计算开头一部分质数的一般筛子算法都可以仿照这种方式进行改造。

但是这种方式改造出的增量筛,它的性能曲线并不好看,充满一个个峰值,而好的增量筛子应该(随着质数个数增加),在性能(需求)上呈现平缓增长,贴合质数本身的增长曲线 $\frac{n}{\ln n}$ 。

下面会介绍几个专门的增量筛算法。

Bengelloun 增量筛(√√√)

Bengelloun 增量筛123,也可以称为 LPF 增量筛,基于最小质因数分解 $c = \texttt{lpf}(c)\cdot f$ ,使用固定 $f$ 而在 $p$ 上遍历的思路。4

思路分析

和 GPF 增量筛 思路类似,但是简单很多,强烈建议先看 GPF 增量筛 再回过来看本章节。

每个合数 $c$ 由同一个 $f$ 而更小的一个质数 $p’ \lt p,\ (p’ \leqslant \texttt{lpf}(f))$ 的 $c’ = p’\cdot f$ 标记。

而且简单地只要 $\texttt{lpf}(c)\gt 2$ ,上述标记就成立,只需特别处理偶数的情况。

样例代码

pub fn bengelloun_sieve_inf() -> impl Iterator<Item = usize> {
    std::iter::from_coroutine(
        #[coroutine]
        move || {
            let mut lastp = 2;
            let mut lpf = vec![0; 5]; // +1 cap for index start from 1.

            yield lastp;

            for n in 3.. {
                if n % 2 == 0 {
                    lpf[n] = 2;
                    lpf[n / 2 * 3] = 3;
                }
                else if lpf[n] == 0 {
                    yield n;

                    lpf[lastp] = n;
                    // lpf[n] = n;
                    lastp = n;

                    // => p_next < 2n
                    // => p1 / p0 < 2
                    // => (p1 / p0) * p_next < 4n - 2
                    lpf.resize(n * 4, 0);
                } else {
                    let p = lpf[n]; // lp0 > 2
                    let f = n / p;

                    // min(f, lpf[f]) = truly lpf[f]
                    if p < min(f, lpf[f]) {
                        let p1 = lpf[p];  // next prime after p

                        lpf[p1 * f] = p1;
                    }
                }
            }
        },
    )
}

如果按照 GPF 增量筛的风味,可以把 lpf[n / 2 * 3] = 3; 放到 else 的代码块里。

$\texttt{lpf}$ 扩容

同样类似于 GPF 增量筛,下一个质数 $\lt 2n$ ,在此过程中最大的合数 $c \lt 2n-1 \lt 2n$ :

  1. $c$ 是偶数,扩张 $3 \div 2 = 1.5 \lt 2$ 倍;
  2. $c$ 不是偶数,扩张 相邻两个质数之比,$\lt 2$ 倍。

因此在产生新的质数 $n$ 后扩容到 $4n$ 即可。

储存 $\texttt{next}$

利用和 GPF 增量筛同样的方式在 lpf 数组上存储 $\texttt{next}(p)$ 。

可以通过 min(f, lpf[f]) 来比较简洁地得到真正的 $\texttt{lpf}(f)$ 。

算法评价

目前最快的一档增量筛。

GPF 增量筛(√√√)

GPF 增量筛2,和普通 GPF 筛子一样基于最大质因数分解 $c = f\cdot\texttt{gpf}(c)$,但主要以 $f$ 为常量,以 $p \geqslant \texttt{gpf}(f)$ 为增量。

思路分析

回顾下普通的 GPF 筛子,是以 $p$ 为常量,遍历所有 $f\in \lbrace f\ \vert\ \texttt{gpf}(f) \leqslant p ,\ p\cdot f\leqslant N \rbrace$ 。

但是增量筛子并不能一次性求取一个预先确定范围的质数,而是按照大小的顺序逐个求取,如果仍然固定 $p$ ,由于不等式 $\texttt{gpf}(f) \leqslant p$ ,必须考虑最大质因数 $\leqslant p$ 范围内的每个级别的 $f$ ,或者说因为并不知道这些级别的 $f$ 之间的大小关系, 所以没办法快速得到 $\texttt{next}(f)$ 。

于是我们考虑固定 $f$ ,那么显然 $p$ 有一个确定的值的顺序,开始值是 $p=\texttt{gpf}(f)$ ,按照求解出的质数顺序逐步增加即可。

但是 $c = f\cdot\texttt{gpf}(f)$ 本身又该如何标记呢?这下只能重新固定 $p$ ,由前一个满足 $\texttt{gpf}(f) = p $ 的 $f$ 5 来标记,而满足条件的最小的 $f=p$ ,需要我们手动进行标记。

让我们把问题总结一下:

一个合数 $c = f \cdot p,\ (\texttt{gpf}(f) \leqslant p)$ ,

如果 $\texttt{gpf}(f) = p$ ,

           那么如果 $f=p$ ,那它是在发现某个数 $n=p^2$ 时被手动标记;

           否则,它是在前一个 $g\cdot p,\ (\texttt{gpf}(g) = p)$ 时被标记。

否则 $\texttt{gpf}(f) \lt p$ ,它是在前一个 $f\cdot q,\ (q \lt p)$ 时被标记。

样例代码

/// *LINEAR PRIME-NUMBER SIEVES: A FAMILY TREE:* Algorithm 4.4.
pub fn gpf_sieve_inf() -> impl Iterator<Item = usize> {
    std::iter::from_coroutine(
        #[coroutine]
        move || {
            let mut lastp = 2;
            let mut sqrtp = 2; // minimal (for square) prime >= n
            let mut gpf = vec![0; 2 * 2 + 1]; // (p, f)

            yield 2;

            for n in 3.. {
                if n == sqrtp * sqrtp {
                    gpf[n] = sqrtp; // add starter
                    sqrtp = gpf[sqrtp]; // point to next prime after sqrtp
                }

                if gpf[n] == 0 {
                    yield n;

                    // gpf[n] = n;
                    // C_max < p_next < 2n => (p1/p0 or 2) * C_max < 4n
                    gpf[lastp] = n; // point to next prime
                    lastp = n;
                    gpf.resize(4 * n, 0);
                } else {
                    let p = gpf[n];
                    let f = n / p;
                    let p1 = gpf[p]; // next prime

                    gpf[p1 * f] = p1;

                    // min(f, gpf[f]) is truly gpf[f]
                    if p == min(f, gpf[f]) {
                        let mut f0 = f / p + 1;

                        while min(f0, gpf[f0]) > p {
                            f0 += 1;
                        }

                        gpf[f0 * p * p] = p;
                    }
                }
            }
        },
    )
}

还有几个实现的细节问题需要结合代码进行解释。

$\texttt{gpf}$ 扩容

根据增量筛的语义,应当在生成一个质数后,在下一个质数生成时对数组 gpf 进行扩容。

扩容大小做如下分析,如果当前生成的质数是 $n$ ,那么下一个质数 $\lt 2n$ 6 。

而且可以发现,每个数字标记的合数不超过自身的 $2$ 倍,分析如下:

  1. 如果是 $p$ 增加的标记,那么相邻质数之比不超过 $2$;

  2. 如果是 $f$ 增加的标记,虽然有 $\texttt{gpf}(f) = p$ 的限制,但因为 $2$ 是最小的质数,$2f$ 一定在下一个 $f$ 的序列里。

因此,到达下一个质数过程中最大的合数 $c\lt 2n$ ,那么最大标记的数为 $2c \lt 4n$ ,因此直接将数组 gpf 扩容到 $4n$ 即可。

检查平方质数

根据算法,我们需要手动检测当前值是否是某个质数的平方。

这乍看上去是个棘手的任务,好在质数也是按照增序的方式产生,实际上只需要追踪当前 $\gt n$ 的最小质数,代码里我们使用 sqrtp 这个变量来做这个事情。

储存 $\texttt{next}$

原文构想使用一个大小为 $2n$ 的数组去为每个质数存储 $\texttt{next}(f)$ 和 $\texttt{next}(p)$ ,但实际并无特别的必要。

$\texttt{next}(f)$ 可以延迟到需要时再计算,而 $\texttt{next}(p)$ 的存储可以仿照 Bengelloun 增量筛 一样利用 gpf 既有的空间:

对已发现的质数 $p$,gpf[p] 的空间实际是没有用的(代码里 gpf[n] = n 这行代码直接被注释掉了),可以利用它来存储 $\texttt{next}(p)$ 。

于是代码里使用了 lastp 来保存最后发现的质数,当发现新质数时就令 gpf[lastp] 指向这个新质数。

只不过这样的改动会稍微影响计算 $\texttt{next}(f)$ ,所以需要把 $f=p$ 的情况单独拿出来讨论,可以通过 min(f, gpf[f]) 来比较简洁地得到真正的 $\texttt{gpf}(f)$ 。

算法评价

目前最快的一档增量筛。

并行化实现

可以发现支持分片技术的筛子也意味着支持并行化处理。

分段 E. 筛

/// set amount of cpu cores used in parallel tasks
pub static USED_CPU_CORES_NUM: usize = 3;

pub fn e_seg_sieve_p(n: usize) -> Box<dyn Iterator<Item = usize>> {
    sieve_spec_case!(n; [0, 4]);

    let delta = n.isqrt();
    let pris = e_sieve(delta).collect::<Vec<usize>>();

    fn sieve_subtask<'a>(
        pris: &'a [usize],
        l0: usize,
        l1: usize,
        delta: usize,
    ) -> Vec<usize> {
        let mut ans = vec![];

        let mut seg = BitVec::from_elem(delta + 1, true);

        for l in (l0..=l1).step_by(delta)  {
            let actual_delta = min(delta, l1 - l);

            for &p in pris.iter() {
                for i in (p - l % p..=actual_delta).step_by(p) {
                    seg.set(i, false);
                }
            }

            for i in 1..=actual_delta {
                if seg[i] {
                    ans.push(l + i);
                }
            }

            seg.set_all();
        }

        ans
    }

    let ans = thread::scope(|scope| {
        let pris_ref = &pris;

        let mut l = delta;

        let mut k_dekta = (n / delta - 1) / USED_CPU_CORES_NUM;

        if l + k_dekta * delta < n {
            k_dekta += 1;
        }

        let mut handles = vec![];

        for _ in 0..USED_CPU_CORES_NUM {
            handles.push(scope.spawn(move || {
                sieve_subtask(pris_ref, l, min(l + k_dekta * delta, n), delta)
            }));

            l += k_dekta * delta;
        }

        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .flatten()
            .collect::<Vec<usize>>()
    });

    Box::new(pris.into_iter().chain(ans.into_iter()))
}

分段固定轮筛

pub fn fixed_wheel_seg_sieve_p(n: usize) -> Box<dyn Iterator<Item = usize>> {
    let k = 7;
    let WStruct { wheel_gap: wg, .. } = &W[k];

    if n == 0 {
        return Box::new(std::iter::empty());
    }

    let delta = n.isqrt();

    let pris = e_seg_sieve(delta).collect::<Vec<usize>>();
    let np = pris.len();

    let mut v = 1 + wg[0]; // v = p_{k+1}
    let mut vi = 1;

    if np <= k {
        // Just rolling to n

        let mut ans = vec![];

        for i in 1..=k {
            if P[i] > n {
                return Box::new(ans.into_iter());
            }

            ans.push(P[i]);
        }

        while v <= n {
            ans.push(v);

            v += wg[vi];
            vi = (vi + 1) % wg.len();
        }

        return Box::new(ans.into_iter());
    }

    fn sieve_subtask(
        k: usize,
        pris: &[usize],
        np: usize,
        l0: usize,
        l1: usize,
        delta: usize,
    ) -> Vec<usize> {
        let WStruct {
            wheel: w,
            wheel_gap: wg,
            prod,
            // ipm,
            ..
        } = &W[k];

        let mut ans = vec![];

        /* Init v, vi */

        let v_raw: usize = l0 + 1;
        let (mut v, mut vi) = locate_in_wheel(w, *prod, v_raw);

        /* Init factors */
        // absolute value
        let mut factors = vec![];

        for i in 0..np - k {
            let p = pris[k + i];
            let f_raw = l0 / p + 1;

            factors.push(locate_in_wheel(w, *prod, f_raw));
        }

        /* Run the algorithm */

        let mut bits = BitVec::from_elem(delta + 1, true);

        for l in (l0..=l1).step_by(delta) {
            let end = min(l + delta, l1);

            /* sift for p_k..p_np */

            for i in 0..np - k {
                let p = pris[k + i];
                let (mut f, mut j) = factors[i];

                while p * f <= end {
                    bits.set(p * f - l, false);

                    f += wg[j];
                    j = (j + 1) % wg.len();
                }

                factors[i] = (f, j);
            }

            /* accumulate primes */

            while v <= end {
                if bits[v - l] {
                    ans.push(v);
                }

                v += wg[vi];
                vi = (vi + 1) % wg.len();
            }

            /* reset for next segment */

            bits.set_all();
        }

        ans
    }

    let ans = thread::scope(|scope| {
        let pris_ref = &pris;

        let np = pris
        .iter()
        .enumerate()
        .rev()
        .find(
            |(_, &p)|  p * p <= n
        )
        .map(|(i, _)| i + 1)
        .unwrap_or(0);

        let mut l = delta;

        let mut k_dekta = (n / delta - 1) / USED_CPU_CORES_NUM;

        if l + k_dekta * delta < n {
            k_dekta += 1;
        }

        let mut handles = vec![];

        for _ in 0..USED_CPU_CORES_NUM {
            handles.push(scope.spawn(move || {
                sieve_subtask(
                    k,
                    pris_ref,
                    np,
                    l,
                    min(l + k_dekta * delta, n),
                    delta,
                )
            }));

            l += k_dekta * delta;
        }

        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .flatten()
            .collect::<Vec<usize>>()
    });

    Box::new(pris.into_iter().chain(ans.into_iter()))
}

尾声

源代码:Rust 和 Python

注解

  1. S.A. Bengelloun. An Incremental Primal Sieve. 1986. ↩

  2. Paul Pritchard. LINEAR PRIME-NUMBER SIEVES: A FAMILY TREE. 1987. ↩ ↩2

  3. 不知道为什么,在 Pritchard 的论文引用里,Bengelloun 一律被写作 Bengalloun ↩

  4. 对于增量筛子,固定 $f$ 而在 $p$ 上遍历的算法设计本身就有天然优势 ,所以后面 GPF 增量筛子也采用了这种设计,原因的具体分析二者也是相似的,所以这里省略了,直接参考 GPF 增量筛子即可。 ↩

  5. 寻找 $f$ 这一步的复杂度分析本质上还要研究邻近质数的相关规律,但现在数学上好像还不能把这个规律搞得很清楚,不过根据经验判断,这一步的时间复杂度近似常量。同时这一步,只发生在遇到 $n = f’\cdot p^2,\ \texttt{gpf}(f’) = p$ 的时候。 ↩

  6. 根据前文介绍过伯特兰-切比雪夫定理 ↩

© minghu6

·

theme

Simplex theme logo

by golas