Cancel

BT(2) - B+树(TreeMap)

Algorithm

·

March 14, 2023

对于 B 树来说,传统上有一种对节点分裂、合并时性能的改进方法,就是把存储结构由数组改为 TreeMap 。TreeMap 或者有序字典,比如我们前面介绍的所有的 BST,比如红黑树,以及,我们 B 树。

没错,现在我们就是要用前文实现的 B+ 树作为我们新 B+ 树的节点基础。当然这是一个递归的概念,我们也可以继续下去把这个新的 B+ 树的实现作为节点基础实现 B+ 树,如此 $3$ 层、$4$ 层的嵌套。但从 $m$ 常数的角度,两层就够了,再多也没有意义,通常 $m$ 也就是一百左右,次级的 B+ 树的 $m$ 最多也就是就是十几,已经什么空间了,何况因此获得的性能增长也呈指数级下降。

但是实现用前面实现的 B+ 树作为节点的基础,还需要实现额外的一些 public method ,包括增加字段,比如统计键值总数的 cnt 字段等等。但是如何扩写 Vec 版本的 B+ 树一时难以论说,事实上扩写&重构改动的代码比原代码还长,所以就把它放到下一篇介绍,那时将得到一个相对完整的 Vec 版本的 B+ 树。

这里会列出一个作为节点实现 B+ 树的 TreeMap 需要实现的方法,既作为下一篇的索引,也作为用其他 TreeMap 代替实现时的参考。

数据结构

树

这里增加了 cnt 统计字段和 min_node 用于快速范查

impl_tree!(
    /// B+ Trees powered by B+ tree
    ///
    BPT2 { cnt: usize, min_node: WeakNode<K, V> }
);

节点包装

impl_node!();

内部节点

TreeMap 实现的 B+ 树和数组实现的 B+ 树在内部节点的设计上有质的不同。

数组版本的,键值和孩子是按照索引来排的,而 TreeMap 则是按照唯一的键值来排列的。这既省了事儿又另一方面多了事儿,整个节点状态维护的逻辑都发生了变化。

首先从节点布局上,内部节点的 keys 字段没有存在的意义,直接使用 children map,但是这样对 map 的键的维护就成了一个关键。如何选取一个合适的键来代表孩子呢?

有两种明显的思路:

  1. 就用节点的最小值作为键,这样查询起来非常快,可以快速失败,但是当节点的最小键发生变化时,必须向上更新 map 来维护这一性质;
  2. 那么用任意孩子节点的任一一个键就不需要始终维护了,但这样的话查询的时候就不知道到底要落到哪一个键上,就查询左右两个键的孩子的所有键值也解决不了问题,因为它们也不一定都是最小键

于是显然,我们采用把最小键最为子节点在父节点上的键的设计。

const SUB_M: usize = 20;

enum Node_<K, V> {
    Internal {
        children: BPT<K, Node<K, V>, SUB_M>,
        paren: WeakNode<K, V>,
    },
    Leaf {
        entries: BPT<K, V, SUB_M>,
        /// Successor (Leaf)
        succ: WeakNode<K, V>,
        paren: WeakNode<K, V>,
    },
}
use Node_::*;


def_attr_macro_bpt!(paren, succ, entries, children);


impl<K, V> Node_<K, V> {
    fn is_leaf(&self) -> bool {
        matches!(self, Leaf { .. })
    }

    def_node__heap_access!(internal, children, BPT<K, Node<K, V>, SUB_M>);
    def_node__heap_access!(leaf, entries, BPT<K, V, SUB_M>);

    def_node__wn_access!(both, paren);
    def_node__wn_access!(leaf, succ);
}

基础方法

搜索到叶子

这里会用到 TreeMap 里面的一个配合方法,low_bound_search ,是搜索 $\geqslant k$ 的第一个节点。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    #[inline(always)]
    fn search_to_leaf_r<Q>(mut x: &Node<K, V>, k: &Q) -> Node<K, V>
    where
        K: Borrow<Q> + Ord,
        Q: Ord + ?Sized,
    {
        while x.is_internal() {
            if let Some(x_) = children!(x).low_bound_search(k) {
                x = x_;
            } else {
                return Node::none();
            }
        }

        x.clone()
    }
}

节点访问

impl<K, V> Node<K, V> {
    fn is_leaf(&self) -> bool {
        self.is_some() && attr!(self | self).is_leaf()
    }

    fn is_internal(&self) -> bool {
        self.is_some() && !attr!(self | self).is_leaf()
    }

