Cancel

BT(2) - B+树(TreeMap)

Algorithm

·

March 14, 2023

1st revision - 2025-07-024

序言

没错,现在我们就是要用前文实现的 B+ 树作为我们新 B+ 树的节点基础。当然这是一个递归的概念,我们也可以继续下去把这个新的 B+ 树的实现作为节点基础实现 B+ 树,如此 $3$ 层、$4$ 层的嵌套。但从 $m$ 常数的角度,两层就够了,再多也没有意义,通常 $m$ 也就是一百左右,次级的 B+ 树的 $m$ 最多也就是就是十几,已经什么空间了,何况因此获得的性能增长也呈指数级下降。

但是实现用前面实现的 B+ 树作为节点的基础,还需要实现额外的一些 public method ,包括增加字段,比如统计键值总数的 cnt 字段等等。但是如何扩写 Vec 版本的 B+ 树一时难以论说,事实上扩写&重构改动的代码比原代码还长,所以就把它放到下一篇介绍,那时将得到一个相对完整的 Vec 版本的 B+ 树。

这里会列出一个作为节点实现 B+ 树的 TreeMap 需要实现的方法,既作为下一篇的索引,也作为用其他 TreeMap 代替实现时的参考。

数据结构

树

这里增加了 cnt 统计字段和 min_node 用于快速范查

impl_tree!(
    /// B+ Trees powered by B+ tree
    ///
    BPT2 { cnt: usize, min_node: WeakNode<K, V> }
);

节点包装

impl_node!();

内部节点

TreeMap 实现的 B+ 树和数组实现的 B+ 树在内部节点的设计上有质的不同。

数组版本的,键值和孩子是按照索引来排的,而 TreeMap 则是按照唯一的键值来排列的。这既省了事儿又另一方面多了事儿,整个节点状态维护的逻辑都发生了变化。

首先从节点布局上,内部节点的 keys 字段没有存在的意义,直接使用 children map,但是这样对 map 的键的维护就成了一个关键。如何选取一个合适的键来代表孩子呢?

有两种明显的思路:

  1. 就用节点的最小值作为键,这样查询起来非常快,可以快速失败,但是当节点的最小键发生变化时,必须向上更新 map 来维护这一性质;
  2. 那么用任意孩子节点的任一一个键就不需要始终维护了,但这样的话查询的时候就不知道到底要落到哪一个键上,就查询左右两个键的孩子的所有键值也解决不了问题,因为它们也不一定都是最小键

于是显然,我们采用把最小键最为子节点在父节点上的键的设计。

const SUB_M: usize = 20;

enum Node_<K, V> {
    Internal {
        children: BPT<K, Node<K, V>, SUB_M>,
        paren: WeakNode<K, V>,
    },
    Leaf {
        entries: BPT<K, V, SUB_M>,
        /// Successor (Leaf)
        succ: WeakNode<K, V>,
        paren: WeakNode<K, V>,
    },
}
use Node_::*;


def_attr_macro_bpt!(paren, succ, entries, children);


impl<K, V> Node_<K, V> {
    fn is_leaf(&self) -> bool {
        matches!(self, Leaf { .. })
    }

    def_node__heap_access!(internal, children, BPT<K, Node<K, V>, SUB_M>);
    def_node__heap_access!(leaf, entries, BPT<K, V, SUB_M>);

    def_node__wn_access!(both, paren);
    def_node__wn_access!(leaf, succ);
}

基础方法

搜索到叶子

这里会用到 TreeMap 里面的一个配合方法,low_bound_search ,是搜索 $\geqslant k$ 的第一个节点。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    #[inline(always)]
    fn search_to_leaf_r<Q>(mut x: &Node<K, V>, k: &Q) -> Node<K, V>
    where
        K: Borrow<Q> + Ord,
        Q: Ord + ?Sized,
    {
        while x.is_internal() {
            if let Some(x_) = children!(x).low_bound_search(k) {
                x = x_;
            } else {
                return Node::none();
            }
        }

        x.clone()
    }
}

节点访问

impl<K, V> Node<K, V> {
    fn is_leaf(&self) -> bool {
        self.is_some() && attr!(self | self).is_leaf()
    }

    fn is_internal(&self) -> bool {
        self.is_some() && !attr!(self | self).is_leaf()
    }

