Rust实践

依赖关系、引用、计数(Arc)与锁(Mutex)

  • 功能模块的设计清晰,最终一定是DAG,即使出现循环依赖也可以通过抽取公共部分为第三模块来解除循环依
    • Rust里类型初始化每个成员必须强制显示初始化,所以这种循环依赖其实写不出来,虽然也可以通过Option来绕过,但是不推荐这种做法
  • 引用是表达依赖关系的方式
    • 当一个模块只被某个模块唯一引用那么其实就是own的关系,这时在模块初始化时用move语义即可(即拥有所有权)
    • 当一个模块被多个模块引用时,会出现生命周期问题,即:被引用模块的生命周期需要大于等于引用它的所有模块的生命周期,否则容易出现use after free问题
    • 依赖关系不只是模块(struct)间,也可能是此struct方法成员启动的协程/线程间,每个协程/线程都有自己的生命周期,通过struct方法启动则说明对其有依赖,也就是此struct会被多方引用(即依赖)
  • 原子计数(Arc)是解决多引用生命周期问题很自然的工具,类似C++智能指针。Arc对象可以被clone,这样每个引用者就可以own此Arc的拷贝(注意own的是Arc拷贝的所有权,而不是持有引用),而通过此Arc的拷贝,间接持有了目标对象的引用,而Arc的原子计数可以保证目标对象的生命周期至少比其所有引用持有者长。
  • 当目标对象通过Arc被多方持有后,生命周期问题得以解决(不会出现use after free问题),但是多个引用者需要并发访问(读写)目标对象。显然,只有Arc原子引用计数是不够的(并且Rust的借用规则并不允许多个&mut),这里需要做到共享与互斥,也就需要Atomic/Mutex/RwLock来控制对通过Arc引用的目标对象的并发读写。(不要去管Send/Sync,定义很难理解"准确") 以上,是Arc<Mutex>、Arc<RwLock>常见用法的由来。常见模式:
Handler -> TableManager -> ElectionInfo
Handler -> TabletManager -> ElectionInfo

pub struct Handler {
    pub table_manager: Arc<TableManager>,  // 持有Arc副本
    pub tablet_manager: Arc<TabletManager>,
}

pub struct TableManager {
    election_info: Arc<ElectionInfo>,
}

pub struct TabletManager {
    election_info: Arc<ElectionInfo>,
    tablets: RwLock<TabletRouter>, // 并发读写
    id_allocator: AtomicU64, // 并发读写
}

// 怎么用
let election_info = Arc::new(ElectionInfo::new());
let table_manager = Arc::new(TableManager::new(election_info.clone()))
let tablet_manager = Arc::new(TableManager::new(election_info.clone()))
let handler = Handler::new(table_manager.clone(), tablet_manager.clone())
tokio::spawn({
    election_info.xxx
})
tokio::spawn({
    table_manager.yyy
})
tokio::spawn({
    tablet_manager.zzz
})
tokio::spawn({
    handler.table_manager.yyy
})
tokio::spawn({
    handler.tablet_manager.id_allocator.load()
    let router = handler.tablet_manager.tablets.write().unwrap();
    router.ggg
    // router unlocked
})

错误处理

  • 实现自定义Error的From并善用?;能极大降低不同层次错误转换的冗余代码,当然有时需要更丰富的错误信息时,可以手动构造错误。此外,出现错误时希望打印日志或者metrics等信息然后通过?;快速返回,可以使用map_err
pub async fn init_table(&self, table: &Table) -> Result<(), TabletManagerError> {
    if self.partitions.read().unwrap().get(&table_name).is_some() {
       // 手动构造TabletManagerError,errmsg中注入更多信息
       return Err(TabletManagerError::new(&format!(
           "{}:table {table_name} partitions already exist, can not init again",
           ManagerErrorKind::TableAlreadyExist,
       )));
    }
    ....
    // map_err打印错误信息
    let rw_replica_val = serde_json::to_string(&rw_replica_route_info).map_err(|err| {
        tracing::error!(
            "init tablet serialize rw replica data failed {err}, {table_name} {tablet_id}"
        );
        err
    })?; // ?;错误快速返回,并由于TabletManagerError实现了From,这里会自动转换json错误为TabletManagerError
    ....
}

