斐波那契堆的实现(for Rust)
算法理论部分,在上一篇文章 Fibonacci Heap 已经讲完了,下面是实现的部分1。
关于节点索引
因为我们需要为 Fibonacci Heap 实现 DecreaseKey 的方法,因此需要维护一个用于记录堆中每个节点索引的 Map 。
作为某种对比,Rust 标准库里的 BinaryHeap 根本就没有节点索引,可它居然也宣称支持 DecreaseKey 的方法,它只是插入“同值”的新节点来一定程度地代替 DecreaseKey,这比红黑树里面用“假交换”来做树删除节点后的重平衡还要糟糕:
- 需要用户自己定义一个额外的结构体,然后为它实现一些必要的 Trait,非常的麻烦;
- 显然破坏了 DecreaseKey 的完整语义,引入潜在的Bug。
不过另一方面,维护节点索引会显著地增加性能开销,让完美的数据结构不那么完美。
数据结构
Node
Structure & Wrappers
struct Node<I, T>(Option<Rc<RefCell<Node_<I, T>>>>);
/// Used for reverse reference to avoid circular-reference
///
/// So we can easy auto drop
struct WeakNode<I, T>(Option<Weak<RefCell<Node_<I, T>>>>);
#[derive(Clone)]
struct Node_<I, T> {
idx: I,
key: *mut T,
rank: usize, // children number
/// rev ref
left: WeakNode<I, T>,
right: Node<I, T>,
/// rev ref
paren: WeakNode<I, T>,
child: Node<I, T>,
/// Indicate that it has lost a child
marked: bool,
}
Sugar Macros
////////////////////////////////////////
//// Attr macros
/// Clone attr
macro_rules! attr {
($node:expr, $attr:ident) => {
{
let _unr = $node.clone().0.unwrap();
let _bor = _unr.as_ref().borrow();
let _attr = _bor.$attr.clone();
drop(_bor);
_attr
}
};
}
macro_rules! mattr {
($node:expr, $attr:ident) => {
$node.clone().0.unwrap().as_ref().borrow_mut().$attr
};
}
macro_rules! def_attr_macro {
($($name:ident),+) => {
$(
macro_rules! $name {
($node:expr) => {
attr!($$node, $name)
}
}
concat_idents! (mname = m, $name {
#[allow(unused)]
macro_rules! mname {
($node:expr) => {
mattr!($$node, $name)
}
}
});
)+
};
}
macro_rules! key {
($node:expr) => {
unsafe { &*attr!($node, key) }
};
}
def_attr_macro!(left, right, child, paren, idx, rank, marked);
Head
Structure
- $\texttt{len}$ 和 $\texttt{rcnt}$ ,分别是节点数和根节点个数,它们只是额外的基本统计信息,可以省略;
- $\texttt{min}$ 是最小根;
- $\texttt{nodes}$ 是索引,这里只关注主要功能的实现,所以使用 HashMap ,如果还要支持 $O(1)$ 的 Union 方法就需要换成 Ordered TreeMap,比如二叉搜索树或者B+树
pub trait CollKey = Ord + Debug;
pub struct FibHeap<I: CollKey, T: CollKey> {
len: usize,
/// roots count
rcnt: usize,
min: Node<I, T>,
/// index of nodes
nodes: HashMap<I, Node<I, T>>,
}
Sugar Macros
macro_rules! boxptr {
($v:expr) => {
Box::into_raw(Box::new($v))
};
}
macro_rules! unboxptr {
($ptr:expr) => {
unsafe { *Box::from_raw($ptr) }
};
}
macro_rules! node {
($i:expr, $k:expr) => {
node!($i, $k, 0, false)
};
($i:expr, $k:expr, $rank:expr, $marked:expr) => {
Node(Some(Rc::new(RefCell::new(Node_ {
idx: $i,
key: boxptr!($k),
rank: $rank,
left: WeakNode::none(),
right: Node::none(),
paren: WeakNode::none(),
child: Node::none(),
marked: $marked,
}))))
};
}
macro_rules! unwrap_into {
($node:expr) => {
std::rc::Rc::try_unwrap($node.0.unwrap())
.unwrap()
.into_inner()
};
}
辅助方法
算法无关的基础方法
impl<I, T> Node<I, T> {
fn downgrade(&self) -> WeakNode<I, T> {
WeakNode(self.0.clone().map(|ref rc| Rc::downgrade(rc)))
}
fn none() -> Self {
Self(None)
}
fn is_some(&self) -> bool {
self.0.is_some()
}
fn is_none(&self) -> bool {
self.0.is_none()
}
fn replace(&mut self, node: Node<I, T>) -> Self {
let old = Node(self.0.clone());
self.0 = node.0;
old
}
fn rc_eq(&self, other: &Self) -> bool {
match self.0 {
Some(ref rc1) => {
if let Some(ref rc2) = other.0 {
Rc::ptr_eq(rc1, rc2)
} else {
false
}
}
None => other.is_none(),
}
}
fn children(&self) -> Vec<Self> {
let mut child = child!(self);
let mut res = vec![];
while child.is_some() {
res.push(child.clone());
child = right!(child);
}
res
}
/// replace with new val, return old val
fn replace_key(&self, key: T) -> T {
let oldk = attr!(self, key);
let newk = boxptr!(key);
mattr!(self, key) = newk;
unboxptr!(oldk)
}
}
impl<I, T> Clone for Node<I, T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<I, T> Default for Node<I, T> {
fn default() -> Self {
Self::none()
}
}
impl<I, T> WeakNode<I, T> {
fn upgrade(&self) -> Node<I, T> {
Node(self.0.clone().map(|weak| weak.upgrade().unwrap()))
}
fn none() -> Self {
Self(None)
}
fn is_none(&self) -> bool {
self.0.is_none()
}
}
impl<I, T> Clone for WeakNode<I, T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<I: CollKey + Hash + Clone, T: CollKey> FibHeap<I, T> {
pub fn new() -> Self {
Self {
len: 0,
rcnt: 0,
min: Node::none(),
nodes: HashMap::new(),
}
}
/// from self.min go through all roots
fn roots(&self) -> Vec<Node<I, T>> {
let mut sibs = vec![];
if self.min.is_none() {
return sibs;
}
else {
sibs.push(self.min.clone());
}
let mut sib = right!(self.min);
while !sib.rc_eq(&self.min) {
sibs.push(sib.clone());
sib = right!(sib);
}
sibs
}
fn remove_from_index(&mut self, x: &Node<I, T>) -> I {
let k = idx!(x);
self.nodes.remove(&k);
k
}
}
impl<I: CollKey, T: CollKey> Drop for FibHeap<I, T> {
fn drop(&mut self) {
if self.len > 0 {
// break circle dependency to enable drop
let tail = left!(self.min).upgrade();
mright!(tail) = Node::none();
self.nodes.clear();
}
}
}
算法相关的基础方法
摘儿子
在 update-key 的时候,需要把违背堆性质或者失去超过一个孩子的非根节点从父结点摘除。这就是这个方法的作用,需要仔细考虑,每一步都不能忽略:
假设被摘的节点为 $\texttt{x}$
- 如果 $\texttt{x}$ 的左邻居不为空,把左邻的右邻连接到 $\texttt{x}$ 的右邻,否则 $\texttt{x}$ 就是父节点的第一个孩子,这时还需要更新父节点的孩子到 $\texttt{x}$ 的右邻;
- 如果 $\texttt{x}$ 的右邻居不为空,还把右邻的左邻更新为 $\texttt{x}$ 的左邻;
- 父节点的 $\texttt{rank–}$ ;
- 摘出来的节点去头和左右邻,只保留它的孩子
impl<I, T> Node<I, T> {
fn cut_child(&self, x: Node<I, T>) {
if !left!(x).is_none() {
mright!(left!(x).upgrade()) = right!(x);
} else {
mchild!(self) = right!(x);
}
if !right!(x).is_none() {
mleft!(right!(x)) = left!(x);
}
mrank!(self) = rank!(self) - 1;
x.purge_as_root();
}
/// remove paren, left and right
fn purge_as_root(&self) {
mparen!(self) = WeakNode::none();
mleft!(self) = WeakNode::none();
mright!(self) = Node::none();
}
}
锄根
在执行 pop
操作的时候,需要把一个根节点从根链表中移除:
- 根数自减,如果不为 $0$ ,意味着根链表还存在,就把左邻的右邻连接到右邻,右邻的左邻连接到左邻;
- purge
// Impl FibHeap ...
fn remove_from_roots(&mut self, x: Node<I, T>) {
self.rcnt -= 1;
if self.rcnt > 0 {
mright!(left!(x).upgrade()) = right!(x);
mleft!(right!(x)) = left!(x);
}
x.purge_as_root();
}
// ...
插苗
锄根的逆过程,把一个节点 $\texttt{x}$ 插入到根链表中:
- 根数 $\texttt{rcnt}$ 自增,保护性地 purge 节点 $\texttt{x}$ ;
- 如果根链表不存在,把 $\texttt{x}$ 作为根链表的唯一节点,让它成为最小根并且左右邻指向自己;
- 否则,头插法插入 $\texttt{x}$ , $\texttt{x}$ 的右邻链接到最小根的右邻, $\texttt{x}$ 左邻链接到最小根,然后最小根的右邻链接到 $\texttt{x}$ , $\texttt{x}$ 的右邻的左邻链接到 $\texttt{x}$
// Impl FibHeap ...
fn push_into_roots(&mut self, x: Node<I, T>) {
self.rcnt += 1;
x.purge_as_root();
if self.min.is_none() {
self.min = x;
mleft!(self.min) = self.min.downgrade();
mright!(self.min) = self.min.clone();
}
else {
mright!(x) = right!(self.min);
mleft!(x) = self.min.downgrade();
mright!(self.min) = x.clone();
mleft!(right!(x)) = x.downgrade();
}
}
// ...
高空台 (Altitude Ground Test Facilities)
欲要善其事,必先利其器。
任何有一定复杂度的实现都非一蹴而就,而越是复杂度高的实现,越是需要测试与调试的部分先行,越是由测试的全面性、彻底性与调试工具的效率决定其进程速度。
打印方法
一个全面而又简洁的打印方法是进行状态观察的基础手段。
-
对于 Fib 堆,简单遍历根节点地逐树打印,表明最小根;
-
树的打印遵循 BFS 顺序,每层一行,同层不同父节点的由分号分开, 并且在开头表明父节点序号;
-
单个节点打印它的权重(key)和索引(idx),如果被标记还要打印标记
impl<T: CollKey, K: CollKey> Display for FibHeap<T, K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut sib = self.min.clone();
for i in 1..=self.rcnt {
writeln!(f, "{} ({i:03}) {}", "-".repeat(28), "-".repeat(28))?;
// writeln!(f)?;
if sib.rc_eq(&self.min) {
write!(f, "M=>")?;
}
writeln!(f, "{}", sib)?;
debug_assert!(sib.is_some());
sib = right!(sib);
}
writeln!(f, "{}>> end <<{}", "-".repeat(28), "-".repeat(28))?;
Ok(())
}
}
impl<I: Debug, T: Debug> Display for Node<I, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "R({:?}) ", self)?;
let mut curq = vec![(self.clone(), self.children())];
loop {
let mut nxtq = vec![];
for (p, children) in curq {
if children.is_empty() {
break;
}
write!(f, "P({:?}) ", p)?;
let childlen = children.len();
for (i, child) in children.into_iter().enumerate() {
write!(f, "{:?}", child)?;
if i < childlen - 1 {
write!(f, ", ")?;
}
nxtq.push((child.clone(), child.children()));
}
write!(f, "; ")?;
}
if !nxtq.is_empty() {
writeln!(f)?;
curq = nxtq;
} else {
break;
}
}
Ok(())
}
}
impl<I: Debug, T: Debug> Debug for Node<I, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_none() {
write!(f, "None")
} else {
write!(f, "{:?}", self.0.as_ref().unwrap().as_ref().borrow())
}
}
}
impl<I: Debug, T: Debug> Debug for Node_<I, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:?}[{:?}]{}",
self.idx,
unsafe { &*self.key },
if self.marked { " X" } else { "" }
)
}
}
克隆方法
测试的时候,需要验证某一个状态是否正确,而当验证过程会影响测试对象本身状态时,就需要一个克隆方法来克隆一个被测试对象来进行这种有副作用地验证。
- 创建新的索引目录,对根链表逐根完全克隆,每克隆完一个,就和前一个克隆的节点双向链接起来,最后把头尾相连,构成一个循环链表;
- 对于根链表上的每棵树克隆的时候,先创建新的根节点,并把新创建的节点加入新的索引目录里,然后对它孩子递归地调用节点的完全克隆方法,创造出的孩子左右相连,每个孩子的父节点也要指向自己创建的新的根节点
impl<I: CollKey + Hash + Clone, T: CollKey + Clone> Clone for FibHeap<I, T> {
fn clone(&self) -> Self {
let len = self.len;
let rcnt = self.rcnt;
let mut nodes = HashMap::new();
let min;
let mut roots_iter = self.roots().into_iter();
if let Some(_min) = roots_iter.next() {
min = self.overall_clone(&mut nodes, _min.clone());
let mut cur = min.clone();
for root in roots_iter {
let newroot = self.overall_clone(&mut nodes, root);
mright!(cur) = newroot.clone();
mleft!(newroot) = cur.downgrade();
cur = newroot;
}
mright!(cur) = min.clone();
mleft!(min) = cur.downgrade();
} else {
min = Node::none();
}
Self {
len,
rcnt,
min,
nodes,
}
}
}
impl<I: CollKey + Hash + Clone, T: CollKey + Clone> FibHeap<I, T> {
fn overall_clone(
&self,
nodes: &mut HashMap<I, Node<I, T>>,
x: Node<I, T>,
) -> Node<I, T> {
if x.is_none() {
return Node::none();
}
// overall clone node body
let newx = node!(idx!(x), key!(x).clone(), rank!(x), marked!(x));
// update index reference
nodes.insert(idx!(x), newx.clone());
// recursive call it
let mut childen_iter = x.children().into_iter();
if let Some(child) = childen_iter.next() {
let newchild = self.overall_clone(nodes, child);
mchild!(newx) = newchild.clone();
mparen!(newchild) = newx.downgrade();
let mut cur = newchild;
for child in childen_iter {
let newchild = self.overall_clone(nodes, child);
mright!(cur) = newchild.clone();
mleft!(newchild) = cur.downgrade();
cur = newchild;
}
}
newx
}
}
主要算法方法
Fib 堆核心的算法就是三个:
- Push
- Pop
- DecreaseKey
Push 方法
根据算法,创建新的节点和索引后,直接插入到根链表中; 如果 $\texttt{key}$ 比最小根小,就更新最小根。
// Impl FibHeap ...
pub fn push(&mut self, i: I, v: T) {
let node = node!(i.clone(), v);
self.nodes.insert(i, node.clone());
self.push_into_roots(node.clone());
if key!(node) < key!(self.min) {
self.min = node;
}
self.len += 1;
}
// ...
Pop 方法
(这里从使用的接口上讲,实际上需要的是 pop_item
)
- 如果长度为 $0$ ,不存在最小根,直接返回
None
; - 否则长度自减,从最小根的右邻开始,遍历所有邻居,找到新的最小节点,如果最小根没有邻居(除它以外),就得到一个空节点;
- 把旧的最小根指向的节点从根链表摘除,最小根指向新的最小节点(包括可能是空节点),开启树的规整,合并同秩树;
- 树规整:创建一个 $\texttt{rank => node}$ 的 Map,遍历根链表,对于每个根,递归地查询 Map 是否有同秩的节点已经加入,有就合并两棵树,然后更新当前根节点为合并后的树的根节点,递归查询合并结束后,就插入当前树的秩和节点到 Map 中
- 把旧的最小根节点从索引删除,返回旧的最小根的节点
// impl FibHeap ...
pub fn pop(&mut self) -> Option<T> {
self.pop_item().map(|x| x.1)
}
pub fn pop_item(&mut self) -> Option<(I, T)> {
if self.min.is_none() {
return None;
}
self.len -= 1;
/* push children of oldmin into roots */
for child in self.min.children() {
self.push_into_roots(child.clone());
}
/* update min */
let newmin = self.roots()[1..]
.into_iter()
.min_by_key(|&sib| key!(sib))
.cloned()
.unwrap_or_default();
/* just del old min */
self.remove_from_roots(self.min.clone());
let oldmin = self.min.replace(newmin);
self.consolidate();
Some((
self.remove_from_index(&oldmin),
unboxptr!(unwrap_into!(oldmin).key),
))
}
/// merge same rank trees recusively
pub fn consolidate(&mut self) {
let mut rank: HashMap<usize, Node<I, T>> = hashmap!();
for mut sib in self.roots() {
while let Some(x) = rank.remove(&rank!(sib)) {
sib = self.merge_same_rank_root(x, sib);
}
rank.insert(rank!(sib), sib);
}
}
// ...
DecreaseKey 方法
对节点 $\texttt{x}$ 降 $\texttt{key}$ ,首先检查一下索引(检查是否 $\texttt{x}$ 的新 $\texttt{key}$ 确实较小),检查一下是否不是根节点并且堆性质违背(比父节点的 $\texttt{key}$ 更小),如果是,标记 $\texttt{x}$ ,设置 cut-meld-unmark 递归的起始点为 $\texttt{x}$ ,假装 $\texttt{x}$ 本身也是符合cut-meld-unmark 条件的一个父节点,否则设置起始点为空节点。最后由于是降 $\texttt{key}$ ,还要检查最小根是否需要更新为 $\texttt{x}$ 。
// impl FibHeap ...
pub fn decrease_key(&mut self, i: I, v: T) -> Option<T> {
let x;
match self.nodes.entry(i.clone()) {
Occupied(ent) => {
x = ent.get().clone();
let oldv = x.replace_key(v);
debug_assert!(
key!(x) < &oldv,
"decrease violated! {:?} !(<) {:?}",
key!(x),
&oldv
);
self.decrease_key_(x);
Some(oldv)
}
Vacant(_ent) => {
unreachable!("Empty index {i:?}")
}
}
}
fn decrease_key_(&mut self, x: Node<I, T>) {
let unmeld_ent;
let p = paren!(x);
if !p.is_none() && key!(x) < key!(p.upgrade()) {
// 假装x节点本身也是一个符合条件的父节点
mmarked!(x) = true;
unmeld_ent = x.downgrade();
} else {
unmeld_ent = WeakNode::none();
}
self.cut_meld_unmark_to_roots(unmeld_ent);
if key!(x) < key!(self.min) {
debug_assert!(paren!(x).is_none());
self.min = x;
}
}
// ...
cut-meld-unmark to roots :
如果起始节点是空节点,就退出;递归地检查当前节点是否不为根节点并且被标记,是,就取消标记,从父节点被摘出,推进根链表,然后以父节点为新的起始节点。
// impl FibHeap ...
fn cut_meld_unmark_to_roots(&mut self, ent: WeakNode<I, T>) {
if ent.is_none() {
return;
}
let mut x = ent.upgrade();
let mut p = paren!(x);
while marked!(x) && !p.is_none() {
let strongp = p.upgrade();
strongp.cut_child(x.clone());
self.push_into_roots(x.clone());
mmarked!(x) = false;
x = strongp;
p = paren!(x);
}
// 定义上不标记根,但这应该是无所谓的,标记对于可能的pop导致的树规整后的树情况更精确
mmarked!(x) = true;
}
// ...
额外算法方法
Update 方法
从语义上可以把 decrease-key
的方法拓展为完整的更新key的方法 。
- 如果索引不存在,就插入节点,否则替换节点;
- 如果 $\texttt{key}$ 相等,什么都不做;
- 如果 $\texttt{key}$ 降低,就
decrease-key
; - 如果增加,就
increase-key
// impl FibHeap ...
pub fn update(&mut self, i: I, v: T) -> Option<T> {
match self.nodes.entry(i.clone()) {
Occupied(ent) => {
let x = ent.get().clone();
let oldv = x.replace_key(v);
match key!(x).cmp(&oldv) {
Less => self.decrease_key_(x),
Equal => (),
Greater => self.increase_key_(x),
}
Some(oldv)
}
Vacant(_ent) => {
self.push(i, v);
None
}
}
}
// ...
increase-key :
注意: 由于当对最小根 increase-key
的时候需要重新搜索最小根,时间复杂度为 $O(\texttt{rank})$ ,在不保证严格二项堆的性质时,最坏时间复杂度为 $O(\texttt{n})$ , 会破坏了整个 update
其他部分的 $O(\texttt{logn})$ 的性质。
increase-key
的时候,同样地,当堆性质违背的时候(新的 $\texttt{key}$ 大于孩子节点)递归地执行 cut-meld-unmark 操作,区别在于升 $\texttt{key}$ 的是 $\texttt{x}$ 而要扫描的是x的孩子节点,好像是 $\texttt{x}$ 的孩子们降 $\texttt{key}$ 造成的堆性质违背。统计 $\texttt{x}$ 总共因为堆违背失去的孩子数(如果 $\texttt{x}$ 已被标记,初始化为 $1$ 否则为 $0$ )。
检查失去的孩子节点数,如果没有,就把启动点设置为空节点;如果恰好为 $1$ ,确认标记x,以 $\texttt{x}$ 的父节点为启动点;如果大于 $1$ ,标记 $\texttt{x}$ ,以 $\texttt{x}$ 本身为启动点。
cut-meld-unmark 递归地操作完成后,如果升 $\texttt{key}$ 的节点是最小根指向的节点,还要遍历根链表,更新最小根
// impl FibHeap ...
fn increase_key_(&mut self, x: Node<I, T>) {
let ent;
let mut children_lost = if marked!(x) { 1 } else { 0 };
for child in x.children() {
if key!(child) < key!(x) {
x.cut_child(child.clone());
self.push_into_roots(child.clone());
mmarked!(child) = false;
children_lost += 1;
}
}
match children_lost.cmp(&1) {
Less => {
ent = WeakNode::none()
}
Equal => {
mmarked!(x) = true;
ent = paren!(x);
},
Greater => {
mmarked!(x) = true;
ent = x.downgrade();
},
}
self.cut_meld_unmark_to_roots(ent);
if x.rc_eq(&self.min) {
let min_node =
self.roots().into_iter().min_by_key(|x| key!(x)).unwrap();
self.min = min_node;
}
}
// ...
典例片段
Prim 最小生成树
pub fn mst_prim(g: &Graph) -> Vec<(usize, usize)> {
let mut res = vec![];
/* choose an arbitray node as root */
let mut viter = g.vertexs().cloned();
let root;
if let Some(_root) = viter.next() {
root = _root;
} else {
return res;
}
/* setup rest collection */
let mut rest: HashSet<usize> = HashSet::new();
/* init dis heap && dis edge map */
let mut dis = FibHeap::new();
let mut dis_edge = HashMap::new();
dis.push(root, 0);
dis_edge.insert(root, Some(root));
for v in viter {
rest.insert(v);
dis.push(v, isize::MAX);
dis_edge.insert(v, None);
}
while !rest.is_empty() {
// u is current vertex
let (u, _uw) = dis.pop_item().unwrap().clone();
// "decrease-key" (It's increase-key actually for min-heap)
rest.remove(&u);
let u_pair = get!(dis_edge => u).unwrap();
if u_pair != u {
res.push((u, u_pair));
}
// calc adj
let adjs = get!(g.e => u);
/* update dis heap */
for v in adjs.into_iter().filter(|v| rest.contains(v)) {
let w_uv: isize = get!(g.w => (u, v));
if w_uv < *get!(dis => v) {
dis.decrease_key(v, w_uv);
dis_edge.insert(v, Some(u));
}
}
}
res
}
引用
-
https://github.com/minghu6/rust-minghu6/blob/snapshot-1/src/collections/heap/fib.rs ↩