    fn min_key(&self) -> Option<&K>
    where
        K: Ord,
    {
        if self.is_none() {
            None
        } else if self.is_internal() {
            children!(self).min_key()
        } else {
            entries!(self).min_key()
        }
    }
}


macro_rules! min_key {
    ($x:expr) => {
        if let Some(k) = $x.min_key() {
            k.clone()
        } else {
            panic!("No min-key {:?}", $x)
        }
    };
}

创建节点

macro_rules! node {
    (kv| $k:expr, $v:expr) => { {
        // overflow is M
        let mut entries = BPT::new();
        entries.insert($k, $v);

        node!(basic-leaf| entries, WeakNode::none(), WeakNode::none())
    } };
    (basic-leaf| $entries:expr, $succ:expr, $paren:expr) => { {
        aux_node!(FREE-ENUM Leaf {
            entries: $entries,
            succ: $succ,
            paren: $paren
        })
    } };
    (basic-internal| $children:expr, $paren:expr) => { {
        aux_node!(FREE-ENUM Internal {
            children: $children,
            paren: $paren
        })
    } };
}

一般公开方法

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
	pub fn new() -> Self {
        assert!(M > 2, "M should be greater than 2");

        Self {
            root: Node::none(),
            cnt: 0,
            min_node: WeakNode::none(),
        }
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.cnt
    }

    pub fn get<Q>(&self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        mut_self!(self).get_mut(k).map(|v| &*v)
    }

    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let x = Self::search_to_leaf_r(&self.root, k);

        // Nil
        if x.is_none() {
            None
        }
        // Leaf
        else {
            entries_mut!(x).get_mut(k)
        }
    }
}

范围查询

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
	pub fn select<'a, Q, R>(&self, range: R) -> impl Iterator<Item = &V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized + 'a,
        R: RangeBounds<&'a Q> + Clone,
    {
        mut_self!(self).select_mut(range).map(|v| &*v)
    }

    pub fn select_mut<'a, Q, R>(
        &mut self,
        range: R,
    ) -> impl Iterator<Item = &mut V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized + 'a,
        R: RangeBounds<&'a Q> + Clone,
    {
        /* find start_node */

        let mut cur;

        if self.cnt == 0 {
            cur = Node::none();
        } else {
            match range.start_bound() {
                Included(&k) => {
                    cur = Self::search_to_leaf_r(&self.root, k.borrow());

                    if cur.is_none() {
                        cur = self.min_node.upgrade();
                    }
                }
                Excluded(_) => unimplemented!(),
                Unbounded => {
                    cur = self.min_node.upgrade();
                }
            }
        }

        std::iter::from_generator(move || {
            while cur.is_some() {
                let mut entries = entries_mut!(cur).select_mut(range.clone());

                if let Some(fst) = entries.next() {
                    yield fst;

                    for ent in entries {
                        yield ent
                    }

                    cur = succ!(cur);
                } else {
                    break;
                }
            }
        })
    }
}

插入

主流程

插入的时候,检查是否需要更新 map 的索引。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    pub fn insert(&mut self, k: K, v: V) -> Option<V>
    where
        K: Clone + Debug,
        V: Debug,
    {
        let x = Self::search_to_leaf_r(&self.root, &k);

        self.insert_into_leaf(x, k, v)
    }
    
    fn insert_into_leaf(&mut self, mut x: Node<K, V>, k: K, v: V) -> Option<V>
    where
        K: Clone + Debug,
        V: Debug,
    {
        /* new min none */

        if x.is_none() {
            if self.cnt == 0 {
                self.root = node!(kv | k, v);
                self.min_node = self.root.downgrade();

                self.cnt += 1;

                return None;
            } else {
                x = self.min_node.upgrade();
            }
        }

        /* insert into leaf */

        debug_assert!(x.is_leaf());

        let old_k = min_key!(x);

        let popped = entries_mut!(x).insert(k, v);

        Self::update_index(old_k, &x);

        if entries!(x).len() == M {
            self.promote(x);
        }

        if popped.is_none() {
            self.cnt += 1;
        }

        popped
    }
}

更新索引

索引更新需要一路向上检查

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    #[inline(always)]
    fn update_index(old_k: K, x: &Node<K, V>)
    where
        K: Clone + Debug,
    {
        let k = min_key!(x);
        let p = paren!(x);

        if old_k != k && p.is_some() {
            /* update x key */

            let old_p_k = min_key!(p);

            children_mut!(p).remove(&old_k);
            children_mut!(p).insert(k.clone(), x.clone());

            Self::update_index(old_p_k, &p);
        }
    }
}