impl From<serde_json::Error> for TabletManagerError {
    fn from(e: serde_json::Error) -> Self {
        TabletManagerError::new("SystemError:serde json error")
    }
}
  • 在不希望明确具体的错误类型时,可以使用anyhow::Result/anyhow::format_error!
// 返回Result<>类型
pub async fn on_leader_start(&self) -> Result<()> {
    self.tablet_task_manager
        .on_leader_start()
        .await
        .map_err(|err| {
            // 构造anyhow::Error
            anyhow::format_err!("call tablet task manager on leader start failed {err}",)
    })
}

std::sync & tokio::sync

  • 垮await或者会导致线程长时间阻塞(例如睡眠挂起),用tokio::sync::Mutex/RwLock/Channel
pub struct SystemInfoManager<T: KvStore> {
    replica_epoch_allocator: tokio::sync::Mutex<IdAllocator<T>>,
}

impl<T: KvStore> SystemInfoManager<T> {
    pub async fn alloc_replica_epoch(&self) -> Result<u64, SystemInfoManagerError> {
        // replica_epoch_allocator跨await,需要使用异步版本的Mutex,否则可能导致线程hang
        let mut id_allocator = self.replica_epoch_allocator.lock().await;
        let new_id = id_allocator.alloc_id().await.map_err(|err| {
            let err_msg = format!("alloc replica epoch failed {err}");
            tracing::error!(err_msg);
            SystemInfoManagerError::new(&err_msg)
        })?;
        Ok(new_id)
    }
}
  • 不垮await且临界区较短,使用std::sync::Mutex/RwLock,性能更好

结构体方法中启动协程定时任务

  • 参数Arc,原因在于tokio::spawn的协程需要共享结构体的所有权,并对其进行读写访问
  • tokio::select!,类似go中的select {}
  • tokio_util::sync::CancellationToken
impl TabletReplicaController {
    pub fn run(self: Arc<Self>) {
        lust::spawn({
            async move {
                let cloned_token = self.base.cancel_token.clone();
                let mut tick_count = 0;
                loop {
                    let interval =
                        Duration::from_secs(get_config().tablet_replica_reconcile_interval_sec);
                    let enabled = get_config().enable_tablet_replica_reconcile_controller;
                    tokio::select! {
                        _ = cloned_token.cancelled() => {
                            tracing::info!("tablet replica reconcile controller exit on stop signal");
                            return;
                        }
                        _ = tokio::time::sleep(interval) => {
                            tick_count += 1;
                            if !self.base.election_info.is_leader() {
                                tracing::info!("tablet replica reconcile controller not on leader, just skip");
                                continue;
                            }
                            if !enabled {
                                tracing::info!("tablet replica reconcile controller not enabled, just skip");
                                continue;
                            }
                            if let Err(err) = self.do_tablet_replica_reconcile().await {
                                metrics(metric::defs::TABLET_REPLICA_CONTROLLER_RECONCILE_FAIL_COUNT).emit(tags![], 1);
                                tracing::error!("do tablet replica reconcile failed {err}, tick {tick_count}");
                                continue;
                            }
                            metrics(metric::defs::TABLET_REPLICA_CONTROLLER_RECONCILE_SUCCEED_COUNT).emit(tags![], 1);
                            tracing::debug!("do tablet replica reconcile success, tick {tick_count}");
                        }
                    }
                }
            }
        });
    }
}

Send & Sync

按Rust的官方定义:

  • A type is Send if it is safe to send it to another thread
  • A type is Sync if it is safe to share between threads (T is Sync if and only if &T is Send) 这里不做具体解释,只举几个例子直观“感受”下:
  • MutexGuard是sync但不是send的,是为了保证锁必须是在同一个线程中加锁和解锁(试想如果是send的,被传递到其他线程析构解锁了,就违反此要求了)
  • Cell是send但不是sync的,&Cell可能因为内部可变性但是并没加锁导致并发读写问题,因此&Cell不是send的,也即Cell不是sync的。Cell拥有完整的数据T,可以转移所有权send到其他线程,所以Cell是send的
  • Rc既不是send也不是sync的,&Rc不是send好理解,内部计数不是原子的存在并发问题,Rc不是send的原因在于引用计数不是原子的,所以可能多个线程以为自己是data的唯一owner,同时释放内存,而Arc不会出现这种情况
  • T是send,则Mutex是send+sync。Mutex的互斥保证&内部可变性,从而让不同线程能拿到&mut T(如果不通过内部可变性来实现,按照Rust的借用规则,根本写不出来存在多个&mut引用的代码),从而写T的值。Arc是send+sync要求T是send+sync。因此Arc<Mutex>是send+sync