    fn min_key(&self) -> Option<&K>
    where
        K: Ord,
    {
        if self.is_none() {
            None
        } else if self.is_internal() {
            children!(self).min_key()
        } else {
            entries!(self).min_key()
        }
    }
}


macro_rules! min_key {
    ($x:expr) => {
        if let Some(k) = $x.min_key() {
            k.clone()
        } else {
            panic!("No min-key {:?}", $x)
        }
    };
}

创建节点

macro_rules! node {
    (kv| $k:expr, $v:expr) => { {
        // overflow is M
        let mut entries = BPT::new();
        entries.insert($k, $v);

        node!(basic-leaf| entries, WeakNode::none(), WeakNode::none())
    } };
    (basic-leaf| $entries:expr, $succ:expr, $paren:expr) => { {
        aux_node!(FREE-ENUM Leaf {
            entries: $entries,
            succ: $succ,
            paren: $paren
        })
    } };
    (basic-internal| $children:expr, $paren:expr) => { {
        aux_node!(FREE-ENUM Internal {
            children: $children,
            paren: $paren
        })
    } };
}

一般公开方法

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
	pub fn new() -> Self {
        assert!(M > 2, "M should be greater than 2");

        Self {
            root: Node::none(),
            cnt: 0,
            min_node: WeakNode::none(),
        }
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.cnt
    }

    pub fn get<Q>(&self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        mut_self!(self).get_mut(k).map(|v| &*v)
    }

    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let x = Self::search_to_leaf_r(&self.root, k);

        // Nil
        if x.is_none() {
            None
        }
        // Leaf
        else {
            entries_mut!(x).get_mut(k)
        }
    }
}

范围查询

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
	pub fn select<'a, Q, R>(&self, range: R) -> impl Iterator<Item = &V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized + 'a,
        R: RangeBounds<&'a Q> + Clone,
    {
        mut_self!(self).select_mut(range).map(|v| &*v)
    }

    pub fn select_mut<'a, Q, R>(
        &mut self,
        range: R,
    ) -> impl Iterator<Item = &mut V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized + 'a,
        R: RangeBounds<&'a Q> + Clone,
    {
        /* find start_node */

        let mut cur;

        if self.cnt == 0 {
            cur = Node::none();
        } else {
            match range.start_bound() {
                Included(&k) => {
                    cur = Self::search_to_leaf_r(&self.root, k.borrow());

                    if cur.is_none() {
                        cur = self.min_node.upgrade();
                    }
                }
                Excluded(_) => unimplemented!(),
                Unbounded => {
                    cur = self.min_node.upgrade();
                }
            }
        }

        std::iter::from_generator(move || {
            while cur.is_some() {
                let mut entries = entries_mut!(cur).select_mut(range.clone());

                if let Some(fst) = entries.next() {
                    yield fst;

                    for ent in entries {
                        yield ent
                    }

                    cur = succ!(cur);
                } else {
                    break;
                }
            }
        })
    }
}

插入

主流程

插入的时候,检查是否需要更新 map 的索引。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    pub fn insert(&mut self, k: K, v: V) -> Option<V>
    where
        K: Clone + Debug,
        V: Debug,
    {
        let x = Self::search_to_leaf_r(&self.root, &k);

        self.insert_into_leaf(x, k, v)
    }
    
    fn insert_into_leaf(&mut self, mut x: Node<K, V>, k: K, v: V) -> Option<V>
    where
        K: Clone + Debug,
        V: Debug,
    {
        /* new min none */

        if x.is_none() {
            if self.cnt == 0 {
                self.root = node!(kv | k, v);
                self.min_node = self.root.downgrade();

                self.cnt += 1;

                return None;
            } else {
                x = self.min_node.upgrade();
            }
        }

        /* insert into leaf */

        debug_assert!(x.is_leaf());

        let old_k = min_key!(x);

        let popped = entries_mut!(x).insert(k, v);

        Self::update_index(old_k, &x);

        if entries!(x).len() == M {
            self.promote(x);
        }

        if popped.is_none() {
            self.cnt += 1;
        }

        popped
    }
}

更新索引

索引更新需要一路向上检查

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    #[inline(always)]
    fn update_index(old_k: K, x: &Node<K, V>)
    where
        K: Clone + Debug,
    {
        let k = min_key!(x);
        let p = paren!(x);

        if old_k != k && p.is_some() {
            /* update x key */

            let old_p_k = min_key!(p);

            children_mut!(p).remove(&old_k);
            children_mut!(p).insert(k.clone(), x.clone());

            Self::update_index(old_p_k, &p);
        }
    }
}