节点提升

到目前位置 TreeMap 的节点实现给我们增加了很多麻烦,但当分裂节点的时候,就很简单了1,这里的 split_off 的表现和 Vec 的一致。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {    
	fn promote(&mut self, x: Node<K, V>)
    where
        K: Clone + Debug,
        V: Debug,
    {
        debug_assert!(x.is_some());

        /* split node */

        let x2;

        if x.is_leaf() {
            debug_assert_eq!(entries!(x).len(), Self::entries_high_bound());

            // keep key for leaf
            let entries_x2 = entries_mut!(x).split_off(M.div_floor(2));

            x2 = node!(
                basic - leaf | entries_x2,
                succ!(x).downgrade(),
                WeakNode::none()
            );

            succ!(x, x2.clone());
        } else {
            debug_assert_eq!(
                children!(x).len(),
                Self::entries_high_bound() + 1
            );

            let children_x2 = children_mut!(x).split_off((M + 1).div_floor(2));

            x2 = node!(basic - internal | children_x2, WeakNode::none());

            for child in children_mut!(x2).values() {
                paren!(child, x2.clone());
            }
        }

        let p = paren!(x);
        let x_k = min_key!(x);
        let x2_k = min_key!(x2);

        /* push new level */
        if p.is_none() {
            let children = bpt! {
                x_k => x,
                x2_k => x2
            };

            self.root = node!(basic - internal | children, WeakNode::none());

            for child in children_mut!(self.root).values() {
                paren!(child, self.root.clone());
            }
        }
        /* insert into paren node */
        else {
            paren!(x2, p.clone());

            // insert new or update
            children_mut!(p).insert(x2_k, x2);

            if children!(p).len() == Self::entries_high_bound() + 1 {
                self.promote(p);
            }
        }
    }
}

删除

主流程

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
    where
        K: Borrow<Q> + Clone + Debug,
        Q: Ord + ?Sized,
        V: Debug,
    {
        let x = Self::search_to_leaf_r(&self.root, k);

        self.remove_on_leaf(x, k)
    }

    fn remove_on_leaf<Q>(&mut self, x: Node<K, V>, k: &Q) -> Option<V>
    where
        K: Borrow<Q> + Clone + Debug,
        Q: Ord + ?Sized,
        V: Debug,
    {
        if x.is_none() {
            return None;
        }

        debug_assert!(x.is_leaf());

        let old_k = min_key!(x);

        let popped = entries_mut!(x).remove(k);

        if popped.is_some() {
            self.cnt -= 1;
        }

        if entries!(x).len() == 0 {
            if self.cnt == 0 {
                self.root = Node::none();
                self.min_node = WeakNode::none();
            } else {
                self.unpromote(x, old_k);
            }
        } else {
            Self::update_index(old_k, &x);
        }

        popped
    }
}

节点下降

在本方法和后面的重平衡方法里更新索引时很容易出错,需要仔细考虑,比如同时更新两个索引时可能会出现键值覆盖的问题。

rank:查询键排名,从 $0$ 开始