条件编译

  • 在编译期间根据传入的不同flag选择不同的实现,典型场景:跑测试时持久化store层可以是内存mock版、或者其他外部存储,我们希望复用测试用例,在跑本地单测时使用内存mock版,而在跑回归测试时使用线上外部存储的版本
(1) 在Cargo.toml中定义features
[features]
mock_meta_store = [] # 存储使用内存mock形式,主要用于单测
(2) 在代码中添加条件编译的实现
#[cfg(not(feature = "mock_meta_store"))]
pub type MetaStoreType = meta::GS3BasedKvStore;
#[cfg(feature = "mock_meta_store")]
pub type MetaStoreType = mock::MockMetaStore;
(3) 编译/测试/运行时指定特定features flag
cargo test/build/run --features "mock_meta_store"

Enum默认值

#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub enum TabletTaskType {
    #[default] // 加此标注即可
    Invalid,
    AddReplica,
    RemoveReplica,
    FailoverReplica,
    Split,
    Merge,
    Reconcile,
}

实现defer

  • scopeguard这个crate里:scopeguard::defer!、scopeguard::guard,简单场景可以使用defer!即可,guard实测在一些宏的场景下不适用(guard会改变传入其中的类型,变成类似智能指针的东西)
use scopeguard::defer;

fn some_func() {
    ....
    defer! {
        metrics(metric::defs::TASK_MANAGER_SUBMIT_LATENCY).emit(tags, Instant::now().duration_since(start).as_nanos() as i64);
    }
    ....
}

HashMap相关用法

  • entry访问方式 一行代码方便实现“key不存在则插入默认值再修改”
 ro_num_diff_by_idc
     .entry(table_name.clone())
     .or_default()
     .insert(*tablet_id, tablet_ro_scale_info);
 
 *current.entry(info.idc.clone()).or_default() += 1;

Vec相关用法

  • 预分配
let v = Vec::with_capacity(1024);
let v = vec![-1, 1024]; // 长度1024,每个元素初始化值为-1
let v = (0..1024).collect(); // 长度1023,元素为[0, 1024)
  • Vec to Vec 函数式编程链式写法,更简洁
vec_u64.into_iter().map(|n| n as i64).collect()

测试的组织

  • mod、crate、package
    • crate是编译单元,mod过多crate少无法有效利用并行编译,mod过少crate过多会增加代码改动重构成本,需要结合实际项目大小做取舍,对于大多数小型项目(~2万loc)单个crate足够
  • 单元测试、集成测试
    • 运行单测:cargo test – –nocapture
    • 运行集成测试:cargo test –test integration – –nocapture
// Cargo.toml中加入集成测试模块的定义
[[test]]
name = "integration"
path = "src/tests/integration/mod.rs"
test = "false" // 默认不运行

// 参考的项目组织结构
├── build.rs // 控制编译行为的脚本
├── Cargo.lock
├── Cargo.toml
├── clippy.toml // clippy检查规则
├── metaservice.conf // 进程启动配置文件
├── rust-toolchain.toml // 此package依赖的rust工具链
└── src
    ├── bin
    │   └── metaservice.rs // 进程启动入口
    ├── config // 配置
    │   ├── defs.rs
    │   └── mod.rs
    ├── handler // API实现
    │   ├── defs.rs
    │   └── mod.rs
    ├── lib.rs
    ├── manager // 逻辑层,单元测试放在代码实现文件中,也可对应拆分一个xx_test.rs文件
    │   ├── controller.rs
    │   ├── error.rs
    │   ├── task.rs
    │   ├── mod.rs
    │   └── ......
    ├── metric // 其他监控、日志、metric等模块
    │   ├── defs.rs
    │   └── mod.rs
    ├── store // 持久化层
    │   ├── error.rs
    │   ├── meta.rs
    │   ├── mock.rs
    │   └── mod.rs
    └── tests // 集成测试、故障注入测试等,通常运行时间相比单测较长
            └── integration // 集成测试,所有测试函数以"integration_test"开头
                ├── cluster.rs
                ├── common.rs
                ├── controller.rs
                ├── mod.rs
                ├── table.rs
                └── tablet.rs
comments powered by Disqus