节点提升

到目前位置 TreeMap 的节点实现给我们增加了很多麻烦,但当分裂节点的时候,就很简单了1,这里的 split_off 的表现和 Vec 的一致。

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {    
	fn promote(&mut self, x: Node<K, V>)
    where
        K: Clone + Debug,
        V: Debug,
    {
        debug_assert!(x.is_some());

        /* split node */

        let x2;

        if x.is_leaf() {
            debug_assert_eq!(entries!(x).len(), Self::entries_high_bound());

            // keep key for leaf
            let entries_x2 = entries_mut!(x).split_off(M.div_floor(2));

            x2 = node!(
                basic - leaf | entries_x2,
                succ!(x).downgrade(),
                WeakNode::none()
            );

            succ!(x, x2.clone());
        } else {
            debug_assert_eq!(
                children!(x).len(),
                Self::entries_high_bound() + 1
            );

            let children_x2 = children_mut!(x).split_off((M + 1).div_floor(2));

            x2 = node!(basic - internal | children_x2, WeakNode::none());

            for child in children_mut!(x2).values() {
                paren!(child, x2.clone());
            }
        }

        let p = paren!(x);
        let x_k = min_key!(x);
        let x2_k = min_key!(x2);

        /* push new level */
        if p.is_none() {
            let children = bpt! {
                x_k => x,
                x2_k => x2
            };

            self.root = node!(basic - internal | children, WeakNode::none());

            for child in children_mut!(self.root).values() {
                paren!(child, self.root.clone());
            }
        }
        /* insert into paren node */
        else {
            paren!(x2, p.clone());

            // insert new or update
            children_mut!(p).insert(x2_k, x2);

            if children!(p).len() == Self::entries_high_bound() + 1 {
                self.promote(p);
            }
        }
    }
}

删除

主流程

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
    where
        K: Borrow<Q> + Clone + Debug,
        Q: Ord + ?Sized,
        V: Debug,
    {
        let x = Self::search_to_leaf_r(&self.root, k);

        self.remove_on_leaf(x, k)
    }

    fn remove_on_leaf<Q>(&mut self, x: Node<K, V>, k: &Q) -> Option<V>
    where
        K: Borrow<Q> + Clone + Debug,
        Q: Ord + ?Sized,
        V: Debug,
    {
        if x.is_none() {
            return None;
        }

        debug_assert!(x.is_leaf());

        let old_k = min_key!(x);

        let popped = entries_mut!(x).remove(k);

        if popped.is_some() {
            self.cnt -= 1;
        }

        if entries!(x).len() == 0 {
            if self.cnt == 0 {
                self.root = Node::none();
                self.min_node = WeakNode::none();
            } else {
                self.unpromote(x, old_k);
            }
        } else {
            Self::update_index(old_k, &x);
        }

        popped
    }
}

节点下降

在本方法和后面的重平衡方法里更新索引时很容易出错,需要仔细考虑,比如同时更新两个索引时可能会出现键值覆盖的问题。

rank:查询键排名,从 $0$ 开始