nth:根据排名,返回键值,从 $0$ 开始

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {   
    fn unpromote(&mut self, mut x: Node<K, V>, old_k: K)
    where
        K: Clone + Debug,
        V: Debug,
    {
        debug_assert!(paren!(x).is_some());
        debug_assert!(
            x.is_leaf()
                || children!(x).len() == Self::entries_low_bound() - 1 + 1
        );

        let p = paren!(x);
        let idx = children!(p).rank(&old_k);

        if Self::try_rebalancing(&x, &x, idx, old_k.clone()) {
            return;
        }

        // merge with left_sib (no need to update index)
        if idx > 0 {
            let (_sib_k, sib_lf) = children!(p).nth(idx - 1).unwrap();

            children_mut!(p).remove(&old_k);

            if x.is_leaf() {
                succ!(sib_lf, succ!(x));
            } else {
                debug_assert!(x.is_internal());

                for (child_k, child) in children_mut!(x).drain_all() {
                    paren!(child, sib_lf.clone());
                    children_mut!(sib_lf).push_back(child_k, child);
                }
            }
        }
        // for right_sib (merge into left)
        else {
            let (_sib_k, sib_rh) = children!(p).nth(idx + 1).unwrap();
            let sib_rh = sib_rh.clone();

            // 由于没有prev,只能总是合并到左节点,好在此时叶子节点只有一个元素
            if x.is_leaf() {
                debug_assert_eq!(entries!(sib_rh).len(), 1);

                succ!(x, succ!(sib_rh));

                let (k, v) = entries_mut!(sib_rh).pop_first().unwrap();
                children_mut!(p).remove(&k);
                entries_mut!(x).insert(k.clone(), v);

                // remove x from p
                Self::update_index(old_k, &x);

                // 不需要更新索引,因为 sib_rh 值更大
            } else {
                let old_key_sib_rh = min_key!(sib_rh);
                children_mut!(p).remove(&old_key_sib_rh);

                for (child_k, child) in children_mut!(sib_rh).drain_all() {
                    paren!(child, x.clone());
                    children_mut!(x).push_back(child_k, child);
                }

                Self::update_index(old_k, &x);
            }
        }

        let old_key = min_key!(p);
        x = p;

        if paren!(x).is_none() {
            if children!(x).len() == 1 {
                // pop new level
                self.root = children_mut!(x).pop_first().unwrap().1;
                paren!(self.root, Node::none());
            }
        } else {
            if children!(x).len() < Self::entries_low_bound() + 1 {
                self.unpromote(x, old_key);
            }
        }
    }
}

尝试重平衡

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    fn try_rebalancing(
        p: &Node<K, V>,
        x: &Node<K, V>,
        idx: usize,
        old_k: K,
    ) -> bool
    where
        K: Clone + Debug,
        V: Debug,
    {
        // try to redistribute with left
        if idx > 0 {
            let (_sib_k, sib_lf) = children!(p).nth(idx - 1).unwrap();

            if sib_lf.is_leaf() && entries!(sib_lf).len() > 1 {
                let (k, v) = entries_mut!(sib_lf).pop_last().unwrap();
                entries_mut!(x).insert(k, v);

                Self::update_index(old_k, x);

                return true;
            }

            if sib_lf.is_internal()
                && children!(sib_lf).len() > Self::entries_low_bound() + 1
            {
                let (k, v) = children_mut!(sib_lf).pop_last().unwrap();
                paren!(v, x.clone());
                children_mut!(x).insert(k, v);

                Self::update_index(old_k, x);

                return true;
            }
        }
        // try to redistribute with right then
        if idx < children!(p).len() - 1 {
            let (_sib_k, sib_rh) = children!(p).nth(idx + 1).unwrap();
            let sib_rh = sib_rh.clone();

            if sib_rh.is_leaf() && entries!(sib_rh).len() > 1 {
                let (k, v) = entries_mut!(sib_rh).pop_first().unwrap();
                entries_mut!(x).insert(k.clone(), v);

                Self::update_index(old_k, x); // WARNING: it wll replace sib_rh with x
                children_mut!(p).insert(min_key!(sib_rh), sib_rh);

                return true;
            }

            if sib_rh.is_internal()
                && children!(sib_rh).len() > Self::entries_low_bound() + 1
            {
                let sib_rh_old_key = min_key!(sib_rh);

                let (k, v) = children_mut!(sib_rh).pop_first().unwrap();
                paren!(v, x.clone());
                children_mut!(x).insert(k.clone(), v);

                // 不需要像叶子节点那样两段儿更新是因为在叶子更新的时候,已经对 x 向上更新过了
                // Self::update_index(old_k, x);
                // children_mut!(p).insert(min_key!(sib_rh), sib_rh);

                Self::update_index(sib_rh_old_key, &sib_rh);

                return true;
            }
        }

        false
    }
}

测试

打印方法

impl<K: Ord + Debug, V, const M: usize> Debug for BPT2<K, V, M> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        f.debug_struct("BPT2")
            .field("root", &self.root)
            .field("cnt", &self.cnt)
            .finish()
    }
}


impl<K: Debug + Ord, V> Debug for Node_<K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        match self {
            Self::Internal { children, .. } => f
                .debug_struct("Internal")
                .field("children", children)
                .finish(),
            Self::Leaf { entries, .. } => {
                f.debug_struct("Leaf").field("entries", entries).finish()
            }
        }
    }
}


