基于前缀树的持久化向量(TrieVec)
在前面介绍了从 Fibonacci 堆、Dary 堆,到图上的诸多算法,从 BST 到 BT 的一系列数据结构和算法,并提供了它们的 Rust 实现,如果把这些代码都集成起来,可能就会发现这个编译的过程怎么这么熬人,似乎越来越让人难以忍受,这时可以参考一下另篇关于降低构建时间的笔记。
前言
本篇作为一个阶段性的总结篇,介绍一个 Clojure 里的基于前缀树(Trie)的持久化向量(Vector)的实现1 ,以下用 TrieVec 简称。
一方面,Vector 作为函数式编程语言里的允许随机读写的数据结构,本身有很重要的意义,没有它,传统的用类 C 描述的算法就无法落地;
另一方面,讲到持久化数据结构,其实大部分都已涉及,只有持久化向量比较陌生并且比较复杂,有必要专门看一下;
最后,在看这个实现的过程中,会发现这是一个总体概念陌生但局部细节熟悉的数据结构,它可以串起之前介绍过的诸多数据结构,不管是结构特点、设计方案还是实现方法,是一个合适的阶段尾声。
概念基础
TrieVec 本质上就像我们在多叉堆上那样的做法,只不过多叉堆建立在数组上,而 TrieVec 则以树的的形式组织。
比如对于一棵节点数宽度为 $2$ ,也就是 $2^2 = 4$ 阶的树,访问 idx = 54
\[\begin{array}{l} 54 &=\underbrace{11}\ \underbrace{01}\ \underbrace{10}\\ &=\ \ \ 3\quad\ \ \ 1\quad\quad 2 \end{array}\]情况如下图:
这要求树的完全平衡,也就是每个叶子到根的高度都一样,然后把数据存储在每个叶子上,这一点又和我们在 B+ 树上的情况相似。
而对树层级的定义又和 AA 树 是一样,用 level 的概念来描述是非常恰当的,叶子层的 level 是 $1$ ,空树的 level 是 $0$ 。
对于持久化的要求来讲,每次操作,Push/Pop/Update etc. 只需要复制从根节点到叶子一条路径的节点,对数级别的开销是一个可以接受的代价。
另外,对于某些时间敏感的任务,Clojure 还允许把持久化的结构转变为易变的结构(Transient)来做就地修改,然后再转回持久性结构,以避免复制开销。这样的话需要在每个节点上做出标识来追踪节点的创建者,在下一部分的标记一节具体讨论。
与易变向量的节点,以及区分历史上由不同线程创建的节点。
数据结构
树
这里有一个优化的小改动2,就是把 TrieVec 最后一个节点(Bucket)从 Trie 中单独拿出来,作为尾节点,这会使得 Clojure 里像 conj
这样等价于 push 的操作在常量时间里完成。我们计算 TrieVec 的结构时候需要把尾部节点的长度扣除。
#[derive(Clone)]
pub struct PTrieVec<T> {
cnt: usize,
root: Node<T>,
tail: Node<T>,
}
pub struct TTrieVec<T> {
cnt: usize,
root: Node<T>,
tail: Node<T>,
}
常量定义
const BIT_WIDTH: u32 = 5;
const NODE_SIZE: usize = 1 << BIT_WIDTH as usize;
const MASK: usize = NODE_SIZE - 1;
从 Trie size 反推 Trie height
macro_rules! h {
($self:ident) => {
trie_height(tailoff!($self))
};
}
const fn trie_height(trie_size: usize) -> u32 {
match trie_size {
0 => 0,
1 => 1,
x => {
let mut h = (x - 1).ilog2() / BIT_WIDTH;
if x > NODE_SIZE.pow(h as u32) {
h += 1;
}
h
}
}
}
根据总索引和当前层得到索引位置
macro_rules! idx {
($idx:expr, $lv:expr) => {
// Precedence: '*' > '>>' > '&'
$idx >> ($lv - 1) * BIT_WIDTH & MASK
};
}
尾部偏移量
也就是尾节点之前的元素数,也就是实际 Trie 的大小
macro_rules! tailoff {
($self:ident) => {
if $self.cnt == 0 {
0
} else {
(($self.cnt - 1) >> BIT_WIDTH) << BIT_WIDTH
}
};
}
标志
由于存在持久与易变版本间的转换,一个节点可能是持久版本的节点,也可能是历史上不同线程创建的易变节点。
这里使用 u64
3表示的 Thread id 作为标志,表示历史上创建易变节点的线程,Thread id 都大于 0 ,于是我们可以用 0 来表示持久版本的节点。
这样对于可编辑节点需要满足:
- 不是持久化节点
- 不是历史上其他线程创建的节点
不满足条件就需要复制节点,创造属于当前线程的节点。
macro_rules! edit {
() => {
std::thread::current().id().as_u64().get()
};
}
macro_rules! no_edit {
() => {
0
};
}
pub type ID = u64;
impl<T> Node<T> {
fn id(&self) -> ID {
if self.is_none() {
0
} else {
id!(self).clone()
}
}
}
内部节点
enum Node_<T> {
Internal { id: ID, children: Array<Node<T>> },
Leaf { id: ID, values: Array<T> },
}
use Node_::*;
impl<T> Node_<T> {
fn is_leaf(&self) -> bool {
matches!(self, Leaf { .. })
}
def_node__heap_access!(both, id, ID);
def_node__heap_access!(internal, children, Array<Node<T>>);
def_node__heap_access!(leaf, values, Array<T>);
}
Array
使用的是前面DaryHeap
使用过的分配在堆上的静态数组;def_node__heap_access
宏使用的之前 B+ 树里面使用过的宏
节点包装
考虑到持久化数据结构的跨线程使用的情况,节点包装里面的引用计数器将不再采用之前的 std::rc::Rc
,而改用同步版本的 std::sync::Arc
4 。
但即使如此,由于 Rust 的严格限制,我们仍然不能把 Node
跨线程使用,即使我们确信作为持久化的结构时,是可以这样安全使用的。在 std::sync
的包里有各种各样的同步手段:
- 临界区(Exclusive)
- 惰性锁(LazyLock)
- 单写锁(OnceLock)
- 栅栏(Barrier)
- 条件变量(CondVar)
- 锁(Mutex)
- 单次锁 (Once)
- 读写锁 (RwLock)
看了让人直呼好家伙,上世纪的那点儿古董玩意儿都让它给复活了😅,但是这里面没有我们需要的,我们的持久化结构实际避免了同步的需求,但是需要有一种机制让编译器知道这个情况以便能通过 Trait 检查,而显然标准库里没有提供这种手段。
不过我们自己可以通过定义一个新的包装结构,为它实现 Sync
和 Send
两个 Trait 来实现持久化结构需要的 Trait 语义,已经有人干得很好了,我们在它的基础上按照我们的需求改进,得到一个告知编译器通过同步检查的包装结构,UnsafeSendSync
#[derive(Default, Clone, Copy)]
#[repr(transparent)]
pub struct UnsafeSendSync<T>(T);
// 一个自动实现Trait的宏,细节不展开了
impl_unpack!(UnsafeSendSync | AsRef, AsMut, Deref, DerefMut, From);
unsafe impl<T> Send for UnsafeSendSync<T> {}
unsafe impl<T> Sync for UnsafeSendSync<T> {}
impl<T> UnsafeSendSync<T> {
pub fn new(v: T) -> Self {
Self(v)
}
pub fn unwrap(self) -> T {
self.0
}
pub fn as_ptr(&self) -> *const T {
&self.0
}
pub unsafe fn as_ref_mut_ptr(&self) -> *mut T {
&self.0 as *const _ as *mut _
}
}
于是我们重新修改了前面定义节点包装器的 impl_node
的宏:
#[macro_export]
macro_rules! impl_node {
() => {
impl_node!(pub(self));
};
($vis:vis) => {
impl_node!($vis <K, V>, rc);
};
($vis:vis <$($g:ident),+>, rc) => {
impl_node!(
$vis <$($g),+>,
std::rc::Rc<std::cell::RefCell<Node_<$($g),+>>>,
std::rc::Weak<std::cell::RefCell<Node_<$($g),+>>>
);
#[allow(unused)]
impl<$($g),+> Node<$($g),+> {
fn as_ptr(&self) -> *mut Node_<$($g),+> {
match self.0 {
Some(ref rc) => rc.as_ptr(),
None => std::ptr::null_mut(),
}
}
}
#[allow(unused)]
macro_rules! aux_node {
({ $$($attr:ident : $attr_val:expr),* $$(,)? }) => {
Node(Some(std::rc::Rc::new(std::cell::RefCell::new(Node_ {
$$(
$attr: $attr_val
),*
}))))
};
(ENUM $ty:ident { $$($attr:ident : $attr_val:expr),* $$(,)? }) => {
Node(Some(std::rc::Rc::new(std::cell::RefCell::new(Node_::$ty {
$$(
$attr: $attr_val
),*
}))))
};
}
#[allow(unused)]
macro_rules! unwrap_into {
($node:expr) => {
std::rc::Rc::try_unwrap($node.0.unwrap())
.unwrap()
.into_inner()
};
}
};
($vis:vis <$($g:ident),+>, arc) => {
impl_node!(
$vis <$($g),+>,
std::sync::Arc<UnsafeSendSync<Node_<$($g),+>>>,
std::sync::Weak<UnsafeSendSync<Node_<$($g),+>>>
);
macro_rules! aux_node {
({ $$($attr:ident : $attr_val:expr),* }) => {
Node(Some(std::sync::Arc::new(UnsafeSendSync::new(Node_ {
$$(
$attr: $attr_val
),*
}))))
};
(ENUM $ty:ident { $$($attr:ident : $attr_val:expr),* }) => {
Node(Some(std::sync::Arc::new(UnsafeSendSync::new(Node_::$ty {
$$(
$attr: $attr_val
),*
}))))
};
}
};
($vis:vis <$($g:ident),+>, $rc:ty, $wk:ty) => {
$vis struct Node<$($g),+>(
Option<$rc>,
);
/// Used for reverse reference to avoid circular-reference
///
/// So we can easy auto drop
struct WeakNode<$($g),+>(
Option<$wk>,
);
#[allow(unused)]
impl<$($g),+> Node<$($g),+> {
fn downgrade(&self) -> WeakNode<$($g),+> {
WeakNode(
self.0.clone().map(|ref rc| <$rc>::downgrade(rc)),
)
}
fn none() -> Self {
Self(None)
}
fn is_some(&self) -> bool {
self.0.is_some()
}
fn is_none(&self) -> bool {
self.0.is_none()
}
fn rc_eq(&self, other: &Self) -> bool {
match self.0 {
Some(ref rc1) => {
if let Some(ref rc2) = other.0 {
<$rc>::ptr_eq(rc1, rc2)
} else {
false
}
}
None => other.is_none(),
}
}
}
impl<$($g),+> Default for Node<$($g),+> {
fn default() -> Self {
Self::none()
}
}
impl<$($g),+> Clone for Node<$($g),+> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[allow(unused)]
impl<$($g),+> WeakNode<$($g),+> {
fn upgrade(&self) -> Node<$($g),+> {
Node(self.0.clone().map(|weak| {
if let Some(strong) = weak.upgrade() {
strong
} else {
unreachable!("weak node upgrade failed")
}
}))
}
fn none() -> Self {
Self(None)
}
fn is_none(&self) -> bool {
self.0.is_none()
}
fn is_some(&self) -> bool {
self.0.is_some()
}
fn replace(&mut self, oth: Self) {
self.0 = oth.0;
}
}
impl<$($g),+> Clone for WeakNode<$($g),+> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
}
}
并更新节点包装的访问宏,增加新的用例:
#[macro_export]
macro_rules! def_attr_macro {
// ...
(call_unsafe_sync | $($name:ident),+) => {
$(
coll::paste!(
macro_rules! $name {
($node:expr) => {
attr!(self_unsafe_sync | $$node).$name()
};
}
);
coll::paste!(
#[allow(unused)]
macro_rules! [<$name _mut>] {
($node:expr) => {
attr!(self_unsafe_sync_mut | $$node).[<$name _mut>]()
};
}
);
)+
}
}
#[macro_export]
macro_rules! attr {
// ...
(self_unsafe_sync | $node:expr) => { {
if let Some(ref _unr) = $node.0 {
unsafe { &* _unr.as_ref().as_ptr() }
}
else {
panic!("Call {} on None", stringify!($attr));
}
} };
(self_unsafe_sync_mut | $node:expr) => { {
if let Some(ref _unr) = $node.0 {
unsafe { &mut* _unr.as_ref().as_ref_mut_ptr() }
}
else {
panic!("MAccess {} on None", stringify!($attr));
}
} };
}
于是得到我们自己定义的节点的访问宏:
impl_node!(pub <T>, arc);
def_attr_macro!(call_unsafe_sync | id, children, values);
并用它们定义一些节点包装上的方法:
impl<T> Node<T> {
fn is_leaf(&self) -> bool {
self.is_some() && attr!(self_unsafe_sync | self).is_leaf()
}
fn is_internal(&self) -> bool {
self.is_some() && !attr!(self_unsafe_sync | self).is_leaf()
}
fn len(&self) -> usize {
assert!(self.is_some());
if self.is_internal() {
children!(self).len()
} else {
values!(self).len()
}
}
fn id(&self) -> ID {
if self.is_none() {
0
} else {
id!(self).clone()
}
}
}
简单方法
首先定义下向量的持久化版本和易变版本共通的一些方法:
macro_rules! impl_trie_common {
() => {
pub fn new() -> Self {
Self {
cnt: 0,
root: Node::none(),
tail: Node::none(),
}
}
pub fn len(&self) -> usize {
self.cnt
}
pub fn id(&self) -> ID {
self.tail.id()
}
pub fn nth(&self, idx: usize) -> &T {
assert!(self.cnt > idx);
let leaf = self.down_to_leaf(idx);
&values!(leaf)[idx!(idx, 1)]
}
// Alias as search to leaf, array_for, etc
fn down_to_leaf(&self, idx: usize) -> Node<T> {
debug_assert!(idx < self.cnt);
if idx >= tailoff!(self) {
return self.tail.clone();
}
let mut lv = h!(self);
let mut cur = &self.root;
while lv > 1 {
cur = &children!(cur)[idx!(idx, lv)];
lv -= 1;
}
cur.clone()
}
};
}
- 不同于 Clojure 里面的实现,我们检查尾节点的
id
而不是根节点的,因为尾节点才是首先被插入的节点
创建节点
定义一个创建节点的宏可以极大地方便我们后续持久性以及易变性数据结构推入或弹出节点的过程。
基础创建
macro_rules! node {
// ...
(single-leaf| $id:expr, $v:expr, $cap:expr) => { {
let node = node!(leaf| $id, $cap);
values_mut!(node)[0] = $v;
node
} };
(leaf| $id:expr, $cap:expr) => { {
aux_node!(ENUM Leaf {
id: $id,
values: Array::new($cap)
})
} };
(internal| $id:expr, $cap:expr) => { {
aux_node!(ENUM Internal {
id: $id,
children: Array::new_with_clone(Node::none(), $cap)
})
} };
}
带有容积参数的复制
创建一个指定大小的数组,并在允许范围内复制一个既有节点。这既可以用在 push
时创建一个扩容的新节点,也可以用在 pop
时创建一个缩容的新节点,当然也可以原封不动地复制节点。
macro_rules! node {
// ...
(dup-with| $id:expr, $x:expr, $cap:expr) => { {
let x = $x;
if x.is_leaf() {
node!(dup-with-leaf| $id, x, $cap)
}
else {
node!(dup-with-internal| $id, x, $cap)
}
} };
(dup-with-leaf| $id:expr, $x:expr, $cap:expr) => { {
let x = $x;
let cap = $cap;
debug_assert!(cap <= NODE_SIZE);
let node = node!(leaf| $id, cap);
let n = min(x.len(), cap);
values_mut!(node)[..n].clone_from_slice(&values!(x)[..n]);
node
} };
(dup-with-internal| $id:expr, $x:expr, $cap:expr) => { {
let x = $x;
let cap = $cap;
debug_assert!(cap <= NODE_SIZE);
let n = min(x.len(), cap);
let node = node!(internal| $id, cap);
children_mut!(node)[..n].clone_from_slice(&children!(x)[..n]);
nod
} }
}
复制时容量 +1/-1
macro_rules! node {
// ...
(dup-inc-leaf| $id:expr, $x:expr, $v:expr) => { {
let x = $x;
debug_assert!(x.is_leaf());
let node = node!(dup-with-leaf| $id, x, x.len() + 1);
values_mut!(node)[x.len()] = $v;
node
} };
(dup-inc-internal| $id:expr, $x:expr, $v:expr) => { {
let x = $x;
debug_assert!(x.is_internal());
let node = node!(dup-with-internal| $id, x, x.len() + 1);
children_mut!(node)[x.len()] = $v;
node
} };
(dup-dec| $id:expr, $x:expr) => { {
let x = $x;
debug_assert!(x.len() > 0);
if x.is_leaf() {
node!(dup-with-leaf| $id, x, x.len() - 1)
}
else {
node!(dup-with-internal| $id, x, x.len() - 1)
}
} };
值得注意的是,对 +1 复制与 -1 复制的实现行为有些不同,+1 复制要单独分为两种情况,这是因为它有一个额外的参数 $v
,也就是需要插入节点的值,而值的类型是异构的,不能动态地分发,只能手动静态地分发。
Push
主流程
impl<T> PTrieVec<T> {
pub fn push(&self, v: T) -> Self
where
T: Clone,
{
let cnt = self.cnt + 1;
let mut root = self.root.clone();
let tail;
// trie is empty
if self.tail.is_none() {
tail = node!(single - leaf | self.id(), v, 1);
}
// tail is available
else if self.cnt - tailoff!(self) < NODE_SIZE {
tail = node!(dup - inc - leaf | self.id(), &self.tail, v);
}
// tail is full
else {
tail = node!(single - leaf | self.id(), v, 1);
root = self.push_tail_into_trie();
}
Self { cnt, root, tail }
}
}
创建新路径
创建一条从某 $\text{lv}$ 开始直到叶子( $\text{lv}=1$ )的新路径:
/// Top-down clone new path from lv (1..h)
fn new_path<T>(id: ID, lv: u32, x: &Node<T>, cap: usize) -> Node<T> {
if lv == 1 {
return x.clone();
}
let node = node!(internal | id, cap);
children_mut!(node)[0] = new_path(id, lv - 1, x, cap);
node
}
这里使用了递归的实现,在原版的作为 Clojure 标准库的 Java 实现里所有 Trie 上的操作都采用了类似结构的递归的实现,这或许能简化一些代码,但是严重牺牲了代码的可读性:一方面接口丑陋,作为递归函数的代价,包含了过多不应该属于接口而是栈上临时的参数;另一方面递归本身又隐藏了细节,让人难以看清楚到底这个过程做了什么事情。因此后面我们将使用我们自己的展开版本的实现作为替代。
推入尾节点
当 push
时发现尾节点已满时,需要把旧的尾节点推入 Trie 里面,按照 Trie 树高可以分为三种情况:
- $\text{lv}=0$ ,也就是 Trie 为空的情况,把旧尾节点作为 Trie 的根;
- $\text{lv}=1$ ,此时发现根溢出,于是创建一个新的节点包含原来的旧根和推入的尾节点。但是更进一步地,可以推广到包含 $\text{lv}=1$ 在内的全部根溢出的情况,此时新根插入的不是原来的尾节点,而是
new_path
创建的包含尾节点的一整条路径; - $\text{lv} \geqslant 2$ ,从根节点开始复制,注意选择合适的数组容量,当中间节点超过一层时,自顶向下地复制每一层的节点,并把上一层中间节点里对本层节点的引用更新为新创建的节点,当发现新的节点在一个还未创建的路径时,可以直接使用
new_path
创建到叶子的新路径,然后插入到上一层节点。
impl<T> PTrieVec<T> {
fn push_tail_into_trie(&self) -> Node<T>
where
T: Clone,
{
debug_assert_eq!(self.tail.len(), NODE_SIZE);
let leaf_i = self.cnt - 1;
let mut lv = h!(self);
let root;
if lv == 0 {
root = self.tail.clone();
return root;
}
// Complete trie including case lv == 1
else if tailoff!(self) == NODE_SIZE.pow(lv) {
root = node!(internal | self.id(), 2);
children_mut!(root)[0] = self.root.clone();
children_mut!(root)[1] = new_path(self.id(), lv, &self.tail, 1);
return root;
}
debug_assert!(lv >= 2);
let mut p_i = idx!(leaf_i, lv);
let mut p;
if self.root.len() > p_i {
p = node!(dup | self.id(), &self.root);
} else {
p = node!(
dup - inc - internal | self.id(),
&self.root,
Node::none()
);
};
let root = p.clone();
// Go down through the branch
while lv >= 3 {
// at p's level
if children!(p)[p_i].is_some() {
let old_cur = &children!(p)[p_i];
let cur;
let cur_i = idx!(leaf_i, lv - 1);
if old_cur.len() > cur_i {
cur = node!(dup | self.id(), old_cur);
} else {
cur = node!(
dup - inc - internal | self.id(),
old_cur,
Node::none()
);
}
children_mut!(p)[p_i] = cur.clone();
lv -= 1;
p_i = cur_i;
p = cur;
} else {
children_mut!(p)[p_i] =
new_path(self.id(), lv - 1, &self.tail, 1);
return root;
}
}
debug_assert_eq!(lv, 2);
children_mut!(p)[p_i] = self.tail.clone();
root
}
}
Assoc
按照索引更新向量,如果索引和长度相等,就顺势插入。
和前面 push_tail_into_trie
同样自顶向下地复制一条只到叶子的路径,只不过前者相当于访问的索引是 self.cnt - 1
,而这里使用的是传入的索引。
impl<T> PTrieVec<T> {
/// idx in `[0, self.len()]` (update or push)
pub fn assoc(&self, idx: usize, v: T) -> Self
where
T: Clone,
{
assert!(self.cnt >= idx);
if idx == self.cnt {
return self.push(v);
}
debug_assert!(self.cnt > 0);
let cnt = self.cnt;
let root;
let tail;
if idx >= tailoff!(self) {
root = self.root.clone();
tail = node!(dup | self.id(), &self.tail);
values_mut!(tail)[idx!(idx, 1)] = v;
} else {
let mut lv = h!(self);
debug_assert!(lv > 0);
let mut p_i = idx!(idx, lv);
let mut p = node!(dup | self.id(), &self.root);
root = p.clone();
if lv > 1 {
// at p's level
loop {
let old_cur = &children!(p)[p_i];
let cur = node!(dup | self.id(), old_cur);
children_mut!(p)[p_i] = cur.clone();
lv -= 1;
p_i = idx!(idx, lv);
p = cur;
if lv == 1 {
break;
}
}
}
values_mut!(p)[p_i] = v;
tail = self.tail.clone();
}
Self { cnt, root, tail }
}
}
Pop
主流程
impl<T> PTrieVec<T> {
pub fn pop(&self) -> Self
where
T: Clone,
{
assert!(self.cnt > 0, "Can't pop empty vector");
let cnt = self.cnt - 1;
let mut root = self.root.clone();
let tail;
// Get empty vec
if self.cnt == 1 {
tail = Node::none();
}
// Get non-empty tail
else if self.cnt - tailoff!(self) > 1 {
tail = node!(dup - dec | self.id(), &self.tail);
} else {
// the last two idx
tail = self.down_to_leaf(self.cnt - 2);
root = self.pop_tail_from_trie();
}
Self { cnt, root, tail }
}
}
弹出尾节点
与 push
的操作相对的,当弹出尾部元素后发现尾节点为空时,需要把 Trie 最后一个节点弹出来作为尾节点。
有两点值得一讲:
-
使用
self.cnt - 2
作为对 Trie 上最后一个节点的访问索引,因为此时原尾节点上只有一个元素,因此 Trie 上有self.cnt - 1
个元素; -
弹出尾节点可能会产生空路径,也就是唯一的子节点为空的节点,空路径可能会一路向上延伸,直到根节点5。对应推入尾节点时有根溢出的情况,这时应该叫做根不足吧😀。
从结构上进行判断的话,需要从尾部再回溯到根,由于 Trie 节点没有对父节点的反向引用,所以在自顶向下过程中还要保存一下整条节点路径,然后进行检查。
不过有更简单的,就是直接检查 Trie 的大小,是否弹出尾节点后后高度会下降
impl<T> PTrieVec<T> {
fn pop_tail_from_trie(&self) -> Node<T>
where
T: Clone,
{
debug_assert_eq!(self.cnt - tailoff!(self), 1);
// Get empty tail
let mut lv = h!(self);
debug_assert!(lv > 0);
if lv == 1 {
return Node::none();
}
debug_assert!(lv >= 2);
let leaf_i = self.cnt - 1 - 1; // tail size 1
let mut p_i = idx!(leaf_i, lv);
let mut p = node!(dup | self.id(), &self.root);
let mut root = p.clone();
while lv > 2 {
let cur = node!(dup | self.id(), &children!(p)[p_i]);
children_mut!(p)[p_i] = cur.clone();
lv -= 1;
p_i = idx!(leaf_i, lv);
p = cur;
}
children_mut!(p)[p_i] = Node::none();
if self.cnt - 1 - NODE_SIZE == NODE_SIZE.pow(h!(self) - 1) {
/* pop root */
root = children!(root)[0].clone();
}
root
}
}
易变版本实现
数据结构
pub struct TTrieVec<T> {
cnt: usize,
root: Node<T>,
tail: Node<T>,
}
impl<T> !Send for TTrieVec<T> {}
impl<T> !Sync for TTrieVec<T> {}
由于 Sync
和 Send
是自动 Trait,由于 Node 因为我们定义的 UnsafeSendSync
自动实现了这两个 Trait ,因此 PTrieVec
和 TTrieVec
也都自动实现了这两个 Trait ,但我们不喜欢让 TTrieVec
能后被跨线程的分发与访问,于是通过“负实现”取消这两个 Trait 的自动实现。
transient
把持久向量变为属于当前线程的可变向量。
不会马上复制整棵树的所有节点,而只是 TrieVec 的根节点和尾节点。
使用宏 ensure_editable
作为警戒哨,确保最后得到一个属于当前线程的可变的节点。
macro_rules! ensure_editable {
($id:expr, $x:expr) => { {
let id = $id;
let x = $x;
if x.is_none() || id == x.id() {
x.clone()
} else {
node!(dup - with | id, x, NODE_SIZE)
}
} };
($x:expr) => {
ensure_editable!(edit!(), $x)
};
}
impl<T> PTrieVec<T> {
pub fn transient(self) -> TTrieVec<T>
where
T: Clone,
{
TTrieVec {
cnt: self.cnt,
root: ensure_editable!(&self.root),
tail: ensure_editable!(&self.tail),
}
}
}
push
impl<T> TTrieVec<T> {
#[inline(always)]
fn is_editable(&self) -> bool {
self.cnt == 0 || self.id() != 0
}
pub fn push(&mut self, v: T) -> Self
where
T: Clone,
{
assert!(self.is_editable());
// trie is empty
if self.cnt == 0 {
self.tail = node!(single - leaf | edit!(), v, NODE_SIZE)
}
// tail is available
// WARNING: neq `tail.len` for it's array capcity
else if self.cnt - tailoff!(self) < NODE_SIZE {
let leaf_i = self.cnt - tailoff!(self);
if self.tail.len() <= leaf_i {
self.tail =
node!(dup - with | self.id(), &self.tail, NODE_SIZE);
}
values_mut!(self.tail)[leaf_i] = v;
}
// tail is full
else {
self.push_tail_into_trie();
self.tail = node!(single - leaf | self.id(), v, NODE_SIZE);
}
self.cnt += 1;
Self {
cnt: self.cnt,
root: self.root.clone(),
tail: self.tail.clone(),
}
}
fn push_tail_into_trie(&mut self)
where
T: Clone,
{
debug_assert_eq!(self.tail.len(), NODE_SIZE);
let leaf_i = self.cnt - 1;
let mut lv = h!(self);
if lv == 0 {
self.root = self.tail.clone();
return;
} else if tailoff!(self) == NODE_SIZE.pow(lv) {
let root = node!(internal | self.id(), NODE_SIZE);
children_mut!(root)[0] = self.root.clone();
children_mut!(root)[1] =
new_path(self.id(), lv, &self.tail, NODE_SIZE);
self.root = root;
return;
}
debug_assert!(lv >= 2);
let mut p_i = idx!(leaf_i, lv);
let mut p = self.ensure_editable(&self.root);
self.root = p.clone();
while lv >= 3 {
// at p's level
if children!(p)[p_i].is_some() {
let cur = self.ensure_editable(&children!(p)[p_i]);
let cur_i = idx!(leaf_i, lv - 1);
children_mut!(p)[p_i] = cur.clone();
lv -= 1;
p_i = cur_i;
p = cur;
} else {
children_mut!(p)[p_i] =
new_path(self.id(), lv - 1, &self.tail, NODE_SIZE);
return;
}
}
children_mut!(p)[p_i] = self.tail.clone();
}
}
基本过程和持久化版本高度一致,有一个区别是:
- 持久化版本扩容时容量最多增加 $1$ ,而易变版本直接分配满额的节点;
- 直接修改 TrieVec 头,
self.root
,self.tail
assoc
impl<T> TTrieVec<T> {
pub fn assoc(&mut self, idx: usize, v: T) -> Self
where
T: Clone,
{
assert!(self.cnt >= idx);
assert!(self.is_editable());
if idx == self.cnt {
return self.push(v);
}
debug_assert!(self.cnt > 0);
if idx >= tailoff!(self) {
debug_assert!(self.tail.id() == self.id());
values_mut!(self.tail)[idx!(idx, 1)] = v;
} else {
let mut lv = h!(self);
debug_assert!(lv > 0);
let mut p_i = idx!(idx, lv);
let mut p = self.root.clone();
if lv > 1 {
// at p's level
loop {
let cur = self.ensure_editable(&children!(p)[p_i]);
children_mut!(p)[p_i] = cur.clone();
lv -= 1;
p_i = idx!(idx, lv);
p = cur;
if lv == 1 {
break;
}
}
}
values_mut!(p)[p_i] = v;
}
Self {
cnt: self.cnt,
root: self.root.clone(),
tail: self.tail.clone(),
}
}
}
pop
impl<T> TTrieVec<T> {
pub fn pop(&mut self) -> Self
where
T: Clone + Default,
{
assert!(self.cnt > 0, "Can't pop empty vector");
assert!(self.is_editable());
if self.cnt == 1 {
self.tail = Node::none();
} else if self.cnt - tailoff!(self) > 1 {
values_mut!(self.tail)[self.cnt - tailoff!(self) - 1] =
Default::default();
} else {
let tail = self.ensure_editable(&self.down_to_leaf(self.cnt - 2));
self.pop_tail_from_trie();
self.tail = tail;
}
self.cnt -= 1;
Self {
cnt: self.cnt,
root: self.root.clone(),
tail: self.tail.clone(),
}
}
fn pop_tail_from_trie(&mut self)
where
T: Clone,
{
let mut lv = h!(self);
debug_assert!(lv > 0);
if lv == 1 {
self.root = Node::none();
return;
}
debug_assert!(lv >= 2);
let leaf_i = self.cnt - 1 - 1;
let mut p_i = idx!(leaf_i, lv);
let mut p = self.root.clone();
while lv > 2 {
let cur = self.ensure_editable(&children!(p)[p_i]);
lv -= 1;
p_i = idx!(leaf_i, lv);
p = cur;
}
children_mut!(p)[p_i] = Node::none();
if self.cnt - 1 - NODE_SIZE == NODE_SIZE.pow(h!(self) - 1) {
self.root = children!(self.root)[0].clone();
}
}
}
persistent
把易变版本转换回持久化版本。改一下头尾节点的标记就可以。
impl<T> TTrieVec<T> {
pub fn persistent(self) -> PTrieVec<T>
where
T: Clone,
{
assert!(self.is_editable());
if self.tail.is_some() {
*id_mut!(self.tail) = no_edit!();
}
PTrieVec {
cnt: self.cnt,
root: self.root,
tail: self.tail,
}
}
}
由于 Rust 的编程模型,易变版本持久化后原来的易变版本保证无法访问,相比 Clojure 的 Java 实现会简化很多过程。
调试方法
打印方法
#[cfg(test)]
macro_rules! impl_trie_test_common {
() => {
#[allow(unused)]
fn debug_print(&self)
where
T: Debug,
{
debug_print(&self.root, &self.tail)
}
};
}
#[cfg(test)]
fn debug_print<T>(root: &Node<T>, tail: &Node<T>)
where
T: Debug,
{
use common::vecdeq;
let mut lv = 1usize;
let mut cur_q = vecdeq![];
println!();
println!("MAIN TRIE:");
println!();
if root.is_some() {
cur_q.push_back(vec![root]);
} else {
println!("empty.\n");
}
while !cur_q.is_empty() {
println!("############ Level: {} #############\n", lv);
let mut nxt_q = vecdeq![];
while let Some(group) = cur_q.pop_front() {
for (i, child) in group.into_iter().enumerate() {
println!("{i:02}. {child:?}");
if child.is_internal() {
let child_group = children!(child)
.iter()
.filter(|&x| x.is_some())
.collect();
nxt_q.push_back(child_group);
}
}
println!();
}
cur_q = nxt_q;
lv += 1;
}
// print tail
println!("###################################\n");
println!("TAIL: \n");
if tail.is_some() {
println!("{:?}", tail);
} else {
println!("empty.");
}
println!("------------- end --------------");
}
尾声
注解
-
参考自 hyPiRion 介绍的系列博文 和 Clojure 源代码 ↩
-
https://hypirion.com/musings/understanding-persistent-vector-pt-3#the-rationale-for-tails ↩
-
在原版 Clojure 的 Java 里,标志使用了原子类型,这主要是担心易变类型在跨线程使用时重新变为持久类型时的竞争问题,但是由于 Rust 的编程模型限制了易变类型不允许跨线程使用,因此就不需要变为原子类型 ↩
-
Atomic Rc ↩
-
对于树高至少为 $2$ 作为的 Trie 来讲,空路径应该是根节点的第二个孩子, ↩