nth:根据排名,返回键值,从 $0$ 开始

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {   
    fn unpromote(&mut self, mut x: Node<K, V>, old_k: K)
    where
        K: Clone + Debug,
        V: Debug,
    {
        debug_assert!(paren!(x).is_some());
        debug_assert!(
            x.is_leaf()
                || children!(x).len() == Self::entries_low_bound() - 1 + 1
        );

        let p = paren!(x);
        let idx = children!(p).rank(&old_k);

        if Self::try_rebalancing(&x, &x, idx, old_k.clone()) {
            return;
        }

        // merge with left_sib (no need to update index)
        if idx > 0 {
            let (_sib_k, sib_lf) = children!(p).nth(idx - 1).unwrap();

            children_mut!(p).remove(&old_k);

            if x.is_leaf() {
                succ!(sib_lf, succ!(x));
            } else {
                debug_assert!(x.is_internal());

                for (child_k, child) in children_mut!(x).drain_all() {
                    paren!(child, sib_lf.clone());
                    children_mut!(sib_lf).push_back(child_k, child);
                }
            }
        }
        // for right_sib (merge into left)
        else {
            let (_sib_k, sib_rh) = children!(p).nth(idx + 1).unwrap();
            let sib_rh = sib_rh.clone();

            // 由于没有prev,只能总是合并到左节点,好在此时叶子节点只有一个元素
            if x.is_leaf() {
                debug_assert_eq!(entries!(sib_rh).len(), 1);

                succ!(x, succ!(sib_rh));

                let (k, v) = entries_mut!(sib_rh).pop_first().unwrap();
                children_mut!(p).remove(&k);
                entries_mut!(x).insert(k.clone(), v);

                // remove x from p
                Self::update_index(old_k, &x);

                // 不需要更新索引,因为 sib_rh 值更大
            } else {
                let old_key_sib_rh = min_key!(sib_rh);
                children_mut!(p).remove(&old_key_sib_rh);

                for (child_k, child) in children_mut!(sib_rh).drain_all() {
                    paren!(child, x.clone());
                    children_mut!(x).push_back(child_k, child);
                }

                Self::update_index(old_k, &x);
            }
        }

        let old_key = min_key!(p);
        x = p;

        if paren!(x).is_none() {
            if children!(x).len() == 1 {
                // pop new level
                self.root = children_mut!(x).pop_first().unwrap().1;
                paren!(self.root, Node::none());
            }
        } else {
            if children!(x).len() < Self::entries_low_bound() + 1 {
                self.unpromote(x, old_key);
            }
        }
    }
}

尝试重平衡

impl<K: Ord, V, const M: usize> BPT2<K, V, M> {
    fn try_rebalancing(
        p: &Node<K, V>,
        x: &Node<K, V>,
        idx: usize,
        old_k: K,
    ) -> bool
    where
        K: Clone + Debug,
        V: Debug,
    {
        // try to redistribute with left
        if idx > 0 {
            let (_sib_k, sib_lf) = children!(p).nth(idx - 1).unwrap();

            if sib_lf.is_leaf() && entries!(sib_lf).len() > 1 {
                let (k, v) = entries_mut!(sib_lf).pop_last().unwrap();
                entries_mut!(x).insert(k, v);

                Self::update_index(old_k, x);

                return true;
            }

            if sib_lf.is_internal()
                && children!(sib_lf).len() > Self::entries_low_bound() + 1
            {
                let (k, v) = children_mut!(sib_lf).pop_last().unwrap();
                paren!(v, x.clone());
                children_mut!(x).insert(k, v);

                Self::update_index(old_k, x);

                return true;
            }
        }
        // try to redistribute with right then
        if idx < children!(p).len() - 1 {
            let (_sib_k, sib_rh) = children!(p).nth(idx + 1).unwrap();
            let sib_rh = sib_rh.clone();

            if sib_rh.is_leaf() && entries!(sib_rh).len() > 1 {
                let (k, v) = entries_mut!(sib_rh).pop_first().unwrap();
                entries_mut!(x).insert(k.clone(), v);

                Self::update_index(old_k, x); // WARNING: it wll replace sib_rh with x
                children_mut!(p).insert(min_key!(sib_rh), sib_rh);

                return true;
            }

            if sib_rh.is_internal()
                && children!(sib_rh).len() > Self::entries_low_bound() + 1
            {
                let sib_rh_old_key = min_key!(sib_rh);

                let (k, v) = children_mut!(sib_rh).pop_first().unwrap();
                paren!(v, x.clone());
                children_mut!(x).insert(k.clone(), v);

                // 不需要像叶子节点那样两段儿更新是因为在叶子更新的时候,已经对 x 向上更新过了
                // Self::update_index(old_k, x);
                // children_mut!(p).insert(min_key!(sib_rh), sib_rh);

                Self::update_index(sib_rh_old_key, &sib_rh);

                return true;
            }
        }

        false
    }
}

测试

打印方法

impl<K: Ord + Debug, V, const M: usize> Debug for BPT2<K, V, M> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        f.debug_struct("BPT2")
            .field("root", &self.root)
            .field("cnt", &self.cnt)
            .finish()
    }
}


impl<K: Debug + Ord, V> Debug for Node_<K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        match self {
            Self::Internal { children, .. } => f
                .debug_struct("Internal")
                .field("children", children)
                .finish(),
            Self::Leaf { entries, .. } => {
                f.debug_struct("Leaf").field("entries", entries).finish()
            }
        }
    }
}