#[cfg(test)]
impl<K: Ord + Debug + Clone, V, const M: usize> BPT2<K, V, M> {
    fn debug_print(&self)
    where
        V: Debug,
    {
        use common::vecdeq;

        /* print header */

        println!("{self:?}");

        /* print body */

        if self.root.is_none() {
            return;
        }

        let mut this_q = vecdeq![vec![self.root.clone()]];
        let mut lv = 1;

        while !this_q.is_empty() {
            println!();
            println!("############ Level: {lv} #############");
            println!();

            let mut nxt_q = vecdeq![];

            while let Some(children) = this_q.pop_front() {
                for (i, x) in children.iter().enumerate() {
                    let p = paren!(x);

                    if x.is_internal() {
                        nxt_q.push_back(
                            children!(x).values().cloned().collect(),
                        );
                        println!("({i:02}): {x:?} (p: [{p:?}])");
                    } else {
                        let succ = succ!(x);
                        println!(
                            "({i:02}): {x:?} (p: [{p:?}], succ: [{succ:?}])"
                        );
                    }
                }

                println!();
            }


            this_q = nxt_q;
            lv += 1;
        }

        println!("------------- end --------------\n");
    }
}

校验方法

impl<K: Ord + Debug + Clone, V, const M: usize> BPT2<K, V, M> {    
	fn validate(&self)
    where
        K: Debug + std::hash::Hash,
        V: Debug,
    {
        if self.root.is_none() {
            return;
        }

        use std::collections::VecDeque;

        use common::vecdeq;

        let mut cur_q = vecdeq![vecdeq![Node::none(), self.root.clone()]];

        while !cur_q.is_empty() {
            let mut nxt_q = vecdeq![];
            let mut leaf_num = 0;
            let mut internal_num = 0;

            while let Some(mut group) = cur_q.pop_front() {
                let p = group.pop_front().unwrap();

                for child in group.iter() {
                    assert!(child.is_some());

                    if child.is_internal() {
                        assert!(
                            children!(child).len()
                                < Self::entries_high_bound() + 1
                        );

                        // children!(child).validate();
                    } else {
                        assert!(
                            entries!(child).len() < Self::entries_high_bound()
                        );
                        // entries!(child).validate();
                    }

                    // Exclude leaf
                    if child.is_leaf() {
                        leaf_num += 1;
                    } else {
                        // Exclude the root (which is always one when it's internal node)
                        if paren!(child).is_some() {
                            assert!(
                                children!(child).len()
                                    >= Self::entries_low_bound() + 1
                            );
                        } else {
                            assert!(children!(child).len() >= 2);
                        }

                        internal_num += 1;

                        let mut nxt_children = VecDeque::from_iter(
                            children!(child).values().cloned(),
                        );
                        nxt_children.push_front(child.clone());

                        nxt_q.push_back(nxt_children);
                    }
                }

                // All leaves are in same level
                assert!(
                    leaf_num == 0 || internal_num == 0,
                    "leaf: {leaf_num}, internal: {internal_num}"
                );
            }

            cur_q = nxt_q;
        }
    }
}

方法统计

以 Trait 形式总结下作为节点基础的数据结构所需要实现的接口方法

pub trait TreeMap: Sized {
    type K;
    type V;

    fn new() -> Self;

    fn len(&self) -> usize;

    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    fn first(&self) -> Option<(&Self::K, &Self::V)> {
        self.nth(0)
    }

    fn first_value(&self) -> Option<&Self::V> {
        self.nth(0).map(|(_k, v)| v)
    }

    fn nth(&self, idx: usize) -> Option<(&Self::K, &Self::V)>;

    fn nth_value_mut(&mut self, idx: usize) -> Option<&mut Self::V>;

    fn rank<Q>(&self, k: &Q) -> Result<usize, usize>
    where
        Self::K: std::borrow::Borrow<Q>,
        Q: Ord + ?Sized;

    ///
    /// `Some(Ok(&V)): node.k == k`
    ///
    /// `Some(Err(&V)): node.k < k`
    ///
    /// `None: k < min_key`
    fn lower_bound_search<Q>(
        &self,
        k: &Q,
    ) -> Option<Result<&Self::V, &Self::V>>
    where
        Self::K: std::borrow::Borrow<Q>,
        Q: Ord + ?Sized;

    fn get_min_key(&self) -> Option<&Self::K>;

    fn get<Q>(&self, k: &Q) -> Option<&Self::V>
    where
        Self::K: std::borrow::Borrow<Q>,
        Q: Ord + ?Sized;

    fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut Self::V>
    where
        Self::K: std::borrow::Borrow<Q>,
        Q: Ord + ?Sized;

    fn iter(&self) -> impl Iterator<Item = (&Self::K, &Self::V)>;

    fn drain_all(&mut self) -> impl Iterator<Item = (Self::K, Self::V)>;

    fn keys(&self) -> impl Iterator<Item = &Self::K> {
        self.iter().map(|(k, _v)| k)
    }

    fn values(&self) -> impl Iterator<Item = &Self::V> {
        self.iter().map(|(_k, v)| v)
    }

    fn values_mut(&mut self) -> impl Iterator<Item = &mut Self::V>;

    fn insert(&mut self, k: Self::K, v: Self::V) -> Option<Self::V>
    where
        Self::K: Ord;

    fn remove<Q>(&mut self, k: &Q) -> Option<Self::V>
    where
        Self::K: std::borrow::Borrow<Q>,
        Q: Ord + ?Sized;

    ///
    /// replace `(old_k, v)` to `(new_k, v)`
    ///
    /// if `old_k` == `new_k`, do nothing;
    ///
    /// if `old_k` doesn't exist, do nothing;
    ///
    /// if `new_k` exist,replace and return old value.
    ///
    /// There is a low efficient implementation.
    fn update_index<Q>(&mut self, old_k: &Q, new_k: Self::K) -> Option<Self::V>
    where
        Self::K: std::borrow::Borrow<Q> + Clone + Ord,
        Q: Ord,
    {
        self.remove(old_k).map(|x| self.insert(new_k, x)).flatten()
    }

    fn pop_first(&mut self) -> Option<(Self::K, Self::V)>;

    fn pop_last(&mut self) -> Option<(Self::K, Self::V)>;

    /// k should be less than min_key
    fn push_front(&mut self, k: Self::K, v: Self::V);

    /// k should be greate than max_key
    fn push_back(&mut self, k: Self::K, v: Self::V);

    /// split as [0, at) and return [at, len)
    ///
    /// # Panics
    ///
    /// Panics if at > len.
    fn split_off(&mut self, at: usize) -> Self;

    #[cfg(test)]
    fn validate(&self)
    where
        Self::K: Ord,
    {
        assert!(self.keys().is_sorted())
    }

    fn with_kv(mut self, k: Self::K, v: Self::V) -> Self
    where
        Self::K: Ord,
    {
        self.insert(k, v);
        self
    }
}

总结与延伸

性能比较

  1. 查询性能非常好,比 Rust 标准库里 HashMap 的查询速度还要快 25%;
  2. 插、删的性能介于传统 BST 和 B 树、之前版本的 B+ 树之间

在理论分析和在不同规模测试数据下的性能比较可以发现,这种实现的性能提高并不来源于理论时间复杂度的改善2,而是由于 TreeMap 实现的节点的内存布局上更加缓存友好,也即键-值聚合的单一数组比起键-值分离的两个数组更加缓存友好。

进一步改进

我们从中获得启发,如果进一步改进节点内存布局的局部性是不是就会得到一个实践上更好性能的数据结构? 而显然以 B+ 树为基础的节点局部性不如连续的内存构成的向量。

再进一步考虑,这个单独分配内存空间的向量不如直接在原地划分数组,不过这样需要一个变量手动追踪有效内容长度。

特别地,对 Rust 单一所有权的语义机制需要进行特别考虑,可以使用 Option 类型包装,但是需要付出不必要的额外内存代价,而这是作为底层数据结构所不愿看到的,这样就需要使用 MaybeUninit 类型然后自己维护数组操作过程中的单一所有权问题。

改进版数据结构3:

pub struct StackVec<T, const C: usize> {
    len: usize,
    arr: [MaybeUninit<T>; C],
}

#[repr(transparent)]
struct StackVecMap<K, V, const C: usize>(StackVec<KVEntry<K, V>, C>);