#[cfg(test)]
impl<K: Ord + Debug + Clone, V, const M: usize> BPT2<K, V, M> {
    fn debug_print(&self)
    where
        V: Debug,
    {
        use common::vecdeq;

        /* print header */

        println!("{self:?}");

        /* print body */

        if self.root.is_none() {
            return;
        }

        let mut this_q = vecdeq![vec![self.root.clone()]];
        let mut lv = 1;

        while !this_q.is_empty() {
            println!();
            println!("############ Level: {lv} #############");
            println!();

            let mut nxt_q = vecdeq![];

            while let Some(children) = this_q.pop_front() {
                for (i, x) in children.iter().enumerate() {
                    let p = paren!(x);

                    if x.is_internal() {
                        nxt_q.push_back(
                            children!(x).values().cloned().collect(),
                        );
                        println!("({i:02}): {x:?} (p: [{p:?}])");
                    } else {
                        let succ = succ!(x);
                        println!(
                            "({i:02}): {x:?} (p: [{p:?}], succ: [{succ:?}])"
                        );
                    }
                }

                println!();
            }


            this_q = nxt_q;
            lv += 1;
        }

        println!("------------- end --------------\n");
    }
}

校验方法

impl<K: Ord + Debug + Clone, V, const M: usize> BPT2<K, V, M> {    
	fn validate(&self)
    where
        K: Debug + std::hash::Hash,
        V: Debug,
    {
        if self.root.is_none() {
            return;
        }

        use std::collections::VecDeque;

        use common::vecdeq;

        let mut cur_q = vecdeq![vecdeq![Node::none(), self.root.clone()]];

        while !cur_q.is_empty() {
            let mut nxt_q = vecdeq![];
            let mut leaf_num = 0;
            let mut internal_num = 0;

            while let Some(mut group) = cur_q.pop_front() {
                let p = group.pop_front().unwrap();

                for child in group.iter() {
                    assert!(child.is_some());

                    if child.is_internal() {
                        assert!(
                            children!(child).len()
                                < Self::entries_high_bound() + 1
                        );

                        // children!(child).validate();
                    } else {
                        assert!(
                            entries!(child).len() < Self::entries_high_bound()
                        );
                        // entries!(child).validate();
                    }

                    // Exclude leaf
                    if child.is_leaf() {
                        leaf_num += 1;
                    } else {
                        // Exclude the root (which is always one when it's internal node)
                        if paren!(child).is_some() {
                            assert!(
                                children!(child).len()
                                    >= Self::entries_low_bound() + 1
                            );
                        } else {
                            assert!(children!(child).len() >= 2);
                        }

                        internal_num += 1;

                        let mut nxt_children = VecDeque::from_iter(
                            children!(child).values().cloned(),
                        );
                        nxt_children.push_front(child.clone());

                        nxt_q.push_back(nxt_children);
                    }
                }

                // All leaves are in same level
                assert!(
                    leaf_num == 0 || internal_num == 0,
                    "leaf: {leaf_num}, internal: {internal_num}"
                );
            }

            cur_q = nxt_q;
        }
    }
}

方法统计

BPT Tree Map
select/select_mut select_mut
remove etc. remove
insert etc. insert
search_to_leaf_r low_bound_search
* len
* min
promote split_off
unpromote/try_rebalancing nth
unpromote rank
unpromote push_back
unpromote/try_rebalancing pop_first/pop_last
* min_key
   

总结

运行一个简单的基准测试,结果符合预期:

环境 值
CPU Intel Haswell Core i5-4590
Memory 16 GB
Key Size 8 Bytes

最优配置:

$\text{SUB_M} = 20$

$10 \leqslant M \leqslant 20$

  1. 查询性能非常好,比 Rust 标准库里 HashMap 的查询速度还要快 25%!但是这种加速不是 B+ 树结构本身所导致的,而是 Map 的使用使得节点的内存布局在缓存上更有利,也就是单一数组的 TreeMap 比起分离的两个数组更有利,换言之,直接使用单一数组实现2的 Map 性能会更好!
  2. 插、删的性能介于传统 BST 和 B 树、B+ 树之间

这个结果符合了理论和经验上的预期,最后再补充一个对比测试:

B+ Tree push_back-pop_first vs Vec push-remove(0) on u64

M batch CroSS
105 1150
10 2500

也就是说当数据量要足够大才能发挥出算法上的性能才能抵消缓存的优势,而在最佳 M 范围下,数组实现具有明显优势。

注解

  1. 只不过看起来 split_off 大概也是得自己实现 ↩

  2. 比如 Vec<KVEntry<K, V>> ↩

© minghu6

·

theme

Simplex theme logo

by golas