enum Node<K, V, const M: usize> {
    Leaf {
        entries: StackVecMap<K, V, M>,
        next: Option<Ptr<Self>>,
        paren: Option<Ptr<Self>>,
    },
    Internal {
        children: StackVecMap<K, OwnedPtr<Self>, M>,
        paren: Option<Ptr<Self>>,
    },
}

#[repr(transparent)]
pub struct OwnedPtr<T: ?Sized> {
    value: Box<T>,
}

#[repr(transparent)]
pub struct Ptr<T: ?Sized> {
    value: NonNull<T>,
}

pub struct KVEntry<K, V>(pub K, pub V);

作为对比,Rust 标准库里 BTreeMap 采取了如下数据结构4:

/// The underlying representation of leaf nodes and part of the representation of internal nodes.
struct LeafNode<K, V> {
    /// We want to be covariant in `K` and `V`.
    parent: Option<NonNull<InternalNode<K, V>>>,
    /// This node's index into the parent node's `edges` array.
    /// `*node.parent.edges[node.parent_idx]` should be the same thing as `node`.
    /// This is only guaranteed to be initialized when `parent` is non-null.
    parent_idx: MaybeUninit<u16>,
    /// The number of keys and values this node stores.
    len: u16,
    keys: [MaybeUninit<K>; CAPACITY],
    vals: [MaybeUninit<V>; CAPACITY],
}

#[repr(C)]
struct InternalNode<K, V> {
    data: LeafNode<K, V>,
    edges: [MaybeUninit<BoxedNode<K, V>>; 2 * B],
}

显然这个数据结构相比我们存在一个严重影响局部性的键-值分离的问题,我们在具体的性能基准测试里验证这一点。

测试环境

CPU: Intel Core i5-4590

测试数据集:40 K 8 Byte + 8 Byte 键值对儿的随机数据

查询测试

Std-B:标准库 B 树

B+3-48:我们自己的实现 节点容量 = 48

数据单位:ms / iter

         
No. Std-B B+3-48 B+3-48 Rel. Speed  
1 9.07 2.92 +211%  
2 7.69 2.93 +162%  
3 7.70 2.88 +167%  
4 7.73 2.93 +164%  
5 7.78 2.91 +167%  
6 7.73 2.91 +166%  
7 9.07 2.95 +207%  
8 7.75 2.96 +162%  
9 7.80 2.95 +164%  
10 7.74 2.94 +163%  
11 7.75 2.93 +165%  
12 7.67 2.88 +166%  
         
    Ave. +172%  

插/删测试

Std-B:标准库 B 树

B+3-48:我们自己的实现 节点容量 = 48

数据单位:ms / iter

             
No. Std-B Insert B+3-48 Insert Std-B Cal. Remove B+3-48 Cal. Remove B+3-48 Insert Rel. Speed B+3-48 Remove Rel. Speed
1 3.95 3.78 5.98 5.47 +4% +9%
2 3.75 3.62 4.66 4.76 +4% -2%
3 3.70 3.64 4.70 5.63 +2% -17%
4 3.97 3.60 5.13 4.83 +10% +6%
5 3.72 3.72 4.79 4.77 0% +0%
6 4.18 3.80 5.90 6.43 +10% -8%
7 3.79 3.57 5.02 5.16 +6% -3%
8 3.80 3.75 5.35 4.64 +1% +15%
9 3.92 3.75 5.66 5.49 +5% +3%
10 3.69 3.58 4.80 4.71 +3% +2%
11 4.36 3.78 5.74 5.52 +15% +4%
12 4.07 3.68 4.78 4.83 +11% -1%
             
        Ave. +6% +1%

考虑到 B 树本身由于键-值直接存储在节点上,相对于 B+ 树,不仅容易实现得多,而且查询时不需要搜索到叶子,而在插删操作时也不需要向上更新键索引,这个测试结果可以说非常令人满意了,平均查询性能超出 1.5 倍,平均插/删性能大致相当,还要略优。

注解

  1. 只不过看起来 split_off 大概也是得自己实现 ↩

  2. $\large \log_2 n = \log_2 2^m \cdot \log_{2^m} n $ ↩

  3. https://github.com/minghu6/rust-minghu6/blob/v0.1.18/coll_st/src/bt/bpt3.rs ↩

  4. https://github.com/rust-lang/rust/blob/1.88.0/library/alloc/src/collections/btree/node.rs#L50 ↩

© minghu6

·

theme

Simplex theme logo

by golas